Hey, the lab stuff looks very interesting, but I couldn't quite figure out how to "unsubscribe" one channel from the broadcast, since I cannot exchange the "topic" for every subscriber when one subscriber decides to leave. It's also a lot lower level than I'm currently comfortable with, since I haven't checked out any of the core.async internals yet.
However, I wrote my own little pubsub utilities, which (almost) only use the public API.

    (def my-topic (pubsub/topic 100))

    (pubsub/subscribe-go [subscription my-topic (sliding-buffer 100)]
      (loop []
        (when-let [ev (<! subscription)]
          (prn [:sub-got ev])
          (recur))))

    ;; without go
    (let [sub (pubsub/subscribe my-topic (sliding-buffer 100))]
      (prn [:msg (<!! sub)])
      (close! sub))

Complete code at: https://gist.github.com/thheller/5973825

subscribe-go is a convenience macro that lets you subscribe to multiple topics. Its body is a normal go block, so the same rules apply and you may take! from any other channel as well. When the block ends, the subscription is automatically removed; otherwise, a subscription is removed by closing it.

It could be optimized, but it seems to work fine for me. I will follow lab.clj when I'm ready to get dirty with the internals. ;)

Cheers,
/thomas

On Wednesday, July 10, 2013 12:27:59 PM UTC+2, Alex Miller wrote:
>
> There is a broadcast fn in the lab namespace
> (https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/lab.clj)
> that does this, and I believe David Nolen has created a different variant in
> some of his stuff. The lab one is experimental, and feedback on it would be
> welcome.
>
> Alex
>
> On Tuesday, July 9, 2013 6:46:21 PM UTC-5, Thomas Heller wrote:
>>
>> Hey,
>>
>> I'm doing some core.async tests and want to create a basic pub/sub model.
>> Messages are put (>!) on one channel and forwarded (>!) to many others.
>>
>> (deftest ^:wip async-test2
>>
>>   (let [subscribers (atom [])
>>         events (chan 100)]
>>
>>     (go (loop []
>>           (when-let [ev (<! events)]
>>             (doseq [c @subscribers]
>>               (alt!
>>                 [[c ev]] :sent
>>                 :default nil ;; could "force" unsubscribe c?
>>                 ))
>>             (recur))))
>>
>>     (let [s1 (chan 1)
>>           s2 (chan 100)
>>           r1 (atom [])
>>           r2 (atom [])]
>>       (swap! subscribers conj s1 s2)
>>       ;; simulated slow reader
>>       (go (loop []
>>             (when-let [ev (<! s1)]
>>               (swap! r1 conj ev)
>>               (<! (timeout 10))
>>               (recur))))
>>       ;; good reader
>>       (go (loop []
>>             (when-let [ev (<! s2)]
>>               (swap! r2 conj ev)
>>               (recur))))
>>       (<!! (go (loop [i 0]
>>                  (when (< i 100)
>>                    (>! events i)
>>                    (recur (inc i))))))
>>
>>       (close! events)
>>       (Thread/sleep 25)
>>       (pprint @r1)
>>       (pprint @r2))
>>     ))
>>
>>
>> In this example the s1 subscriber will lose almost all messages, since it
>> cannot keep up (and its buffer is too small). I chose alt! with :default to
>> drop messages, since I don't want any subscriber to block others. How do you
>> guys deal with slow readers?
>>
>> I don't really have a specific problem, but I wonder if there are any
>> plans for some built-in pub/sub mechanisms for core.async. Seems like a
>> very common pattern.
>>
>> Anyways, core.async is nice!
>>
>> Cheers,
>> /thomas
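
For readers following the slow-reader question in the original message, here is a minimal sketch of the same idea (an atom of subscriber channels plus a non-blocking alt! with :default), extended with an explicit unsubscribe. The names topic, subscribe, unsubscribe and demo are hypothetical helpers invented for this illustration; they are not part of core.async and are not the code from the gist linked in the reply.

```clojure
(ns pubsub-sketch
  (:require [clojure.core.async :as a
             :refer [chan go-loop <! <!! >!! alt! close! sliding-buffer]]))

;; Hypothetical helper: a "topic" is an input channel plus a set of
;; subscriber channels held in an atom.
(defn topic
  [buf-size]
  (let [in          (chan buf-size)
        subscribers (atom #{})]
    (go-loop []
      (if-some [ev (<! in)]
        (do
          (doseq [c @subscribers]
            ;; Non-blocking put: if c cannot accept ev right now,
            ;; the :default branch fires and ev is dropped for c only.
            (alt!
              [[c ev]] :sent
              :default :dropped))
          (recur))
        ;; Input channel closed: close every remaining subscriber.
        (doseq [c @subscribers]
          (close! c))))
    {:in in :subscribers subscribers}))

;; Hypothetical helper: buf may be a number or a buffer such as
;; (sliding-buffer n), exactly as accepted by chan.
(defn subscribe
  [{:keys [subscribers]} buf]
  (let [c (chan buf)]
    (swap! subscribers conj c)
    c))

;; Hypothetical helper: removes c from the topic and closes it.
;; A put that races with this lands on a closed channel and is discarded.
(defn unsubscribe
  [{:keys [subscribers]} c]
  (swap! subscribers disj c)
  (close! c))

;; Publishes 0..99 while nobody is reading, then collects what each
;; subscriber ended up holding in its buffer.
(defn demo
  []
  (let [t       (topic 100)
        fast    (subscribe t 100)
        slow    (subscribe t 1)
        sliding (subscribe t (sliding-buffer 3))]
    (dotimes [i 100]
      (>!! (:in t) i))
    (close! (:in t))
    {:fast    (<!! (a/into [] fast))
     :slow    (<!! (a/into [] slow))
     :sliding (<!! (a/into [] sliding))}))
```

In this sketch the choice of buffer decides what a slow reader loses. With a fixed buffer, a full subscriber misses the newest events, because the put cannot complete immediately and :default is taken. With a sliding-buffer the put always completes, and the subscriber keeps only the most recent events. Either way, no subscriber can block the others.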