Re: A blocking lazy sequence populated by multiple worker threads
If you are looking for a more idiomatic solution, https://github.com/jpalmucci/clj-yield wraps a lazy sequence around a blocking queue.

On May 30, 2013, at 11:58 AM, Artem Boytsov aboyt...@gmail.com wrote:
> I was just hoping there's some Clojure idiomatic way to solve this, using agents, futures, promises, refs, and other Clojure stuff.
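For readers who want to see what "a lazy sequence around a blocking queue" means in practice, here is a minimal hand-rolled sketch. It is not clj-yield's API; the names queue-seq, done and demo are made up for illustration, and it assumes a single end-of-stream marker is enough to tell the consumer to stop.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical end-of-stream marker. A LinkedBlockingQueue cannot hold nil,
;; so a unique object is used to signal that no more items will arrive.
(def done (Object.))

(defn queue-seq
  "Lazy sequence of the items taken from q. Realizing an element blocks until
  a producer has put something on the queue; the seq ends at the done marker."
  [^LinkedBlockingQueue q]
  (lazy-seq
    (let [item (.take q)]
      (when-not (identical? item done)
        (cons item (queue-seq q))))))

(defn demo
  "One producer thread fills the queue while the caller consumes it lazily."
  []
  (let [q        (LinkedBlockingQueue.)
        producer (Thread. (fn []
                            (doseq [x [1 2 3]]
                              (.put q x))
                            (.put q done)))]
    (.start producer)
    (vec (queue-seq q))))
```

With a single producer the queue is FIFO, so (demo) returns [1 2 3]. With several producers the order would simply be whatever order the items were put.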
Re: A blocking lazy sequence populated by multiple worker threads
Can you not use http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html? That will provide the blocking element. To execute N (i.e. 10 in your example) use a http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html.

The 'glue' would be an infinite loop which .takes from the incoming sequence (which could also be an LBQ) and then puts it on the thread pool. That gets stuff happening in parallel.

To consume the results of that stuff in a sequence, have a(nother) LBQ which the consumers consume (using the blocking .take), and have the glue code wrap the function it received from the LBQ in a function which takes the result of that function and puts it on the sequence.

This looks like (Clojure forgiveness is required):

[code]
(def incoming-queue (java.util.concurrent.LinkedBlockingQueue.))
(def outgoing-queue (java.util.concurrent.LinkedBlockingQueue.))
(def workers (java... some thread pool/executor.))

; the following would need to reify itself to be a Runnable, not got that far yet :)
(defn execute [job result-queue]
  (let [result (job)]
    (.put result-queue result)))

(def stop-loop (atom false))

(while (not @stop-loop)
  ; bind the job locally instead of re-def'ing a var (and shadowing clojure.core/next) on each pass
  (let [job (.take incoming-queue)]
    (execute job outgoing-queue)))
[/code]

A few caveats/notes:

- this uses a lot of Java constructs - that is fine. It is perfectly idiomatic to use the right Clojure or Java constructs. LBQs rock.
- the above won't compile and the 'execute' needs to return a Runnable - not sure how.
- it ties up a worker thread until the result can be put onto the outgoing LBQ. If the outgoing LBQ is bounded and you don't have enough consumers then eventually all the worker threads will be effectively idle until the results can be consumed.
- if you didn't want to use a ThreadPool then you could update 'executor' to maintain an (atom) number of currently executing jobs. The glue code is single threaded so no chance of multiple jobs starting in parallel. The single threaded 'cost' is fine as it is doing nothing other than moving things around.

I am a (Clojure) newbie so be warned! I fully look forward to somebody providing a much nicer and more idiomatic Clojure implementation :).

Hope this helps.

Col

On Thursday, 30 May 2013 06:19:29 UTC+1, Artem Boytsov wrote:
> How can I write a function which would return a blocking lazy sequence of processed integers, in arbitrary order, parallelizing the processing in up to 10 threads?

-- 
-- 
You received this message because you are subscribed to the Google Groups Clojure group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups Clojure group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
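The queue-plus-thread-pool approach described in this message can be turned into something runnable along the following lines. This is only a sketch under some assumptions: the name unordered-pmap is invented here, the input is submitted eagerly rather than pulled from an incoming queue, and process uses a made-up delay of up to 50 ms so that completion order actually varies.

```clojure
(import '(java.util.concurrent Executors LinkedBlockingQueue))

(defn process
  "Stand-in for the slow, I/O-bound step: sleeps up to 50 ms, then doubles x."
  [x]
  (Thread/sleep (long (rand-int 50)))
  (* 2 x))

(defn unordered-pmap
  "Applies f to every item of coll on a pool of n threads and returns a lazy
  sequence of the results in completion order. Realizing an element blocks
  until some worker has finished. f must not return nil (a
  LinkedBlockingQueue cannot hold nil) and must not throw, otherwise the
  consumer would wait forever for a result that never arrives."
  [n f coll]
  (let [items   (vec coll)
        results (LinkedBlockingQueue.)
        pool    (Executors/newFixedThreadPool n)
        take-n  (fn take-n [remaining]
                  (lazy-seq
                    (when (pos? remaining)
                      (cons (.take results) (take-n (dec remaining))))))]
    ;; A no-argument fn is a Runnable, so it can be handed to the pool as is.
    (doseq [x items]
      (.execute pool (fn [] (.put results (f x)))))
    ;; shutdown lets already submitted jobs finish, then releases the threads.
    (.shutdown pool)
    (take-n (count items))))
```

For example, (sort (unordered-pmap 10 process [1 2 3 4])) gives (2 4 6 8); without the sort, the order depends on which worker finishes first. Counting the expected results avoids needing an end-of-stream marker, at the price of requiring a finite input.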
Re: A blocking lazy sequence populated by multiple worker threads
On May 30, 2013 4:12 AM, Colin Yates colin.ya...@gmail.com wrote:
> ; the following would need to reify itself to be a Runnable, not got that far yet :)
> (defn execute [job result-queue]
>   (let [result (job)]
>     (.put result-queue result)))

A no-args fn is both a perfectly good Callable and a perfectly good Runnable, making interop with java.util.concurrent pretty painless. So it takes as little as

#(execute my-job my-queue)
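To make the Callable/Runnable point concrete, here is a small sketch that hands plain Clojure functions to an ExecutorService both ways. The demo function and the jobs it runs are invented for illustration; execute follows the definition quoted above.

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService Future
                               LinkedBlockingQueue))

(defn execute
  "Runs job (a no-argument fn) and puts its result on result-queue."
  [job ^LinkedBlockingQueue result-queue]
  (.put result-queue (job)))

(defn demo []
  (let [^ExecutorService pool (Executors/newFixedThreadPool 2)
        queue                 (LinkedBlockingQueue.)
        ;; The hint selects submit(Callable) over submit(Runnable); a fn
        ;; satisfies both, so the call is ambiguous without it.
        ^Callable task        (fn [] (+ 1 2))]
    (try
      ;; Used as a Runnable: run only for its side effect on the queue.
      (.execute pool #(execute (fn [] (* 2 21)) queue))
      ;; Used as a Callable: the Future carries the fn's return value.
      (let [^Future fut (.submit pool task)]
        [(.take queue) (.get fut)])
      (finally
        (.shutdown pool)))))
```

Here (demo) returns [42 3]: the first value arrives through the queue, the second through the Future.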
Re: A blocking lazy sequence populated by multiple worker threads
Nice.

On 30 May 2013 12:57, John D. Hume duelin.mark...@gmail.com wrote:
> A no-args fn is both a perfectly good Callable and a perfectly good Runnable, making interop with java.util.concurrent pretty painless. So it takes as little as #(execute my-job my-queue)
Re: A blocking lazy sequence populated by multiple worker threads
Hello, Colin,

I suspected I should turn to existing Java concurrency constructs. Thank you very much for your response, and this is what I'm going to do. I was just hoping there's some idiomatic Clojure way to solve this, using agents, futures, promises, refs, and other Clojure stuff. For example, if there were a function taking a list of agents and returning any one of them which is ready (vs. all of them), I would be able to implement my example relatively simply. Just wanted to make sure I'm not missing anything.

Artem.

On Thursday, May 30, 2013 2:12:02 AM UTC-7, Colin Yates wrote:
> Can you not use http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html? That will provide the blocking element.
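On the wish for something that returns "any one of them which is ready": java.util.concurrent has a construct of that shape, ExecutorCompletionService, whose take method blocks until whichever submitted task finishes next. The following is a sketch of how it might be used from Clojure; completion-order and the :slow/:fast tasks are names made up for this example, and it works on plain functions rather than agents.

```clojure
(import '(java.util.concurrent Callable ExecutorCompletionService Executors))

(defn completion-order
  "Runs each no-argument fn in tasks (a counted collection) on a pool of n
  threads and returns a vector of their results, ordered by which task
  finished first."
  [n tasks]
  (let [pool (Executors/newFixedThreadPool n)
        ecs  (ExecutorCompletionService. pool)]
    (try
      (doseq [task tasks]
        (let [^Callable c task]
          (.submit ecs c)))
      ;; take blocks until any one submitted task is done and returns its
      ;; Future; get then yields that task's result.
      (vec (repeatedly (count tasks) #(.get (.take ecs))))
      (finally
        (.shutdown pool)))))

(defn demo []
  (completion-order 2 [#(do (Thread/sleep 300) :slow)
                       #(do (Thread/sleep 10) :fast)]))
```

With both tasks started at once on two threads, (demo) should return [:fast :slow] even though :slow was submitted first.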
A blocking lazy sequence populated by multiple worker threads
Hello, folks!

I'm a relative noob in Clojure, especially when it comes to concurrency, so please forgive my ignorance. I have a processing stage (producer) that feeds another one (consumer). The producer has a bunch of items to process, and the processing is I/O-blocking and takes a random amount of time, but the order of the items is insignificant, so ideally they would materialize on the consumer side on a first-come, first-served basis.

I would like to create a blocking lazy sequence I could just give to the consumer. I know how to create a lazy sequence (lazy-seq), or how to make it run in the background and block on results (seque), but what I can't wrap my head around is how to parallelize the processing the Clojure way. I was considering kicking off multiple agents, but how can I wait for *any one* of them to finish, not all of them (as await does)? I'm not sure, but I think the same goes for futures/promises. I could have multiple agents putting the results into some shared sequence, but then how do I block on the sequence itself?

What I'm trying to do can be described in the following way in silly imperative pseudo-code:

    workers = new Worker[10]   ; initially w.got_data == nil
    for each x in source_data:
        w = wait_for_any_worker_ready(workers)   ; initially all of them are ready
        if (w.got_data)
            output.enqueue(w.data)   ; the consumer will read output in a blocking way
        w.process(x)   ; non-blocking, kicks off in the background

Or, another way to describe it, given a seq of integers:

    [ 1, 2, 3, 4 ... ]

and a simple function with a variable delay:

    (defn process [x]
      (Thread/sleep (* 1 (rand)))
      (* 2 x))

How can I write a function which would return a blocking lazy sequence of processed integers, in arbitrary order, parallelizing the processing in up to 10 threads?

Thank you!

Artem.