Hey,

the lab stuff looks very interesting, I however couldn't quite figure out 
how to "unsubscribe" one channel from the broadcast since I cannot exchange 
the "topic" for every subscriber when one subscriber decides to leave. Its 
also a lot lower level than I'm currently comfortable with since I haven't 
checked out any of the core.async internals yet.

However I wrote my own little pubsub utilities which (almost) only use the 
public API.

(def my-topic (pubsub/topic 100))
 
(pubsub/subscribe-go
 [subscription my-topic (sliding-buffer 100)]
 (loop []
   (when-let [ev (<! subscription)]
     (prn [:sub-got ev])
     (recur))))
 
;; without go
(let [sub (pubsub/subscribe my-topic (sliding-buffer 100))]
  (prn [:msg (<!! sub)])
  (close! sub))


Complete code at: https://gist.github.com/thheller/5973825

subscribe-go is a convenience macro which allows to subscribe to multiple 
topics and is a normal go block, so same rules apply and you may take! from 
any other channel as well. When the block ends the subscription is 
automatically removed, otherwise a subscription is removed by closing it.

Could be optimized but it seems to work fine for me.

Will follow the lab.clj when I'm ready to get dirty with the internals. ;) 

Cheers,
/thomas

On Wednesday, July 10, 2013 12:27:59 PM UTC+2, Alex Miller wrote:
>
> There is a broadcast fn in the lab namespace (
> https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/lab.clj)
>  
> that does this and I believe David Nolen has created a different variant in 
> some of his stuff. The lab one is experimental and would welcome feedback 
> on it.
>
> Alex
>
> On Tuesday, July 9, 2013 6:46:21 PM UTC-5, Thomas Heller wrote:
>>
>> Hey,
>>
>> I'm doing some core.async tests and want to create a basic pub/sub model. 
>> Messages are >! on one channel and >! to many others.
>>
>> (deftest ^:wip async-test2
>>  
>>   (let [subscribers (atom [])
>>         events (chan 100)]
>>  
>>     (go (loop []
>>           (when-let [ev (<! events)]
>>             (doseq [c @subscribers]
>>               (alt!
>>                [[c ev]] :sent
>>                :default nil ;; could "force" unsubscribe c?
>>                ))
>>             (recur))))
>>  
>>     (let [s1 (chan 1)
>>           s2 (chan 100)
>>                     r1 (atom [])
>>           r2 (atom [])]
>>             (swap! subscribers conj s1 s2)
>>             ;; simulated slow reader
>>       (go (loop []
>>             (when-let [ev (<! s1)]
>>               (swap! r1 conj ev)
>>               (<! (timeout 10))
>>               (recur)))) 
>>             ;; good reader
>>       (go (loop []
>>             (when-let [ev (<! s2)]
>>               (swap! r2 conj ev)
>>               (recur))))
>>             (<!! (go (loop [i 0]
>>                  (when (< i 100)
>>                    (>! events i)
>>                    (recur (inc i))))))
>>  
>>       (close! events)
>>             (Thread/sleep 25)
>>             (pprint @r1)
>>       (pprint @r2))
>>     ))
>>
>>
>> In this example the s1 subscriber will loose almost all messages since he 
>> cannot keep up (and buffer is too small). I choose to alt!/:default to drop 
>> messages, since I don't want any subscriber to block others. How do you 
>> guys deal with slow-readers?
>>
>> I don't really have a specific problem but I wonder if there are any 
>> plans for some built-in pub/sub mechanisms for core.async. Seems like a 
>> very common pattern.
>>
>> Anyways, core.async is nice!
>>
>> Cheers,
>> /thomas
>>
>>
>>

-- 
-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.


Reply via email to