not sure when then added this but it looks like 
pub<http://clojure.github.io/core.async/#clojure.core.async/pub>
 and sub <http://clojure.github.io/core.async/#clojure.core.async/sub> are 
in the async lib now. 

any differences between what they have now and what you need?

looking for some example code, but could not find any.... so i tried 
this.... and it seems to work:

(def t-fn
(fn 
[ele]
"all"))
(def pub-chan (chan))
(def a-pub (pub pub-chan t-fn))
(def sub-one (chan))
(def sub-two (chan))
(defn p-s-test
[]
(sub a-pub "all" sub-one)
(sub a-pub "all" sub-two)
(go (while true
(let [v (<! sub-one)]
(println "sub one read: " v))))
(go (while true
(let [v (<! sub-two)]
(println "sub two read: " v))))
(go (>! pub-chan "shine")
(<! (timeout 2000))
(>! pub-chan "on")
(<! (timeout 2000))
(>! pub-chan "you crazy")
(<! (timeout 2000))
(>! pub-chan "diamond")))

????
-Scott

On Thursday, July 11, 2013 2:15:28 AM UTC-7, Thomas Heller wrote:
>
> Hey,
>
> the lab stuff looks very interesting, I however couldn't quite figure out 
> how to "unsubscribe" one channel from the broadcast since I cannot exchange 
> the "topic" for every subscriber when one subscriber decides to leave. Its 
> also a lot lower level than I'm currently comfortable with since I haven't 
> checked out any of the core.async internals yet.
>
> However I wrote my own little pubsub utilities which (almost) only use the 
> public API.
>
> (def my-topic (pubsub/topic 100))
>  
> (pubsub/subscribe-go
>  [subscription my-topic (sliding-buffer 100)]
>  (loop []
>    (when-let [ev (<! subscription)]
>      (prn [:sub-got ev])
>      (recur))))
>  
> ;; without go
> (let [sub (pubsub/subscribe my-topic (sliding-buffer 100))]
>   (prn [:msg (<!! sub)])
>   (close! sub))
>
>
> Complete code at: https://gist.github.com/thheller/5973825
>
> subscribe-go is a convenience macro which allows to subscribe to multiple 
> topics and is a normal go block, so same rules apply and you may take! from 
> any other channel as well. When the block ends the subscription is 
> automatically removed, otherwise a subscription is removed by closing it.
>
> Could be optimized but it seems to work fine for me.
>
> Will follow the lab.clj when I'm ready to get dirty with the internals. ;) 
>
> Cheers,
> /thomas
>
> On Wednesday, July 10, 2013 12:27:59 PM UTC+2, Alex Miller wrote:
>>
>> There is a broadcast fn in the lab namespace (
>> https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/lab.clj)
>>  
>> that does this and I believe David Nolen has created a different variant in 
>> some of his stuff. The lab one is experimental and would welcome feedback 
>> on it.
>>
>> Alex
>>
>> On Tuesday, July 9, 2013 6:46:21 PM UTC-5, Thomas Heller wrote:
>>>
>>> Hey,
>>>
>>> I'm doing some core.async tests and want to create a basic pub/sub 
>>> model. Messages are >! on one channel and >! to many others.
>>>
>>> (deftest ^:wip async-test2
>>>  
>>>   (let [subscribers (atom [])
>>>         events (chan 100)]
>>>  
>>>     (go (loop []
>>>           (when-let [ev (<! events)]
>>>             (doseq [c @subscribers]
>>>               (alt!
>>>                [[c ev]] :sent
>>>                :default nil ;; could "force" unsubscribe c?
>>>                ))
>>>             (recur))))
>>>  
>>>     (let [s1 (chan 1)
>>>           s2 (chan 100)
>>>                     r1 (atom [])
>>>           r2 (atom [])]
>>>             (swap! subscribers conj s1 s2)
>>>             ;; simulated slow reader
>>>       (go (loop []
>>>             (when-let [ev (<! s1)]
>>>               (swap! r1 conj ev)
>>>               (<! (timeout 10))
>>>               (recur)))) 
>>>             ;; good reader
>>>       (go (loop []
>>>             (when-let [ev (<! s2)]
>>>               (swap! r2 conj ev)
>>>               (recur))))
>>>             (<!! (go (loop [i 0]
>>>                  (when (< i 100)
>>>                    (>! events i)
>>>                    (recur (inc i))))))
>>>  
>>>       (close! events)
>>>             (Thread/sleep 25)
>>>             (pprint @r1)
>>>       (pprint @r2))
>>>     ))
>>>
>>>
>>> In this example the s1 subscriber will loose almost all messages since 
>>> he cannot keep up (and buffer is too small). I choose to alt!/:default to 
>>> drop messages, since I don't want any subscriber to block others. How do 
>>> you guys deal with slow-readers?
>>>
>>> I don't really have a specific problem but I wonder if there are any 
>>> plans for some built-in pub/sub mechanisms for core.async. Seems like a 
>>> very common pattern.
>>>
>>> Anyways, core.async is nice!
>>>
>>> Cheers,
>>> /thomas
>>>
>>>
>>>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to