;; simple gauche producer/consumer test (bounded buffer)

(use gauche.threads)

;; adapted from $gauche/ext/threads/test.scm
;;
;; One-slot buffer demo, run at load time.  A producer thread puts the
;; messages a, b, c into CELL and a consumer thread takes three messages
;; out.  Every put/get is recorded in LOG, which is printed (oldest entry
;; first) after both threads have been joined.
;;
;; CELL holds #f when the slot is empty, so #f itself cannot be sent as a
;; message.
(let ((log '())
      (cell #f)
      (m (make-mutex))
      (put-cv (make-condition-variable))
      (get-cv (make-condition-variable)))

  ;; Store MSG in the slot.  If the slot is occupied, release M while
  ;; waiting on PUT-CV (mutex-unlock! with a condition variable does both
  ;; in one step) and then retry from the top, re-checking the slot.
  (define (put! msg)
    (mutex-lock! m)
    (if cell
        (begin
          (mutex-unlock! m put-cv)
          (put! msg))
        (begin
          (set! cell msg)
          (push! log `(put ,msg))
          (condition-variable-signal! get-cv)
          (mutex-unlock! m))))

  ;; Remove and return the message in the slot.  If the slot is empty,
  ;; release M while waiting on GET-CV and then retry.
  (define (get!)
    (mutex-lock! m)
    (if cell
        (let1 r cell
          (set! cell #f)
          (push! log `(get ,r))
          (condition-variable-signal! put-cv)
          (mutex-unlock! m)
          r)
        (begin
          (mutex-unlock! m get-cv)
          (get!))))

  (define (producer)
    (put! 'a)
    (put! 'b)
    (put! 'c))

  (define (consumer)
    (get!)
    (get!)
    (get!))

  (let ((tp (thread-start! (make-thread producer 'producer)))
        (tc (thread-start! (make-thread consumer 'consumer))))
    (thread-join! tp)
    (thread-join! tc)
    (print (reverse log))))

;; make-semaphore INIT MAXIMUM
;;
;; Returns a counting semaphore as a message-dispatching closure.  The
;; counter starts at INIT and is never raised above MAXIMUM.
;;
;;   (sem 'signal)  Increment the counter and wake one waiter.  When the
;;                  counter already equals MAXIMUM the call only prints a
;;                  note and leaves the counter unchanged.
;;   (sem 'wait)    Decrement the counter, blocking while it is 0.
;;
;; Any other method name raises an error.  Both operations print a trace
;; line describing what they did.
(define (make-semaphore init maximum)
  (let ((value init)
        (m (make-mutex))
        ;;(signal-cv (make-condition-variable))
        (wait-cv (make-condition-variable)))

    (define (signal)
      (mutex-lock! m)
      (if (< value maximum)
          (begin
            (inc! value)
            (print "signal: increment value to " value)
            (condition-variable-signal! wait-cv))
          (print "signal: value = max"))
      ;; Release the mutex on BOTH paths.  Leaving it locked on the
      ;; "value = max" path would block every later signal/wait on this
      ;; semaphore.
      (mutex-unlock! m))

    (define (wait)
      (mutex-lock! m)
      (if (> value 0)
          (begin
            (dec! value)
            (print "wait: decrement value to " value)
            ;;(condition-variable-signal! signal-cv)
            (mutex-unlock! m))
          (begin
            (print "wait: value = 0, waiting")
            ;; Release M while waiting on WAIT-CV, then re-check the
            ;; counter from the top.
            (mutex-unlock! m wait-cv)
            (wait))))

    (lambda args
      (apply (case (car args)
               ((signal) signal)
               ((wait) wait)
               ;; Raise a real error instead of handing print's return
               ;; value to apply.
               (else (error "semaphore: unknown method" (car args))))
             (cdr args)))))

;; semtest
;;
;; Producer/consumer handshake built on two semaphores from
;; make-semaphore: EMPTY starts at 1 (the box may be written) and FULL
;; starts at 0 (nothing to read yet).  The producer stores 0..4 in BOX one
;; value at a time and the consumer prints each value.  Both sides run the
;; same number of rounds; otherwise the side with more rounds would block
;; forever in its 'wait and the thread-join! below would never return.
;; Prints `done' once both threads have finished.
(define (semtest)
  (let ((full (make-semaphore 0 1))
        (empty (make-semaphore 1 1))
        (box '())
        (rounds 5))

    (define (producer)
      (dotimes (i rounds)
        (empty 'wait)
        (set! box i)
        (print "produced " box)
        (full 'signal)))

    (define (consumer)
      (dotimes (i rounds)
        (full 'wait)
        (print "consumed " box)
        (empty 'signal)))

    (let ((tp (thread-start! (make-thread producer 'producer)))
          (tc (thread-start! (make-thread consumer 'consumer))))
      (thread-join! tp)
      (thread-join! tc)
      (print 'done))))

;; ruby semaphore

;; class Semaphore
;;   def initialize max = 1, init = max
;;     fail ArgumentError, "maximum value must be > 0" unless max > 0
;;     fail ArgumentError, "initial value must be >= 0" unless init >= 0
;;     fail ArgumentError, "initial value must be <= maximum" unless init <= max
;;     @waiting = []
;;     @maximum = max
;;     @value = init
;;   end
;;   def signal
;;     return if @value == @maximum
;;     Thread.critical = true
;;     @value += 1
;;     begin
;;       t = @waiting.shift
;;       t.run if t
;;     rescue ThreadError
;;       # nothing: tried to run a dead thread
;;     end
;;     Thread.critical = false
;;     self
;;   end
;;   def wait
;;     while(Thread.critical = true; @value == 0)
;;       @waiting.push Thread.current
;;       Thread.stop # implies Thread.critical = false
;;     end
;;     @value -= 1
;;     Thread.critical = false
;;     self
;;   end
;;   def synchronize
;;     wait
;;     begin
;;       yield
;;     ensure
;;       signal
;;     end
;;     self
;;   end
;; end
;; ##########################################################################
;; if __FILE__ == $0
;; srand
;; def snooze
;;   sleep rand 0
;; end
;; class Test1
;;   def initialize
;;     puts "* N processes contending for M resources, where N > M"
;;     threads = []
;;     @sem = Semaphore.new 3 # 3 resources available
;;     for i in 0..9 do
;;       threads.push Thread.start(i) { |ii| snooze; client ii }
;;     end
;;     threads.each{ |t| t.join }
;;   end
;;   def client i
;;     puts "client #{i}: wait"
;;     @sem.wait
;;     puts "client #{i}: running"
;;     snooze
;;     puts "client #{i}: signal"
;;     @sem.signal
;;   end
;; end
;; class Test2
;;   def initialize
;;     puts "* synchronous producer/consumer"
;;     threads = []
;;     @sin = Semaphore.new 1, 0
;;     @sout = Semaphore.new 1, 0
;;     @shared = 0
;;     @N = 10
;;     threads.push Thread.start{ producer }
;;     threads.push Thread.start{ consumer }
;;     threads.each{ |t| t.join }
;;   end
;;   def producer
;;     for i in 1..@N do
;;       snooze
;;       @shared = i
;;       puts "produced #{i}"
;;       @sin.signal
;;       @sout.wait
;;     end
;;   end
;;   def consumer
;;     for i in 1..@N do
;;       snooze
;;       @sin.wait
;;       puts "consumed #{@shared}"
;;       @sout.signal
;;     end
;;   end
;; end
;; class Test3
;;   def initialize
;;     puts "* asynchronous producer/consumer with finite (circular) buffer"
;;     threads = []
;;     @MAX = 3
;;     @buffer = Array.new @MAX, 0
;;     @input = 0
;;     @output = 0
;;     @elements = Semaphore.new @MAX, 0
;;     @spaces = Semaphore.new @MAX
;;     @N = 10
;;     threads.push Thread.start{ producer }
;;     threads.push Thread.start{ consumer }
;;     threads.each{ |t| t.join }
;;   end
;;   def producer
;;     for i in 1..@N do
;;       snooze
;;       @spaces.wait
;;       @buffer[@input] = i
;;       @input = (@input + 1) % @MAX
;;       puts "produced #{i}"
;;       @elements.signal
;;     end
;;   end
;;   def consumer
;;     for i in 1..@N do
;;       snooze
;;       @elements.wait
;;       n = @buffer[@output]
;;       @output = (@output + 1) % @MAX
;;       puts "consumed #{n}"
;;       @spaces.signal
;;     end
;;   end
;; end
;; class Test4
;;   def initialize
;;     puts "* transfer of control (coroutines)"
;;     threads = []
;;     @a = Semaphore.new 1, 0
;;     @b = Semaphore.new 1, 0
;;     @c = Semaphore.new 1, 0
;;     threads.push Thread.new{ parent }
;;     threads.push Thread.new{ process_a }
;;     threads.push Thread.new{ process_b }
;;     threads.each{ |t| t.join }
;;   end
;;   def parent
;;     puts "P: 1"
;;     @a.signal
;;     @c.wait
;;     puts "P: 6"
;;   end
;;   def process_a
;;     @a.wait
;;     puts "A: 2"
;;     @b.signal
;;     @a.wait
;;     puts "A: 4"
;;     @b.signal
;;   end
;;   def process_b
;;     @b.wait
;;     puts "B: 3"
;;     @a.signal
;;     @b.wait
;;     puts "B: 5"
;;     @c.signal
;;   end
;; end

;; -----------------------------------------------------------------
;; XXX warning: brain-damage ahead XXX

;; (define buffer-size 10)
;; (define buffer (make-list buffer-size '()))
;; (define buffer-empty (make-mutex))
;; (define buffer-full (make-mutex))
;; (define done #f)

;; (define (producer)
;;   (print "producer: started")
;;   (do ((i 1 (inc! i)))
;;       ((> i 3))
;;     (print "producer: wait for buffer-empty")
;;     (mutex-lock! buffer-empty)
;;     (print "producer: producing")
;;     (set! buffer (make-list 10 i))
;;     (thread-sleep! 1)
;;     (print "producer: unlock buffer-full")
;;     (mutex-unlock! buffer-full))
;;   (print "producer done")
;;   (set! done #t))

;; (define (consumer)
;;   (print "consumer started")
;;   (until done
;;     (print "consumer: wait for buffer-full")
;;     (mutex-lock! buffer-full)
;;     (print "consumer: consuming")
;;     (print (list 'consumed buffer))
;;     (thread-sleep! 1)
;;     (print "consumer: unlock buffer-empty")
;;     (mutex-unlock! buffer-empty))
;;   (print "consumer: done"))

;; (define (main args)
;;   (let ((p-thread ()) (c-thread ()))
;;     (mutex-lock! buffer-empty)
;;     (mutex-lock! buffer-full)
;;     ;; create threads
;;     (set! p-thread (make-thread producer))
;;     (set! c-thread (make-thread consumer))
;;     ;; start threads
;;     (thread-start! p-thread)
;;     (thread-start! c-thread)
;;     ;; unlock the buffer-empty mutex, starting producer
;;     (mutex-unlock! buffer-empty)
;;     ;; join threads
;;     (thread-join! p-thread)
;;     (thread-join! c-thread)))

;; Local Variables:
;; mode: gauche
;; End: