;; channels - pipe-like structures that can contain one (or more)
;; items.  All operations on a channel are synchronized for
;; multi-thread safety.  Channels act as communication channels between
;; processes or threads.
;;
;; (channel-put! item)  put item in the channel; blocks while the
;;                      channel is locked or full
;; (channel-get!)       get an item from the channel; blocks while the
;;                      channel is locked or empty
;;
;; NOTE: the implementations in this file operate on one global,
;; single-item cell, so neither procedure takes a channel argument.
;;
;; XXX can't this whole channel thing be implemented more elegantly
;; using streams?  In other words, can we lose the set!'s?

(use gauche.threads)

;; hand-coded channel implementation

;; The single slot of the channel, plus a flag telling whether it is
;; currently occupied (the flag is needed because any value, including
;; the empty list, may be stored in the cell).
(define cell '())
(define cell-empty? #t)
(define mutex (make-mutex))
;; put-cv: signalled when the cell becomes empty (writers wait on it).
;; get-cv: signalled when the cell becomes full  (readers wait on it).
(define put-cv (make-condition-variable))
(define get-cv (make-condition-variable))

;; Store DATA in the cell.  If the cell is occupied, wait on put-cv and
;; retry.  (mutex-unlock! mutex cv) releases the mutex and blocks on cv;
;; the mutex is NOT re-acquired on wake-up, hence the retry via a tail
;; call that locks again.
(define (channel-put! data)
  (mutex-lock! mutex)
  (if cell-empty?
      (begin
        (set! cell data)
        (set! cell-empty? #f)
        (condition-variable-signal! get-cv)
        (mutex-unlock! mutex))
      (begin
        (mutex-unlock! mutex put-cv)
        (channel-put! data))))

;; Remove and return the item in the cell.  If the cell is empty, wait
;; on get-cv and retry.
(define (channel-get!)
  (mutex-lock! mutex)
  (if cell-empty?
      (begin
        (mutex-unlock! mutex get-cv)
        (channel-get!))
      (begin
        (let1 result cell
          (set! cell '())
          (set! cell-empty? #t)
          (condition-variable-signal! put-cv)
          (mutex-unlock! mutex)
          result))))

;; emerging pattern:
;;
;; (define/sync MUTEX CONDITION SUCCESS-CV NO-SUCCESS-CV SIGNATURE . BODY)
;;
;; Defines a procedure with the given SIGNATURE that locks MUTEX and
;; tests CONDITION.  When it holds, BODY runs, SUCCESS-CV is signalled,
;; MUTEX is released and BODY's value is returned.  Otherwise the
;; procedure waits on NO-SUCCESS-CV and calls itself again with the same
;; arguments.
(define-macro (define/sync mutex condition success-cv no-success-cv
                signature . body)
  `(define (,@signature)
     (mutex-lock! ,mutex)
     (if ,condition
         (begin
           (let1 result (begin ,@body)
             (condition-variable-signal! ,success-cv)
             (mutex-unlock! ,mutex)
             result))
         (begin
           (mutex-unlock! ,mutex ,no-success-cv)
           (,@signature)))))

;; define the fns...
;; (these deliberately redefine the hand-coded channel-put! and
;; channel-get! above; the later definitions are the ones in effect)
(define/sync mutex cell-empty? get-cv put-cv
  (channel-put! data)
  (set! cell data)
  (set! cell-empty? #f))

(define/sync mutex (not cell-empty?) put-cv get-cv
  (channel-get!)
  (let1 result cell
    (set! cell '())
    (set! cell-empty? #t)
    result))

;; ... correctly yielding
;; (define (channel-put! data)
;;   (mutex-lock! mutex)
;;   (if cell-empty?
;;       (begin
;;         (let1 result (begin
;;                        (set! cell data)
;;                        (set! cell-empty? #f))
;;           (condition-variable-signal! get-cv)
;;           (mutex-unlock! mutex)
;;           result))
;;       (begin
;;         (mutex-unlock! mutex put-cv)
;;         (channel-put! data))))
;;
;; (define (channel-get!)
;;   (mutex-lock! mutex)
;;   (if (not cell-empty?)
;;       (begin (let1 result (begin (let1 result cell
;;                                    (set! cell ())
;;                                    (set! cell-empty? #t)
;;                                    result))
;;                (condition-variable-signal! put-cv)
;;                (mutex-unlock! mutex)
;;                result))
;;       (begin
;;         (mutex-unlock! mutex get-cv)
;;         (channel-get!))))

;; automatically creating all stuff needed for proper syncing
;;
;; (make-synchronisation-context NAME) expands into definitions of
;; NAME-mutex, NAME-success-cv and NAME-no-success-cv.  NAME is a string
;; (a symbol is accepted too).
;;
;; The three definitions must be wrapped in a single `begin': a macro
;; body returns only the value of its last expression, so listing three
;; separate quasiquoted forms would define NAME-no-success-cv only.
(define-macro (make-synchronisation-context name)
  (let1 prefix (x->string name)
    (define (suffixed suffix)
      (string->symbol (string-append prefix suffix)))
    `(begin
       (define ,(suffixed "-mutex") (make-mutex))
       (define ,(suffixed "-success-cv") (make-condition-variable))
       (define ,(suffixed "-no-success-cv") (make-condition-variable)))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; simplifying further:
;; define a semaphore object that incorporates all sync aspects

;; (make-semaphore [MAXIMUM [INITIAL]])
;;
;; Returns a counting semaphore as a message-passing closure.
;; MAXIMUM defaults to 1 and INITIAL defaults to MAXIMUM.
;;   (sem 'wait)    blocks while the count is 0, then decrements it
;;   (sem 'signal)  increments the count and wakes all waiters; when the
;;                  count is already MAXIMUM it only prints a message
;; Any other message raises an error.
(define (make-semaphore . args)
  (let-optionals* args ((maximum 1)
                        (initial maximum))
    (let ((mutex (make-mutex))
          (signal-cv (make-condition-variable))
          (value initial))
      (define (signal)
        (mutex-lock! mutex)
        (cond ((< value maximum)
               (inc! value)
               (condition-variable-broadcast! signal-cv))
              (else
               (print "semaphore-signal: value == maximum")))
        (mutex-unlock! mutex))
      (define (wait)
        (mutex-lock! mutex)
        (cond ((= value 0)
               ;; release the mutex, sleep on signal-cv, then retry
               (mutex-unlock! mutex signal-cv)
               (wait))
              (else
               (dec! value)
               (mutex-unlock! mutex))))
      ;; dispatcher
      (lambda args
        (apply (case (car args)
                 ((signal) signal)
                 ((wait) wait)
                 (else
                  (error "Unknown request -- MAKE-SEMAPHORE" (car args))))
               (cdr args))))))

(define (semaphore-signal sem) (sem 'signal))
(define (semaphore-wait sem) (sem 'wait))

;; Run THUNK while holding SEMAPHORE and return THUNK's value.
;; dynamic-wind guarantees the semaphore is signalled again even when
;; THUNK exits non-locally (e.g. by raising an error), so a failing
;; thunk cannot leave the semaphore held forever.
(define (synchronize semaphore thunk)
  (dynamic-wind
    (lambda () (semaphore 'wait))
    thunk
    (lambda () (semaphore 'signal))))

;; semaphore tests (adapted from semaphore.rb)

;; Sleep for SEC seconds (SEC may be exact or inexact).  The nanosecond
;; count is rounded so that sys-nanosleep always gets an exact integer.
(define (nap sec)
  (sys-nanosleep (round->exact (* sec 1000000000))))

(define (sem-test-1)
  (print "* N processes contending for M resources, where N > M")
  (let ((s (make-semaphore))
        (threads '()))
    (define (client)
      (let1 name (thread-name (current-thread))
        (format #t "client ~s: wait\n" name)
        (s 'wait)
        (format #t "client ~s: run\n" name)
        (nap 0.1)
        (format #t "client ~s: signal\n" name)
        (s 'signal)))
    ;; ten client threads, named 0..9
    (do ((i 0 (+ i 1)))
        ((= i 10))
      (push! threads (thread-start! (make-thread client i))))
    (map thread-join! threads)))

(define (sem-test-2)
  (print "* synchronous producer/consumer")
  (let ((s-in (make-semaphore 1 0))
        (s-out (make-semaphore 1 0))
        (shared 0)
        (N 10)
        (threads '()))
    ;; s-in announces "a value is available", s-out "the value was read"
    (define (producer)
      (dotimes (i N)
        (nap 0.1)
        (set! shared i)
        (format #t "produced ~s\n" i)
        (s-in 'signal)
        (s-out 'wait)))
    (define (consumer)
      (dotimes (i N)
        (nap 0.1)
        (s-in 'wait)
        (format #t "consumed ~s\n" shared)
        (s-out 'signal)))
    (push! threads (thread-start! (make-thread producer)))
    (push! threads (thread-start! (make-thread consumer)))
    (map thread-join! threads)))

;; TODO isolate the synced (finite) buffer: useful code
(define (sem-test-3)
  (print "* asynchronous producer/consumer with finite (circular) buffer")
  (let* ((threads '())
         (N 10)
         (size 3)
         (buffer (make-vector size 'x))
         (in-pos 0)
         (out-pos 0)
         ;; elements counts filled slots, spaces counts free slots
         (elements (make-semaphore size 0))
         (spaces (make-semaphore size))
         (buffer-put (lambda (item)
                       (vector-set! buffer in-pos item)
                       (set! in-pos (modulo (+ in-pos 1) size))))
         (buffer-get (lambda ()
                       (let ((result (vector-ref buffer out-pos)))
                         (set! out-pos (modulo (+ out-pos 1) size))
                         result))))
    (define (producer)
      (dotimes (i N)
        (nap 0.1)
        (spaces 'wait)
        (buffer-put i)
        (format #t "produced ~d ~s\n" i buffer)
        (elements 'signal)))
    (define (consumer)
      (dotimes (i N)
        (nap 0.3)
        (elements 'wait)
        (format #t "consumed ~d ~s \n" (buffer-get) buffer)
        (spaces 'signal)))
    (push! threads (thread-start! (make-thread producer)))
    (push! threads (thread-start! (make-thread consumer)))
    (map thread-join! threads)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; synchronised queue (see SICP 3.3.2, cons factored out)

;; Returns an unbounded FIFO queue as a message-passing closure.
;;   (q 'enqueue ITEM) / (q 'push ITEM)  append ITEM, wake all waiters
;;   (q 'dequeue)      / (q 'pop)        remove and return the oldest
;;                                       item; blocks while the queue
;;                                       is empty
;;   (q 'empty?)                         #t when no items are queued
;;                                       (reads head without locking)
;; Any other message raises an error.
(define (make-synchronised-queue)
  (let ((mutex (make-mutex))
        (available-cv (make-condition-variable))
        (head '())
        (tail '()))
    (define (empty?)
      (null? head))
    (define (enqueue item)
      (mutex-lock! mutex)
      (let1 new-pair (cons item '())
        (cond ((empty?)
               (set! head new-pair)
               (set! tail new-pair))
              (else
               (set-cdr! tail new-pair)
               (set! tail new-pair))))
      ;;(format #t "(qlen ~d)\n" (length head))
      (condition-variable-broadcast! available-cv)
      (mutex-unlock! mutex))
    (define (dequeue)
      (mutex-lock! mutex)
      (cond ((empty?)
             ;; release the mutex, sleep until something is enqueued,
             ;; then retry
             (mutex-unlock! mutex available-cv)
             (dequeue))
            (else
             (let1 result (car head)
               (set! head (cdr head))
               ;; do not keep the last pair (and its item) reachable
               ;; through tail once the queue has drained
               (when (null? head)
                 (set! tail '()))
               (mutex-unlock! mutex)
               result))))
    ;; dispatcher
    (lambda args
      (apply (case (car args)
               ((enqueue push) enqueue)
               ((dequeue pop) dequeue)
               ((empty?) empty?)
               (else
                (error "Unknown request -- MAKE-SYNCHRONISED-QUEUE"
                       (car args))))
             (cdr args)))))

(define (queue-push q item) (q 'push item))
(define (queue-pop q) (q 'pop))

(define (test-sync-queue)
  (let ((queue (make-synchronised-queue)))
    (define (producer)
      (dotimes (i 10)
        (format #t "produced ~s\n" i)
        (queue 'push i)
        (nap 1/10))
      (print "producer done")
      'ok)
    (define (consumer)
      (dotimes (i 10)
        (format #t "consumed ~s\n" (queue 'pop))
        (nap 3/10))
      (print "consumer done")
      'ok)
    (parallel-execute producer consumer)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; serializers, from SICP 3.4.2

;; Each arg is a thunk, to be run in a separate thread.  Waits for all
;; threads to finish and returns the list of thread-join! results
;; (most recently started thread first, because of push!).
(define (parallel-execute . args)
  (let ((threads '()))
    (dolist (thunk args)
      (push! threads (thread-start! (make-thread thunk))))
    (map thread-join! threads)))

;; Serializer without dynamic-wind.  Note that the mutex stays locked
;; if P exits non-locally; see make-serializer for the safe variant.
(define (make-serializer/no-dw)
  (let ((mutex (make-mutex)))
    (lambda (p)
      (define (serialized-p . args)
        (mutex-lock! mutex)
        (let ((val (apply p args)))
          (mutex-unlock! mutex)
          val))
      serialized-p)))

;; same using dynamic-wind
;; note: this function returns a function A which takes a function B
;; and returns a new function C that is a sync'd variant of function B.
(define (make-serializer)
  (let ((mutex (make-mutex)))
    (lambda (proc)
      (lambda args
        (dynamic-wind
          (lambda () (mutex-lock! mutex))
          (lambda () (apply proc args))
          (lambda () (mutex-unlock! mutex)))))))

;; serializer test: can only produce 101 or 121, not 110, 11 or 100.
(let ((x 10)
      (s (make-serializer)))
  (parallel-execute (s (lambda () (set! x (* x x))))
                    (s (lambda () (set! x (+ x 1)))))
  x)

(use srfi-1)

(list-tabulate 100
               (lambda (i)
                 (let ((x 10)
                       (s (make-serializer)))
                   (parallel-execute (s (lambda () (set! x (* x x))))
                                     (s (lambda () (set! x (+ x 1)))))
                   x)))

;; non-serialised
(list-tabulate 100
               (lambda (i)
                 (let ((x 10))
                   (parallel-execute (lambda () (set! x (* x x)))
                                     (lambda () (set! x (+ x 1))))
                   x)))

;; simple account test
;;
;; (make-account BALANCE) returns a dispatch procedure:
;;   ((acc 'withdraw) AMOUNT)  serialized withdrawal; returns the new
;;                             balance or the string "Insufficient funds"
;;   ((acc 'deposit) AMOUNT)   serialized deposit; returns the new balance
;;   (acc 'balance)            current balance (read without locking)
(define (make-account balance)
  (define (withdraw amount)
    (if (>= balance amount)
        (begin
          (set! balance (- balance amount))
          balance)
        "Insufficient funds"))
  (define (deposit amount)
    (set! balance (+ balance amount))
    balance)
  (let ((protected (make-serializer)))
    (define (dispatch m)
      (cond ((eq? m 'withdraw) (protected withdraw))
            ((eq? m 'deposit) (protected deposit))
            ((eq? m 'balance) balance)
            (else (error "Unknown request -- MAKE-ACCOUNT" m))))
    dispatch))