Could you use a http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html (LBQ)? Its .take blocks until an item is available, which gives you the blocking element.
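As a rough sketch of the blocking part on its own: a lazy seq can be built on top of the queue so that each step of the seq blocks in .take. The names `queue-seq` and `done` are made up for this example (they are not from any library), and the sentinel is just one assumed way of ending the sequence. Note that a LinkedBlockingQueue cannot hold nil.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical sentinel marking the end of the stream.
(def done ::done)

;; Hypothetical helper: a lazy seq over the queue. Realizing the next
;; element blocks in .take until a producer has put something there;
;; the seq ends when the sentinel is taken.
(defn queue-seq [^LinkedBlockingQueue q]
  (lazy-seq
    (let [x (.take q)]
      (when-not (= x done)
        (cons x (queue-seq q))))))

(def q (LinkedBlockingQueue.))

;; A slow producer on its own (daemon) thread.
(doto (Thread. (fn []
                 (doseq [x [1 2 3]]
                   (Thread/sleep 100)
                   (.put q x))
                 (.put q done)))
  (.setDaemon true)
  (.start))

;; The consumer just walks the seq; doall blocks until the sentinel arrives.
(def results (doall (queue-seq q)))
(println results) ; prints (1 2 3)
```

The consumer never sees the queue, only an ordinary seq, which is what the question asked for.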
To execute N jobs at a time (10 in your example) use a http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html. The 'glue' would be an infinite loop which .takes each job from the incoming sequence (which could also be an LBQ) and hands it to the thread pool. That gets the work happening in parallel.

To consume the results as a sequence, have a(nother) LBQ which the consumers read from (using the blocking .take), and have the glue code wrap the function it received from the incoming LBQ in a function which calls it and puts the result on the outgoing LBQ.

This looks like (Clojure forgiveness is required):

[code]
(import '(java.util.concurrent Executors LinkedBlockingQueue))

(def incoming-queue (LinkedBlockingQueue.))  ; holds zero-argument functions (the jobs)
(def outgoing-queue (LinkedBlockingQueue.))  ; holds their results
(def workers (Executors/newFixedThreadPool 10))  ; any pool would do; 10 matches your example

;; Clojure functions implement Runnable, so a plain fn can be handed to the pool.
(defn execute [job result-queue]
  (.execute workers (fn [] (.put result-queue (job)))))

(def stop-loop (atom false))

;; The glue: blocks on .take, so run it on a thread of its own.
(while (not @stop-loop)
  (let [job (.take incoming-queue)]
    (execute job outgoing-queue)))
[/code]

A few caveats/notes:
- this uses a lot of Java constructs - that is fine. It is perfectly idiomatic to use the right Clojure or Java constructs. LBQs rock.
- 'execute' has to give the pool a Runnable. Clojure functions already implement Runnable, so wrapping the job in a fn is enough and no reify is needed. Treat the above as an untested sketch. The while loop also blocks whichever thread runs it, and stop-loop is only checked after the next .take returns.
- it ties up a worker thread until the result can be put onto the outgoing LBQ. If the outgoing LBQ is bounded and you don't have enough consumers then eventually all the worker threads will sit blocked in .put, effectively idle, until the results are consumed.
- if you didn't want to use a ThreadPool then you could update 'execute' to maintain an (atom) count of currently executing jobs. The glue code is single threaded, so there is no race between two jobs being started at the same moment. The single threaded 'cost' is fine as the glue is doing nothing other than moving things around.

I am a (Clojure) newbie so be warned!
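For what it is worth, the steps above can be sketched end to end as one function that returns a blocking lazy sequence. This is only a sketch under some assumptions: the name `parallel-unordered` is made up, the delay in `process` is shortened from the one in your example, the input is finite and gets realized up front, and `f` never throws or returns nil (a job that throws would leave the consumer waiting forever, and an LBQ rejects nil).

```clojure
(import '(java.util.concurrent Executors ExecutorService LinkedBlockingQueue))

;; The function from the question, with a shorter delay for the demo.
(defn process [x]
  (Thread/sleep (long (* 100 (rand))))
  (* 2 x))

;; Hypothetical helper: applies f to every item of coll on a pool of n
;; threads and returns a lazy seq of the results in completion order.
(defn parallel-unordered [f coll n]
  (let [items (vec coll)                     ; assumption: finite input
        out   (LinkedBlockingQueue.)
        ^ExecutorService pool (Executors/newFixedThreadPool n)]
    ;; Each job runs f and puts the result on the outgoing queue.
    (doseq [x items]
      (.execute pool (fn [] (.put out (f x)))))
    ;; No more jobs will be submitted; the threads exit once the work is done.
    (.shutdown pool)
    ;; One blocking .take per submitted job, performed lazily by the consumer.
    (repeatedly (count items) #(.take out))))

;; Up to 10 items are processed at once; results arrive in arbitrary order.
(def doubled (parallel-unordered process (range 1 21) 10))
(println (sort doubled))
```

Here the pool's own internal queue plays the role of the incoming LBQ, so no separate glue loop is needed. The outgoing queue is unbounded, so worker threads never block on .put.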
I fully look forward to somebody providing a much nicer and more idiomatic Clojure implementation :).

Hope this helps.

Col

On Thursday, 30 May 2013 06:19:29 UTC+1, Artem Boytsov wrote:
>
> Hello, folks!
>
> I'm a relative noob in Clojure especially when it comes to concurrency, so
> please forgive my ignorance. I have a processing stage (producer) that
> feeds to another one (consumer). The producer has a bunch of items to
> process and it's I/O blocking which takes random time, but the order of the
> items is insignificant, so ideally they would materialize on the consumer
> side on the first come first serve basis.
>
> I would like to create a blocking lazy sequence I could just give to the
> consumer. I know how to create a lazy sequence (lazy-seq), or how to make
> it run in background and block on results (seque), but what I can't wrap my
> head around is how to parallelize the processing the Clojure way. I was
> considering kicking off multiple agents, but how can I wait for *any one* of
> them to finish, not all of them (as await does)? I'm not sure but I think
> the same goes for futures/promises. I could have multiple agents putting
> the results into some shared sequence, but then how do I block on the
> sequence itself?
>
> What I'm trying to do can be described in the following way in a silly
> imperative pseudo-code:
>
> workers = new Worker[10]                     ; initially w.got_data == nil
> for each x in source_data:
>     w = wait_for_any_worker_ready(workers)   ; initially all of them are ready
>     if (w.got_data)
>         output.enqueue(w.data)               ; the consumer will read output in a blocking way
>     w.process(x)                             ; non-blocking, kicks off in the background
>
> Or, another way to describe it, given a seq of integers:
>
> [ 1, 2, 3, 4 ... ]
>
> and a simple function with a variable delay:
>
> (defn process [x]
>   (Thread/sleep (* 10000 (rand)))
>   (* 2 x))
>
> How can I write a function which would return a blocking lazy sequence of
> processed integers, in arbitrary order, parallelizing the processing in up
> to 10 threads?
>
> Thank you!
>
> Artem.
>

--
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en