If you are looking for a more idiomatic solution, 
https://github.com/jpalmucci/clj-yield wraps a lazy sequence around a blocking 
queue. 
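
To make that idea concrete, here is a minimal sketch in plain Clojure of a lazy sequence wrapped around a blocking queue. It does not use clj-yield's API; the names `queue-seq` and `done` are made up for this example, and the end-of-stream sentinel is just one possible convention.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Hypothetical sentinel marking the end of the stream. A fresh Object
;; cannot be confused with a real item.
(def done (Object.))

(defn queue-seq
  "Lazy sequence over a blocking queue. Realizing an element blocks in
  .take until a producer puts one; the sequence ends at `done`."
  [^LinkedBlockingQueue q]
  (lazy-seq
    (let [item (.take q)]
      (when-not (identical? item done)
        (cons item (queue-seq q))))))

;; Usage: a producer thread fills the queue while the consumer reads.
(def q (LinkedBlockingQueue.))
(future
  (doseq [x [1 2 3]]
    (.put q x))
  (.put q done))
(def result (doall (queue-seq q)))
```

The consumer only ever sees an ordinary seq, so it needs no knowledge of the queue or of the threads feeding it.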

On May 30, 2013, at 11:58 AM, Artem Boytsov <aboyt...@gmail.com> wrote:

> Hello, Colin,
> 
> I suspected I should turn to existing Java concurrency constructs. Thank you 
> very much for your response, and this is what I'm going to do. I was just 
> hoping there's some Clojure idiomatic way to solve this, using agents, 
> futures, promises, refs, and other Clojure stuff. For example, if there were 
> a function taking a list of agents and returning any one of them that is 
> ready (vs. all of them), I would be able to implement my example relatively 
> simply. 
> Just wanted to make sure I'm not missing anything.
> 
> Artem.
> 
> On Thursday, May 30, 2013 2:12:02 AM UTC-7, Colin Yates wrote:
> Can you not use 
> http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html?
>   That will provide the blocking element.  
> 
> To execute N (i.e. 10 in your example) use a 
> http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html.
>   The 'glue' would be an infinite loop which .takes from the incoming 
> sequence (which could also be a LBQ) and then puts it on the thread pool.
> 
> That gets stuff happening in parallel.  
> 
> To consume the results as a sequence, have a(nother) LBQ which the consumers 
> read from (using the blocking .take), and have the glue code wrap each 
> function it receives from the incoming LBQ in a function which calls it and 
> puts the result on the outgoing LBQ.  
> 
> This looks like (clojure forgiveness is required):
> 
> [code]
> (def incoming-queue (java....LinkedBlockingQueue.))
> (def outgoing-queue (java....LinkedBlockingQueue.))
> (def workers (java... some thread pool/executor.))
> 
> ; Clojure functions implement Runnable, so execute returns a zero-argument 
> ; function that can be handed to the executor directly.
> (defn execute [job result-queue]
>   (fn [] (.put result-queue (job))))
> 
> (def stop-loop (atom false))
> (while (not @stop-loop)
>   (let [job (.take incoming-queue)]
>     (.execute workers (execute job outgoing-queue))))
> [/code]
> 
> A few caveats/notes:
>  - this uses a lot of Java constructs - that is fine.  It is perfectly 
> idiomatic to use the right Clojure or Java constructs.  LBQs rock.
>  - the above won't compile as written because the class names are 
> abbreviated.  'execute' returns a plain function, which works because 
> Clojure functions already implement Runnable.
>  - it ties up a worker thread until the result can be put onto the outgoing 
> LBQ.  If the outgoing LBQ is bounded and you don't have enough consumers then 
> eventually all the worker threads will be effectively idle until the results 
> can be consumed.  
>  - if you didn't want to use a ThreadPool then you could update 'executor' to 
> maintain an (atom) number of currently executing jobs.  The glue code is 
> single threaded so no chance of multiple jobs starting in parallel.  The 
> single threaded 'cost' is fine as it is doing nothing other than moving 
> things around.
> 
> I am a (Clojure) newbie so be warned!  I fully look forward to somebody 
> providing a much nicer and more idiomatic Clojure implementation :).
> 
> Hope this helps.
> 
> Col
> 
> On Thursday, 30 May 2013 06:19:29 UTC+1, Artem Boytsov wrote:
> Hello, folks!
> 
> I'm a relative noob in Clojure, especially when it comes to concurrency, so 
> please forgive my ignorance. I have a processing stage (producer) that feeds 
> another one (consumer). The producer has a bunch of items to process, and 
> each one blocks on I/O for a random amount of time. The order of the items is 
> insignificant, so ideally they would materialize on the consumer side on a 
> first-come, first-served basis.
> 
> I would like to create a blocking lazy sequence I could just give to the 
> consumer. I know how to create a lazy sequence (lazy-seq), or how to make it 
> run in background and block on results (seque), but what I can't wrap my head 
> around is how to parallelize the processing the Clojure way. I was considering 
> kicking off multiple agents, but how can I wait for any one of them to 
> finish, not all of them (as await does)? I'm not sure but I think the same 
> goes for futures/promises. I could have multiple agents putting the results 
> into some shared sequence, but then how do I block on the sequence itself?
> 
> What I'm trying to do can be described in the following way in a silly 
> imperative pseudo-code:
> 
> workers = new Worker[10]                  ; initially w.got_data == nil
> for each x in source_data:
>    w = wait_for_any_worker_ready(workers) ; initially all of them are ready
>    if (w.got_data)
>      output.enqueue(w.data)               ; consumer reads output, blocking
>    w.process(x)                           ; non-blocking, runs in background
> 
> Or, another way to describe it, given a seq of integers:
> 
> [ 1, 2, 3, 4 ... ]
> 
> and a simple function with a variable delay:
> 
> (defn process [x]
>    (Thread/sleep (long (* 10000 (rand))))
>    (* 2 x))
> 
> How can I write a function which would return a blocking lazy sequence of 
> processed integers, in arbitrary order, parallelizing the processing in up to 
> 10 threads?
> 
> Thank you!
> 
> Artem.
> 
> -- 
> -- 
> You received this message because you are subscribed to the Google
> Groups "Clojure" group.
> To post to this group, send email to clojure@googlegroups.com
> Note that posts from new members are moderated - please be patient with your 
> first post.
> To unsubscribe from this group, send email to
> clojure+unsubscr...@googlegroups.com
> For more options, visit this group at
> http://groups.google.com/group/clojure?hl=en
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Clojure" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to clojure+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/groups/opt_out.
>  
>  
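
For the question at the bottom of the thread (a blocking lazy sequence of results in arbitrary order, with up to 10 threads), one option is to lean on java.util.concurrent.ExecutorCompletionService, whose .take hands back whichever job finishes first. The sketch below assumes a finite input collection and submits every job up front; `unordered-pmap` is a made-up name, and `process` is the function from the question with a shorter sleep.

```clojure
(import '(java.util.concurrent Executors ExecutorCompletionService))

(defn unordered-pmap
  "Applies f to each item of coll on a pool of n threads and returns a
  lazy sequence of results in completion order. Reading an element
  blocks until some job has finished. Assumes coll is finite."
  [n f coll]
  (let [pool    (Executors/newFixedThreadPool n)
        service (ExecutorCompletionService. pool)
        items   (vec coll)]
    ;; Submit everything now; the pool runs at most n jobs at a time.
    (doseq [x items]
      (.submit service (fn [] (f x))))
    ;; Already-submitted jobs still run; the threads exit once they finish.
    (.shutdown pool)
    (letfn [(results [remaining]
              (lazy-seq
                (when (pos? remaining)
                  ;; .take blocks for the next finished job, whichever it is.
                  (cons (.get (.take service))
                        (results (dec remaining))))))]
      (results (count items)))))

;; The process function from the question, with a shorter sleep.
(defn process [x]
  (Thread/sleep (long (* 100 (rand))))
  (* 2 x))

;; The order varies from run to run; the set of values does not.
(def doubled (unordered-pmap 10 process (range 1 21)))
```

If `f` throws, the exception surfaces from .get as an ExecutionException when the consumer reaches that element, so real code would want to decide how to handle that.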
