Could you use a LinkedBlockingQueue?
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html

Its .take blocks until an element is available, which gives you the blocking behaviour.
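
As a rough illustration of that blocking behaviour, here is a minimal sketch. The names q, consumer, blocked? and received are made up for the example, and the 100 ms pause is just an assumed "long enough" wait for the consumer thread to reach .take.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(def q (LinkedBlockingQueue.))

;; The consumer thread blocks inside .take until something is available.
(def consumer (future (.take q)))

(Thread/sleep 100)                          ; assumed long enough for the consumer to block
(def blocked? (not (realized? consumer)))   ; nothing has been put yet

(.put q :hello)                             ; wakes the consumer
(def received (deref consumer 1000 :timed-out))
```

Note that a LinkedBlockingQueue rejects nil elements, so jobs whose results may be nil would need a wrapper value.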

To run N jobs at a time (10 in your example), use a ThreadPoolExecutor:
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html
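
One convenient way to get such a pool is Executors/newFixedThreadPool. The sketch below is only meant to show that a fixed pool caps how many jobs run at once; the pool size of 3, the 12 jobs, and the names running, max-seen and job are all assumptions made for the example.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

(def running  (atom 0))   ; jobs executing right now
(def max-seen (atom 0))   ; highest value of running observed by any job

(defn job []
  (let [n (swap! running inc)]
    (swap! max-seen max n)
    (Thread/sleep 50)     ; stand-in for real work
    (swap! running dec)))

(def ^ExecutorService pool (Executors/newFixedThreadPool 3))

;; Clojure fns implement Runnable, so they can be handed to .execute directly.
(dotimes [_ 12] (.execute pool job))

(.shutdown pool)                               ; already queued jobs still run
(.awaitTermination pool 10 TimeUnit/SECONDS)   ; wait for them to finish
```

After this runs, @max-seen should never exceed the pool size, however many jobs were queued.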
 
 The 'glue' would be an infinite loop which .takes each job from the incoming 
queue (which could also be a LinkedBlockingQueue, LBQ from here on) and 
submits it to the thread pool.

That gets stuff happening in parallel.  

To consume the results in sequence, have another LBQ which the consumers 
read from (using the blocking .take), and have the glue code wrap each 
function it receives from the incoming LBQ in a function which calls it and 
puts its result on the outgoing LBQ.
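
Putting the pieces described so far together, the consuming end could be exposed as a lazy seq. This is only a sketch under some assumptions: unordered-pmap is a made-up name, the input is realised up front and every job is queued on the pool straight away (rather than fed through a separate glue loop), and f must neither throw nor return nil (an LBQ rejects nil, and a job that throws would leave the consumer waiting forever). The process function mirrors the one in the question with shorter delays.

```clojure
(import '(java.util.concurrent Executors ExecutorService LinkedBlockingQueue))

(defn unordered-pmap
  "Hypothetical helper: applies f to every item of coll on n threads and
  returns a lazy seq of the results in completion order. Consuming the
  seq blocks until the next result is ready."
  [n f coll]
  (let [items   (vec coll)                     ; realise the input up front
        results (LinkedBlockingQueue.)
        pool    (Executors/newFixedThreadPool n)]
    (doseq [x items]
      ;; wrap the job so that its result lands on the results queue
      (.execute ^ExecutorService pool (fn [] (.put results (f x)))))
    (.shutdown pool)                           ; queued jobs still run
    ;; exactly one .take per submitted job, performed lazily
    (repeatedly (count items) #(.take results))))

(defn process [x]
  (Thread/sleep (long (* 100 (rand))))         ; variable delay
  (* 2 x))

(def out (unordered-pmap 10 process (range 1 21)))
```

Walking out (for example with doseq) blocks on each element until some worker has finished, and the order depends on which jobs finish first.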

This looks like (clojure forgiveness is required):

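[code]
(import '(java.util.concurrent Executors ExecutorService LinkedBlockingQueue))

(def incoming-queue (LinkedBlockingQueue.))
(def outgoing-queue (LinkedBlockingQueue.))
; any thread pool/executor will do; a fixed pool of 10 matches your example
(def ^ExecutorService workers (Executors/newFixedThreadPool 10))

; Clojure fns already implement Runnable, so no reify is needed: wrap the
; job in a fn which runs it and puts the result on result-queue.
(defn execute [job result-queue]
  (.execute workers (fn [] (.put result-queue (job)))))

(def stop-loop (atom false))
; the glue loop: .take blocks, so stop-loop is only checked between jobs
(while (not @stop-loop)
  (let [job (.take incoming-queue)]
    (execute job outgoing-queue)))
[/code]

A few caveats/notes:
 - this uses a lot of Java constructs - that is fine.  It is perfectly 
idiomatic to use the right Clojure or Java constructs.  LBQs rock.
 - 'execute' doesn't need to reify anything: every Clojure fn already 
implements Runnable (and Callable), so the wrapping fn can be handed 
straight to the executor.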
 - each job ties up a worker thread until its result has been put onto the 
outgoing LBQ.  If the outgoing LBQ is bounded and the consumers can't keep 
up then eventually every worker thread will be blocked in .put, doing 
nothing, until results are consumed.  
 - if you didn't want to use a thread pool then you could update 'execute' 
to maintain a count (in an atom) of currently executing jobs, and have the 
glue code wait while that count is at N.  The glue code is single threaded 
so there is no race between checking the count and starting a job.  The 
single threaded 'cost' is fine as the glue does nothing other than move 
things around.
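
The last note could be sketched roughly as below. This is an assumption-laden illustration rather than a recommendation: max-jobs, running and run-glue are names invented for the example, each job runs on its own thread via future, and the glue simply polls the counter with a 1 ms sleep (a java.util.concurrent.Semaphore would avoid the polling).

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(def max-jobs 10)          ; assumed limit, taken from the example in the question
(def running (atom 0))     ; number of jobs currently executing

(defn execute
  "Runs job on its own thread and puts its result on result-queue,
  decrementing the counter when the job finishes (or throws)."
  [job ^LinkedBlockingQueue result-queue]
  (swap! running inc)
  (future
    (try
      (.put result-queue (job))
      (finally (swap! running dec)))))

(defn run-glue
  "Single-threaded glue: starts every job, never more than max-jobs at once."
  [jobs result-queue]
  (doseq [job jobs]
    ;; only this thread increments the counter, so check-then-start is safe
    (while (>= @running max-jobs)
      (Thread/sleep 1))
    (execute job result-queue)))
```

Because the increment happens in the glue thread before the job is started, the counter can only be too high for a moment, never too low, so the limit holds.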

I am a (Clojure) newbie so be warned!  I fully look forward to somebody 
providing a much nicer and more idiomatic Clojure implementation :).

Hope this helps.

Col

On Thursday, 30 May 2013 06:19:29 UTC+1, Artem Boytsov wrote:
>
> Hello, folks!
>
> I'm a relative noob in Clojure especially when it comes to concurrency, so 
> please forgive my ignorance. I have a processing stage (producer) that 
> feeds another one (consumer). The producer has a bunch of items to 
> process, and each one blocks on I/O for a random amount of time, but the 
> order of the items is insignificant, so ideally they would materialize on 
> the consumer side on a first come, first served basis.
>
> I would like to create a blocking lazy sequence I could just give to the 
> consumer. I know how to create a lazy sequence (lazy-seq), or how to make 
> it run in background and block on results (seque), but what I can't wrap my 
> head around is how to parallelize the processing the Clojure way. I was 
> considering kicking off multiple agents, but how can I wait for *any one* of 
> them to finish, not all of them (as await does)? I'm not sure but I think 
> the same goes for futures/promises. I could have multiple agents putting 
> the results into some shared sequence, but then how do I block on the 
> sequence itself?
>
> What I'm trying to do can be described in the following way in a silly 
> imperative pseudo-code:
>
> workers = new Worker[10]                       ; initially w.got_data == nil
> for each x in source_data:
>    w = wait_for_any_worker_ready(workers)      ; initially all of them are ready
>    if (w.got_data)
>      output.enqueue(w.data)                    ; the consumer will read output in a blocking way
>    w.process(x)                                ; non-blocking, kicks off in the background
>
> Or, another way to describe it, given a seq of integers:
>
> [ 1, 2, 3, 4 ... ]
>
> and a simple function with a variable delay:
>
> (defn process [x]
>    (Thread/sleep (* 10000 (rand)))
>    (* 2 x))
>
> How can I write a function which would return a blocking lazy sequence of 
> processed integers, in arbitrary order, parallelizing the processing in up 
> to 10 threads?
>
> Thank you!
>
> Artem.
>

-- 
-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en