CmcC 2012-05-31:
Asynchronous Threads is an
OO wrapper to make asynchronous threading over the
Thread package a little easier (latest version always
here).
Changelog edit
pyk 2015-04-08: Small modification to reflect that
thread::errorproc is not per-thread, but a thread-global shared configuration. Added a more thorough version of
main script check. Also some minor stylistic changes.
Implementation edit
# Async - asynchronous thread wrapper
if {0 && [info exists argv0] && $argv0 eq [info script]} {
lappend ::auto_path .
}
package require Thread
#package require Debug
#Debug define thread
oo::class create ::Async {
# response - process response from next call
method response {var count op} {
if {[catch {
upvar 1 $var result
variable id
# get the async response
lassign $result($count) code e eo
unset result($count)
# get the scripts associated with this response (by $count)
variable responder
variable next
lassign $responder([incr next]) response error
unset responder($next)
#Debug.thread {response $next: $var $op -> code:$code e:$e eo:($eo)}
# invoke the appropriate script to process result
switch $code {
return - 2 -
ok - 0 {
if {$response ne {}} {
#Debug.thread {DO: $response $e}
{*}$response $e
} else {
#Debug.thread {DO EMPTY}
}
}
default {
if {$error eq {}} {
::return -code $code -options $eo $e
} else {
{*}$error $code $e $eo
}
}
}
} e eo]} {
puts stderr "ERR: $e $eo"
}
}
# call - asynchronously send call script to thread
# callback $response on success, $error on error
method call {call {response {}} {error {}}} {
variable id
variable responder
variable rcount
set responder([incr rcount]) [list $response $error]
#Debug.thread {$id call$rcount ($call) response:($response) error:($error)}
::thread::send -async $id [list ::_thread::call $call] [namespace current]::waiter($rcount)
}
# construct some pass through commands - their use is not generally recommended
foreach n {preserve release configure exists broadcast join transfer} {
method $n args [string map [list %N% $n] {
variable id
thread::%N% $id {*}$args
}]
}
destructor {
my release ;# just delete the thread
}
constructor args {
if {[llength $args]%2} {
variable script [lindex $args end]
set args [lrange $args 0 end-1]
} else {
variable script {}
}
variable prescript {
namespace eval ::_thread {
# call - run the script, return the full result
proc call {script} {
list [catch {uplevel #0 $script} e eo] $e $eo
}
}
}
variable postscript {
::thread::wait
}
variable {*}$args
variable next ;# next expected response
variable rcount ;# last sent request
variable id [::thread::create -preserved $prescript$script$postscript]
::thread::configure $id -eventmark 3
trace add variable [namespace current]::waiter write [list [self] response]
}
}
if {[info exists argv0] && [
file dirname [file normalize [info script]/...]] eq [
file dirname [file normalize $argv0/...]]} {
# Unit Test
#Debug on thread
proc output args {
puts stderr $args
}
proc terror args {
puts stderr [::thread::id]:$args
}
::thread::errorproc terror
interp bgerror {} output
set max 10
for {set i 0} {$i < $max} {incr i} {
set thread($i) [Async new {
proc bgerror args {
puts stderr [::thread::id]:$args
}
interp bgerror {} bgerror
}]
}
after idle {
time {
set i [expr {int(rand() * $max)}]
$thread($i) call ::thread::id [list output [incr count] $i:]
} 100
}
vwait forever
}