- kill - terminate all deadlocked filaments.
- vwait - enter Tcl eventloop and wait on a variable, this allows filaments to be restarted with an external messages sent to one or more filaments.
- return - keeps filaments in the wait state, allowing an external message to be sent and later re-run.
############################################################################ # # filament # # A very light weight non-preemptive thread/coroutine/generator package # # Similar to the real Tcl 'thread' package, each filament gets its own # interpreter, and communicates with other filaments by sending messages. # Unlike 'thread', messages are just strings rather than commands, and # explicit 'init', 'run', and 'recv' states have separate code bodies. # Filaments must be in a 'wait' state in order to recieve messages. # # Tom Poindexter, January 2006 # package provide filament 0.1 namespace eval ::filament { variable nextCnt 0 variable status variable messages array set status {} array set messages {} variable msgQueue {} } ############################################################################ # # ::filament::new # # Create a new filament. The filament will begin the the 'run' state, unless # another state is set during initialization. Each filament creates a # slave interp by the same name. # # name - name of the new filament, or "#auto" for a generated name # args - a list of "type-list code" pairs, # # vars var-list a list of variables used by the filament, any other # variables used in code are not retained. # # init code code to be executed on filament creation. # also define proc's for your run & recv code here. # the last command can be 'filament_wait' or # 'filament_sleep' if a beginning state other than 'run' # is required. # # run code code to be executed when a filament is run. defaults to # 'filament_wait' if no run code is defined. # # recv code code to be executed when a message is received, the # variable MESSAGE is set with a two element list of the # sending filament, and the message. defaults to # 'filament_wait' if no run code is defined. # # # A type-list can specify any of "init", "run", or "recv". E.g., if the # code to execute for 'run' and 'recv' are the same, then type-list should # be a list of {run recv}. # # Any uncaught error generated by init, run, or recv will cause the # filament to be terminated. The global variable 'FILAMENT' is set to the # name of the filament. # proc ::filament::new {name args} { variable nextCnt variable status variable messages if {$name eq "#auto"} { set name filament[incr nextCnt] } if {([llength $args] % 2) != 0} { error "'args' must contain list of 'type code' pairs" } if {[info exists status($name)]} { error "a filament with name \"$name\" already exists" } set vars "" set init "" set run filament_wait set recv filament_wait set ok 0 foreach {type codestr} $args { if {[lsearch $type vars] >= 0} { set vars $codestr } if {[lsearch $type init] >= 0} { set init $codestr } if {[lsearch $type run] >= 0} { set run $codestr set ok 1 } if {[lsearch $type recv] >= 0} { set recv $codestr set ok 1 } } if {! $ok} { error "neither 'run' or 'recv' code was defined" } regsub -all "#\[^\n\]*\n" $vars { } vars if {[catch {llength $vars}]} { error "vars not a valid list of variable names" } lappend vars FILAMENT MESSAGE set globalvars [concat [linsert $vars 0 global]] interp create $name interp eval $name [list set FILAMENT $name] interp eval $name [list set MESSAGE {}] set status($name) run set messages($name) "" set runproc "proc RUN TYPE \{\n \ $globalvars \n\ switch \$TYPE [list run $run recv $recv ]\}" interp eval $name $runproc interp alias $name filament_send {} ::filament::send $name interp alias $name filament_wait {} ::filament::wait $name interp alias $name filament_kill {} ::filament::kill $name interp alias $name filament_sleep {} ::filament::sleep $name interp alias $name filament_names {} ::filament::names $name interp alias $name filament_messages {} ::filament::messages $name interp alias $name filament_uplevel0 {} ::filament::uplevel0 $name if {[catch {interp eval $name $init} err]} { if {$err ne "FILAMENT_OK"} { catch {interp delete $name} catch {unset status($name)} catch {unset messages($name)} error "::filament::new: error during $name init:\n$err" } } return $name } ############################################################################ # # ::filament::send # # Implements the 'filament_send' command # # Filaments can send messages to other filaments. filament_send does not # cause the calling filament to yield. The target filament(s) are # added to the msgQueue so that they are serviced before other running # filaments. ::filament::send can also be used by the top level code # to prime initial messages or to re-prime filaments that become # deadlocked. The filament receiving the message has it's 'recv' code # invoked, the global variable MESSAGE contains a list of two elements, # the id of the sending filament, and the message. # # filament_send message ?id? # # message - the message to send # # id - the name of a filament to send. if id is a null string, # or not specified, then the message is sent to all other # filaments, excluding self proc ::filament::send {this message {id ""}} { variable status variable messages variable msgQueue if {![string length $id]} { foreach name [array names status] { if {$name ne $this} { lappend messages($name) [list $this $message] lappend msgQueue $name } } } elseif {![info exists status($id)]} { error "filament $id does not exist" } else { lappend messages($id) [list $this $message] lappend msgQueue $id } } ############################################################################ # # ::filament::wait # # Implements the 'filament_wait' command # # filament_wait # # The calling filament waits until a message is received. Upon receiving a # message, the 'recv' code is executed. filament_wait will cause the # calling filament to yield. The global variable MESSAGE receives a list of # the sending filament id and the message. proc ::filament::wait {this} { variable status set status($this) wait error FILAMENT_OK } ############################################################################ # # ::filament::kill # # Implements the 'filament_kill' command # # Cause a filament to terminate. If a filament is killing itself, the # calling filament will immediately be terminated. If killing another # filament, the calling filament does not yield. # # filament_kill ?id? ?when? # # id - the id to terminate. if not specified, then the calling filament # is termiated. # # when - "waitkill" or "kill". "waitkill" terminates the filament after # any pending messages have been delivered (default). # "kill" terminates the filament immediately. proc ::filament::kill {this {id ""} {when waitkill}} { variable status if {![string length $id]} { set id $this } if {[string length $id] && ![info exists status($id)]} { error "filament $id does not exist" } # when - "kill" or "waitkill" until all messages to filament are delivered switch $when { kill {set status($id) kill} waitkill {set status($id) waitkill} default {error "\"when\" argument \"$when\", must be either \"wait\" or \"kill\""} } if {$this eq $id} { error FILAMENT_OK } } ############################################################################ # # ::filament::sleep # # Implements the 'filament_sleep' command. # # filament_sleep # # The calling filament will yield, and be later rescheduled to run. proc ::filament::sleep {this} { variable status set status($this) sleep error FILAMENT_OK } ############################################################################ # # ::filament::messages # # Implements the 'filament_messages' command. # # filament_messages ?id? # # Returns the number of messages waiting to be delivered. An error is raised # if the filament does not exist. # If id is not specified, then the number of messages for the calling # filament is returned. proc ::filament::messages {this {id ""}} { variable messages if {![string length $id]} { set id $this } if {[info exists messages($id)]} { return [llength $messages($id)] } else { error "filament id \"$id\" does not exists" } } ############################################################################ # # ::filament::names # # Implements the 'filament_names' command. # # filament_names # # Returns a list of the names of all filaments and their status. status # is one of: "run", "wait", "waitkill", "kill", "sleep" # proc ::filament::names {this} { variable status return [array get status] } ############################################################################ # # ::filament::uplevel0 # # Implements the 'filament_uplevel0' command. # # filament_uplevel0 args # # Runs a command as uplevel #0, any value is returned. This can be used # to set/read global variables, run top level procs, access shared files, etc. # proc ::filament::uplevel0 {this args} { return [uplevel #0 $args] } ############################################################################ # # ::filament::doKill # # Kill helper - destroy the filament interpreter, cleanup status & messages # proc ::filament::doKill {name} { variable status variable messages catch {interp delete $name} catch {unset status($name)} catch {unset messages($name)} } ############################################################################ # # ::filament::doRun # # Run helper, type is either 'run' or 'recv'. If an interp throws the # error "FILAMENT_OK", then we'll assume that one of our own. # Any other error thrown by the interp will immediately terminate the # filament and invoke 'bgerror' to report the condition. proc ::filament::doRun {name type} { variable status if { [catch {interp eval $name RUN $type} err] } { if {$err eq "FILAMENT_OK"} { # check if this filament is still valid if {[info exists status($name)]} { # yes, reschedule } } else { # some other error, kill this filament catch {doKill $name} catch {bgerror "\"::filament::run $name $type\", error:\n$err"} } } else { # returned without error return $err } return "" } ############################################################################ # # ::filament::run # # Begin execution of filamants. # # ::filament::run ?deadlock_resolution? ?waitvar? # # Execution will continue until: # # a) no filaments remain, all have been terminated # # b) all filaments are in a 'wait' state and no messages queued (deadlocked) # # optional arguments: # # deadlock_resolution - # "kill" - cause all filaments to be terminated (default) # "vwait" - enter Tcl event loop by executing "vwait $waitvar" # this allows ::filament::send to be used to send # a message to a filament, and run again # "return" - return leaving filaments interps active. it is # possible to send messages to filaments, and rerun # ::filament::run to restart. # # waitvar - the name of the variable for deadlock_resolution method "vwait" # # ::filament::run returns a list of filaments that are deadlocked, if any # proc ::filament::run {{deadlock kill} {waitvar ""}} { variable status variable messages variable msgQueue if {[lsearch {kill vwait return} $deadlock] == -1} { error "\"deadlock\" argument must be \"kill\", \"vwait\", or \"return\"" } if {$deadlock eq "vwait" && ![string length $waitvar]} { error "vwait specified, but \"waitvar\" is null" } while {1} { set ran 0 # get names of all filaments, this list can be modified # during the loop by sending messages, lappending the targets set msgQueue [array names status] while {[llength $msgQueue]} { set name [lindex $msgQueue 0] set msgQueue [lrange $msgQueue 1 end] if {! [info exists status($name)]} { continue } else { set state $status($name) } switch $state { run { doRun $name run set ran 1 } waitkill - wait { set allmessages "" catch {set allmessages $messages($name)} set messages($name) "" foreach msg $allmessages { if {[info exists status($name)]} { # default next state to 'run'. the actual 'recv' # code in the filament may call filament_wait, # filament_sleep, or filament_kill to change # to a different state upon exit. set status($name) run interp eval $name [list set MESSAGE $msg] doRun $name recv set ran 1 } } if {[info exists status($name)]} { # if waitkill, then kill the filament now if {$state eq "waitkill" && \ [llength $messages($name)]==0} { doKill $name } else { catch {interp eval $name [list set MESSAGE {}]} } } } kill { doKill $name } sleep { set status($name) run set ran 1 } } } # if nothing was run and filaments exist, then we must be deadlocked set len [llength [array names status]] if {! $ran && $len} { switch $deadlock { kill { foreach {name state} [array get status] { doKill $name } break } vwait { vwait $waitvar } return { break } } } elseif {! $len} { # all filaments have ended break } } # return the filament names and status return [array names status] }
Here are some simple examples to demostrate Filament.
#################################################################### # an anonymous filament, produces consecutive integers, and # sends that result to two different consumers. terminate after 25 # integers have been produced randomly, the # consumer threads are also killed, randomly with or without pending messages # delivered. # package require filament filament::new #auto vars {i} init { set i 0 } run { incr i puts -nonewline "$FILAMENT running: " if {$i > 25} { set luck [expr rand()] if {$luck > .66} { puts "killing consumers, undelivered messages are lost!" catch {filament_kill "" kill} } elseif {$luck > .33} { puts "killing consumers, pending messages to be delivered" filament_kill consumer_odd filament_kill consumer_even catch {filament_kill busy_bee} } else { puts "ignoring consumers" catch {filament_kill busy_bee} } puts "$FILAMENT killing self" filament_kill } if {$i % 2} { puts "produced $i, sending to consumer_odd" filament_send $i consumer_odd } else { puts "produced $i, sending to consumer_even" filament_send $i consumer_even catch {filament_send "hey! be quiet!" busy_bee} } } # consumer_odd - in running state, immediately wait for a message # to be delivered. filament::new consumer_odd run { puts " consumer_odd is waiting" filament_wait } recv { foreach {from msg} $MESSAGE {break} puts "consumer_odd got message $msg from $from" } # consumer_even - in running state, do some trival work, but every # fifth invocation, wait for a message filament::new consumer_even vars {i} init { set i 0 } run { puts -nonewline " consumer_even is running: " if {[incr i] % 5 == 0} { puts "now waiting for messages" filament_wait } puts "going to sleep" filament_sleep } recv { foreach {from msg} $MESSAGE {break} puts "consumer_even got message $msg from $from" } # busy_bee - a filament that runs and discards any messages # randomly fail to test the error catching code filament::new busy_bee {run recv} { if {rand() < .05} { error "this will raise an error, causing busy_bee to terminate" } puts -nonewline " buzzzzzzzz" if {[filament_messages]} { puts ": has [filament_messages] messages waiting" filament_wait } puts "" } # report background errors proc bgerror {message} { puts "bgerror got: $message" } # start filaments puts "starting trivial filaments\n\n" set final [filament::run return] puts "\n\nfilaments left waiting in deadlock: $final" if {[llength $final]} { puts "" puts "sending an external message to all filaments" # note that the first argument (sending filament id) can be anything filament::send BIG_DADDY external_message filament::run kill } puts "trivial done"
Implement the Language Shootout [1] cheap-concurrency benchmark.
##################################################################### # shootout cheap-concurrency # http://shootout.alioth.debian.org/debian/benchmark.php?test=message&lang=allpackage require filament # set argv 3000 set N [lindex $argv 0] set Nthreads 500 for {set i 1} {$i < $Nthreads} {incr i} { set next [expr {$i + 1}] filament::new t$i vars {next} init "set next t$next; filament_wait" recv { set x [lindex $MESSAGE 1] filament_send [expr {$x + 1}] $next filament_wait } } filament::new t$i init {filament_wait} recv { set x [lindex $MESSAGE 1] filament_uplevel0 set sum [expr {$x + 1}] filament_wait } proc cheap-concurrency {N} { global sum set sum 0 for {set i 0} {$i < $N} {incr i} { filament::send "" $sum t1 filament::run return } puts $sum } cheap-concurrency $N
A simple demo using vwait to restart deadlocked filaments:
##################################################################### # using vwait to inject messages package require filament filament::new getmsg vars {i} init {set i 0; filament_wait} recv { puts "filament getmsg got $MESSAGE" } run { if {[incr i] > 5} { puts "getmsg: $i done" filament_kill } else { puts "getmsg: $i waiting" filament_wait } } set ::wait_is_over 0 set after_code { filament::send mcp wakeup! set ::wait_is_over 1 after 1000 [set ::after_code] } after 0 $after_code filament::run vwait ::wait_is_over
See also: