If the upstream channel is permanent, then it won't close if all downstream
channels are closed. You can create a permanent channel using
(permanent-channel) or (channel* :permanent? true). Once you have that,
you can replace all that code with a simple (siphon perm-ch conn-ch)
As an aside, though, you could also accomplish the above without delving
into functions that are really implementation details.
(if-let [evs (seq (channel->seq ch))]
evs
(run-pipeline (read-channel* :on-timeout ::timeout, timeout
POLL-TIMEOUT)
(fn [msg]
(when-not (= ::timeout msg)
(conj (channel->seq ch) msg)))))
This is non-blocking, but will require some outer handler to deal with the
async-result.
Also, in the future you'll probably get a more reliable reply if you use
the Aleph mailing list.
Zach
On Thursday, October 25, 2012 4:09:40 AM UTC-7, Marko Topolnik wrote:
>
> I use lamina channels in a library that maintains multiple event streams.
> The event source is the Asterisk Management Interface, whose raw events are
> processed, filtered, collated, and finally pushed into appropriate lamina
> channels.
>
> On the client side of my library I want to expose the event streams as a
> web service. My challenge is to implement long polling: a request either
> finds events already enqueued, returning immediately; or blocks until an
> event occurs (with a timeout), then accumulates any further events for a
> short period, then returns the accumulated events. This reduces network
> overhead for the typical case of events occuring in short bursts (a single
> user action triggers several events).
>
> I have implemented this behavior in a piece of code that occupies its
> thread for the entire duration of the request:
>
> (require
> [lamina.core :as m]
> [lamina.core.graph.node :as node]
> [lamina.core.channel :as chan])
>
> (let [evs (node/drain (chan/emitter-node ch))
> evs (if (seq evs)
> evs
> (try (let [ev @(m/with-timeout POLL-TIMEOUT (m/read-channel
> ch))]
> (Thread/sleep EVENT-BURST-PERIOD)
> (conj (node/drain (chan/emitter-node ch)) ev))
> (catch TimeoutException _ nil)))]
> (vec evs))
>
> I would instead like to do this without blocking the thread. Ideally, I'll
> use aleph to implement the web service and connect a downstream HTTP
> response channel to my library's lamina channel. What I'm missing is, how
> do I disconnect the downstream channel without breaking anything in the
> upstream channel? I want to cleanly disconnect it, let my channel enqueue
> any further events, then later connect another aleph channel, which will
> drain those events with no loss.
>
>
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en