Not sure when they added this, but it looks like pub <http://clojure.github.io/core.async/#clojure.core.async/pub> and sub <http://clojure.github.io/core.async/#clojure.core.async/sub> are in the async lib now.
Any differences between what they have now and what you need?

I was looking for some example code but could not find any, so I tried this, and it seems to work:

    ;; assumes the core.async names used below are referred into the namespace
    (require '[clojure.core.async :refer [chan pub sub go <! >! timeout]])

    ;; topic fn: every message is published under the single topic "all"
    (def t-fn (fn [ele] "all"))

    (def pub-chan (chan))
    (def a-pub (pub pub-chan t-fn))

    (def sub-one (chan))
    (def sub-two (chan))

    (defn p-s-test []
      ;; both channels subscribe to the same topic
      (sub a-pub "all" sub-one)
      (sub a-pub "all" sub-two)
      ;; one reader loop per subscriber
      (go (while true
            (let [v (<! sub-one)]
              (println "sub one read: " v))))
      (go (while true
            (let [v (<! sub-two)]
              (println "sub two read: " v))))
      ;; publish four messages, two seconds apart
      (go (>! pub-chan "shine")
          (<! (timeout 2000))
          (>! pub-chan "on")
          (<! (timeout 2000))
          (>! pub-chan "you crazy")
          (<! (timeout 2000))
          (>! pub-chan "diamond")))

????

-Scott

On Thursday, July 11, 2013 2:15:28 AM UTC-7, Thomas Heller wrote:
>
> Hey,
>
> the lab stuff looks very interesting. However, I couldn't quite figure out
> how to "unsubscribe" one channel from the broadcast, since I cannot exchange
> the "topic" for every subscriber when one subscriber decides to leave. It's
> also a lot lower level than I'm currently comfortable with, since I haven't
> checked out any of the core.async internals yet.
>
> However, I wrote my own little pubsub utilities which (almost) only use the
> public API.
>
>     (def my-topic (pubsub/topic 100))
>
>     (pubsub/subscribe-go
>       [subscription my-topic (sliding-buffer 100)]
>       (loop []
>         (when-let [ev (<! subscription)]
>           (prn [:sub-got ev])
>           (recur))))
>
>     ;; without go
>     (let [sub (pubsub/subscribe my-topic (sliding-buffer 100))]
>       (prn [:msg (<!! sub)])
>       (close! sub))
>
> Complete code at: https://gist.github.com/thheller/5973825
>
> subscribe-go is a convenience macro which lets you subscribe to multiple
> topics and is a normal go block, so the same rules apply and you may take! from
> any other channel as well. When the block ends the subscription is
> automatically removed; otherwise a subscription is removed by closing it.
>
> Could be optimized but it seems to work fine for me.
>
> Will follow the lab.clj when I'm ready to get dirty with the internals. ;)
>
> Cheers,
> /thomas
>
> On Wednesday, July 10, 2013 12:27:59 PM UTC+2, Alex Miller wrote:
>>
>> There is a broadcast fn in the lab namespace
>> (https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/lab.clj)
>> that does this, and I believe David Nolen has created a different variant in
>> some of his stuff. The lab one is experimental, and feedback on it would be
>> welcome.
>>
>> Alex
>>
>> On Tuesday, July 9, 2013 6:46:21 PM UTC-5, Thomas Heller wrote:
>>>
>>> Hey,
>>>
>>> I'm doing some core.async tests and want to create a basic pub/sub
>>> model. Messages are >! on one channel and >! to many others.
>>>
>>>     (deftest ^:wip async-test2
>>>       (let [subscribers (atom [])
>>>             events (chan 100)]
>>>
>>>         ;; dispatcher: offer each event to every subscriber without blocking
>>>         (go (loop []
>>>               (when-let [ev (<! events)]
>>>                 (doseq [c @subscribers]
>>>                   (alt!
>>>                     [[c ev]] :sent
>>>                     ;; could "force" unsubscribe c?
>>>                     :default nil))
>>>                 (recur))))
>>>
>>>         (let [s1 (chan 1)
>>>               s2 (chan 100)
>>>               r1 (atom [])
>>>               r2 (atom [])]
>>>           (swap! subscribers conj s1 s2)
>>>           ;; simulated slow reader
>>>           (go (loop []
>>>                 (when-let [ev (<! s1)]
>>>                   (swap! r1 conj ev)
>>>                   (<! (timeout 10))
>>>                   (recur))))
>>>           ;; good reader
>>>           (go (loop []
>>>                 (when-let [ev (<! s2)]
>>>                   (swap! r2 conj ev)
>>>                   (recur))))
>>>           ;; publish 100 events and wait until all are put
>>>           (<!! (go (loop [i 0]
>>>                      (when (< i 100)
>>>                        (>! events i)
>>>                        (recur (inc i))))))
>>>
>>>           (close! events)
>>>           (Thread/sleep 25)
>>>           (pprint @r1)
>>>           (pprint @r2))))
>>>
>>> In this example the s1 subscriber will lose almost all messages since
>>> it cannot keep up (and its buffer is too small). I chose alt! with :default to
>>> drop messages, since I don't want any subscriber to block others. How do
>>> you guys deal with slow readers?
>>>
>>> I don't really have a specific problem, but I wonder if there are any
>>> plans for some built-in pub/sub mechanisms for core.async. Seems like a
>>> very common pattern.
>>>
>>> Anyways, core.async is nice!
>>>
>>> Cheers,
>>> /thomas
>>>

--
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en