Re: A blocking lazy sequence populated by multiple worker threads

2013-06-12 Thread Jeff Palmucci
If you are looking for a more idiomatic solution, 
https://github.com/jpalmucci/clj-yield wraps a lazy sequence around a blocking 
queue. 
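The general idea of wrapping a lazy sequence around a blocking queue fits in a few lines of Clojure. This is only a sketch and is not clj-yield's API. The name `queue->lazy-seq` and the end-of-stream marker convention are assumptions made for illustration. Each element is realized with a blocking `.take`, so the consumer just walks the sequence and waits whenever the queue is empty.

```clojure
(import '(java.util.concurrent BlockingQueue LinkedBlockingQueue))

;; Hypothetical helper (not part of clj-yield): a lazy sequence whose
;; elements are read from q with a blocking .take. The sequence ends when
;; the producer puts end-marker on the queue.
(defn queue->lazy-seq [^BlockingQueue q end-marker]
  (lazy-seq
    (let [x (.take q)]                      ; blocks until an item is available
      (when-not (= x end-marker)
        (cons x (queue->lazy-seq q end-marker))))))

;; Usage: a background thread produces three items, then the end marker.
(defn demo-queue-seq []
  (let [q (LinkedBlockingQueue.)]
    (.start (Thread. (fn []
                       (doseq [x [1 2 3]]
                         (Thread/sleep 10)  ; simulate slow production
                         (.put q x))
                       (.put q ::end))))
    (vec (queue->lazy-seq q ::end))))       ; blocks until ::end arrives
```

Some end-of-stream convention is needed because `.take` on an empty queue waits indefinitely. Without it, the consumer cannot tell "no more items" apart from "not yet".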

On May 30, 2013, at 11:58 AM, Artem Boytsov aboyt...@gmail.com wrote:

 Hello, Colin,
 
 I suspected I should turn to existing Java concurrency constructs. Thank you 
 very much for your response, and this is what I'm going to do. I was just 
 hoping there's some idiomatic Clojure way to solve this, using agents,
 futures, promises, refs, and other Clojure stuff. For example, if there were
 a function taking a list of agents and returning any one of them that is ready
 (vs. all of them), I would be able to implement my example relatively simply.
 I just wanted to make sure I'm not missing anything.
 
 Artem.
 

Re: A blocking lazy sequence populated by multiple worker threads

2013-05-30 Thread Colin Yates
Can you not use
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html?
That will provide the blocking element.

To run N jobs in parallel (10 in your example), use a
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html.
The 'glue' would be an infinite loop which .takes from the incoming
sequence (which could also be an LBQ) and then puts it on the thread pool.

That gets stuff happening in parallel.  

To consume the results of that stuff as a sequence, have a(nother) LBQ which
the consumers read from (using the blocking .take), and have the glue code
wrap each function it takes from the incoming LBQ in a function which calls
it and puts the result on that outgoing LBQ.

This looks like (Clojure forgiveness is required):

[code]
(def incoming-queue (java.util.concurrent.LinkedBlockingQueue.))
(def outgoing-queue (java.util.concurrent.LinkedBlockingQueue.))
(def workers (java... some thread pool/executor.))

; the following would need to reify itself to be a Runnable, not got that
; far yet :)
(defn execute [job result-queue]
  (let [result (job)]
    (.put result-queue result)))

(def stop-loop (atom false))
(while (not @stop-loop)
  ; a local binding; (def next ...) here would clash with clojure.core/next
  (let [job (.take incoming-queue)]   ; blocks until a job is available
    (execute job outgoing-queue)))
[/code]

A few caveats/notes:
 - this uses a lot of Java constructs - that is fine.  It is perfectly 
idiomatic to use the right Clojure or Java constructs.  LBQs rock.
 - the above won't compile and the 'execute' needs to return a Runnable - 
not sure how.
 - it ties up a worker thread until the result can be put onto the outgoing 
LBQ.  If the outgoing LBQ is bounded and you don't have enough consumers 
then eventually all the worker threads will be effectively idle until the 
results can be consumed.  
 - if you didn't want to use a ThreadPool then you could update 'execute'
to maintain an (atom) number of currently executing jobs.  The glue code is
single threaded, so there is no chance of multiple jobs starting in parallel.
The single threaded 'cost' is fine as it is doing nothing other than moving
things around.
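For comparison, here is a sketch of the same design that should compile and run. It is not the poster's code, and several details are assumptions. It uses `Executors/newFixedThreadPool`, which builds a `ThreadPoolExecutor` with a fixed number of threads. It replaces the `stop-loop` atom with a sentinel job, `::stop`, because a loop blocked in `.take` would never notice the atom being reset. It wraps each job as a no-arg fn, which Clojure fns already implement as `java.lang.Runnable`.

```clojure
(import '(java.util.concurrent Executors ExecutorService LinkedBlockingQueue))

;; Same role as `execute` above: run the job, put its result on the queue.
(defn execute [job ^LinkedBlockingQueue result-queue]
  (.put result-queue (job)))

;; The glue: take each job off incoming-queue and hand it to the pool.
;; #(...) is a no-arg fn, and Clojure fns implement java.lang.Runnable,
;; so it can go straight to ExecutorService.execute. Stops when it takes
;; the (assumed) sentinel ::stop.
(defn run-glue [^LinkedBlockingQueue incoming-queue
                ^LinkedBlockingQueue outgoing-queue
                ^ExecutorService workers]
  (loop []
    (let [job (.take incoming-queue)]
      (when-not (= job ::stop)
        (.execute workers #(execute job outgoing-queue))
        (recur)))))

;; Usage: 20 jobs doubling 1..20 on 10 threads; results arrive in any order,
;; so they are sorted here only to make the output predictable.
(defn demo-glue []
  (let [incoming (LinkedBlockingQueue.)
        outgoing (LinkedBlockingQueue.)
        workers  (Executors/newFixedThreadPool 10)]
    (doseq [x (range 1 21)]
      (.put incoming (fn [] (* 2 x))))
    (.put incoming ::stop)
    (run-glue incoming outgoing workers)   ; returns after taking ::stop
    (.shutdown workers)                     ; already-submitted jobs still run
    (sort (repeatedly 20 #(.take outgoing)))))
```

You can give the outgoing queue a capacity, for example `(LinkedBlockingQueue. 100)`, to get the back-pressure described in the third note. Once consumers fall behind, workers block in `.put`.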

I am a (Clojure) newbie so be warned!  I fully look forward to somebody 
providing a much nicer and more idiomatic Clojure implementation :).

Hope this helps.

Col


-- 
-- 
You received this message because you are subscribed to the Google
Groups Clojure group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
Clojure group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.




Re: A blocking lazy sequence populated by multiple worker threads

2013-05-30 Thread John D. Hume
On May 30, 2013 4:12 AM, Colin Yates colin.ya...@gmail.com wrote:
 ; the following would need to reify itself to be a Runnable, not got that
 ; far yet :)
 (defn execute [job result-queue] (let [result (job)] (.put result-queue
 result)))


A no-args fn is both a perfectly good Callable and a perfectly good
Runnable, making interop with java.util.concurrent pretty painless.

So it takes as little as
#(execute my-job my-queue)
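The following sketch checks that claim. The same kind of no-arg fn can go to `.submit` as a `Callable`, in which case the returned `Future` yields the fn's value. It can also go to `.execute` as a `Runnable`, in which case the value is discarded. The type hint on `task` matters for `.submit`, which has overloads for both interfaces. Without the hint, Clojure cannot tell at compile time which overload is meant. The name `fn-interop-demo` is only for illustration.

```clojure
(import '(java.util.concurrent Callable Executors))

(defn fn-interop-demo []
  (let [pool (Executors/newFixedThreadPool 2)
        ^Callable task (fn [] (* 6 7))    ; hint selects submit(Callable)
        ran? (promise)]
    (try
      {;; As a Callable: the Future returned by .submit holds the fn's value.
       :callable-result (.get (.submit pool task))
       ;; As a Runnable: .execute ignores the return value, so signal via a promise.
       :runnable-ran?   (do (.execute pool #(deliver ran? true))
                            (deref ran? 1000 false))}
      (finally
        (.shutdown pool)))))

;; (fn-interop-demo) => {:callable-result 42, :runnable-ran? true}
```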





Re: A blocking lazy sequence populated by multiple worker threads

2013-05-30 Thread Colin Yates
Nice.


On 30 May 2013 12:57, John D. Hume duelin.mark...@gmail.com wrote:

 On May 30, 2013 4:12 AM, Colin Yates colin.ya...@gmail.com wrote:
  ; the following would need to reify itself to be a Runnable, not got
  ; that far yet :)
  (defn execute [job result-queue] (let [result (job)] (.put result-queue
  result)))
 

 A no-args fn is both a perfectly good Callable and a perfectly good
 Runnable, making interop with java.util.concurrent pretty painless.

 So it takes as little as
 #(execute my-job my-queue)





Re: A blocking lazy sequence populated by multiple worker threads

2013-05-30 Thread Artem Boytsov
Hello, Colin,

I suspected I should turn to existing Java concurrency constructs. Thank 
you very much for your response, and this is what I'm going to do. I was 
just hoping there's some idiomatic Clojure way to solve this, using agents,
futures, promises, refs, and other Clojure stuff. For example, if there
were a function taking a list of agents and returning any one of them that
is ready (vs. all of them), I would be able to implement my example
relatively simply. I just wanted to make sure I'm not missing anything.

Artem.
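For the "return whichever one is ready" part, `java.util.concurrent` already has a construct: `ExecutorCompletionService`. Its `.take` blocks until any submitted task has finished and then returns that task's `Future`. Below is a minimal sketch. The helper name `results-in-completion-order` is hypothetical.

```clojure
(import '(java.util.concurrent ExecutorCompletionService Executors))

;; Hypothetical helper: runs each no-arg fn in jobs on a pool of n-threads
;; threads and returns their results in the order the jobs *finish*.
(defn results-in-completion-order [jobs n-threads]
  (let [pool (Executors/newFixedThreadPool n-threads)
        ecs  (ExecutorCompletionService. pool)]
    (try
      (doseq [job jobs]
        (.submit ecs job))            ; a no-arg fn is a Callable
      ;; .take blocks until ANY submitted task has completed and returns its Future
      (vec (repeatedly (count jobs) #(.get (.take ecs))))
      (finally
        (.shutdown pool)))))

;; The slow job is submitted first, but the fast job's result is taken first:
;; (results-in-completion-order [(fn [] (Thread/sleep 500) :slow)
;;                               (fn [] :fast)]
;;                              2)
;; => [:fast :slow]
```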


A blocking lazy sequence populated by multiple worker threads

2013-05-29 Thread Artem Boytsov
Hello, folks!

I'm a relative noob in Clojure, especially when it comes to concurrency, so
please forgive my ignorance. I have a processing stage (producer) that
feeds another one (consumer). The producer has a bunch of items to
process; the processing is I/O-bound and takes a random amount of time, but
the order of the items is insignificant, so ideally they would materialize
on the consumer side on a first-come, first-served basis.

I would like to create a blocking lazy sequence I could just give to the
consumer. I know how to create a lazy sequence (lazy-seq), or how to make
it run in the background and block on results (seque), but what I can't wrap
my head around is how to parallelize the processing the Clojure way. I was
considering kicking off multiple agents, but how can I wait for *any one* of
them to finish, not all of them (as await does)? I'm not sure, but I think
the same goes for futures/promises. I could have multiple agents putting
the results into some shared sequence, but then how do I block on the
sequence itself?

What I'm trying to do can be described in the following way in a silly 
imperative pseudo-code:

workers = new Worker[10]                    ; initially w.got_data == nil
for each x in source_data:
    w = wait_for_any_worker_ready(workers)  ; initially all of them are ready
    if (w.got_data)
        output.enqueue(w.data)              ; the consumer will read output in a blocking way
    w.process(x)                            ; non-blocking, kicks off in the background
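That loop can be translated into Clojure fairly literally, as in the sketch below. A `java.util.concurrent.Semaphore` with one permit per worker plays the role of `wait_for_any_worker_ready`. `output` is a `LinkedBlockingQueue` that the consumer reads with `.take`. The name `feed-workers!` and the use of a fixed thread pool are assumptions.

```clojure
(import '(java.util.concurrent Executors LinkedBlockingQueue Semaphore))

;; Hypothetical translation of the pseudo-code. It blocks the calling thread
;; while feeding source-data, so run it on a thread of its own if the
;; consumer must start reading right away.
(defn feed-workers! [process source-data ^LinkedBlockingQueue output n-workers]
  (let [pool  (Executors/newFixedThreadPool n-workers)
        ready (Semaphore. n-workers)]              ; one permit per idle worker
    (doseq [x source-data]
      (.acquire ready)                             ; wait_for_any_worker_ready
      (.execute pool (fn []
                       (try
                         (.put output (process x)) ; output.enqueue(...)
                         (finally
                           (.release ready))))))   ; this worker is ready again
    (.shutdown pool)))                             ; queued work still finishes

;; Usage: double 1..20 with 10 workers, then drain the output queue.
(defn demo-feed []
  (let [output (LinkedBlockingQueue.)]
    (feed-workers! (fn [x] (* 2 x)) (range 1 21) output 10)
    (sort (repeatedly 20 #(.take output)))))
```

This sketch differs from the pseudo-code in one respect. Each worker enqueues its own result as soon as it finishes, instead of waiting until that worker is picked again. As a result, the last batch of results does not need a separate flush after the loop ends.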

Or, another way to describe it, given a seq of integers:

[ 1, 2, 3, 4 ... ]

and a simple function with a variable delay:

(defn process [x]
   (Thread/sleep (long (* 1000 (rand))))   ; random delay of up to 1 second
   (* 2 x))

How can I write a function which would return a blocking lazy sequence of 
processed integers, in arbitrary order, parallelizing the processing in up 
to 10 threads?
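One possible answer is sketched below. It combines `java.util.concurrent.ExecutorCompletionService` with `lazy-seq`. The helper `pmap-unordered` and its daemon thread pool are hypothetical names and choices. At most `n` calls of `f` are in flight at any time. Realizing each element blocks until some in-flight call finishes, and then the next input item is submitted. Because the input is consumed lazily, an infinite `coll` such as `(iterate inc 1)` should also work.

```clojure
(import '(java.util.concurrent ExecutorCompletionService ExecutorService
                               Executors ThreadFactory))

(defn- daemon-pool
  "Fixed pool of n daemon threads, so a sequence that is abandoned half-way
  does not keep the JVM from exiting."
  [n]
  (Executors/newFixedThreadPool
    n
    (reify ThreadFactory
      (newThread [_ r]
        (doto (Thread. ^Runnable r) (.setDaemon true))))))

(defn pmap-unordered
  "Hypothetical helper: lazily maps f over coll on up to n threads and yields
  the results in completion order. Realizing an element blocks until some
  in-flight call of f finishes; then the next item of coll is submitted."
  [f coll n]
  (let [^ExecutorService pool (daemon-pool n)
        ecs     (ExecutorCompletionService. pool)
        submit! (fn [x] (.submit ecs (fn [] (f x))))
        step    (fn step [in-flight remaining]
                  (lazy-seq
                    (if (zero? in-flight)
                      (do (.shutdown pool) nil)        ; all results consumed
                      (let [result (.get (.take ecs))] ; waits for ANY task
                        (if-let [s (seq remaining)]
                          (do (submit! (first s))      ; refill the free slot
                              (cons result (step in-flight (rest s))))
                          (cons result (step (dec in-flight) nil)))))))
        [head tail] (split-at n coll)]
    (doseq [x head] (submit! x))                       ; start the first n tasks
    (step (count head) tail)))

;; e.g. (take 20 (pmap-unordered process (iterate inc 1) 10)),
;; with process as defined in the question.
```

The next item is submitted only when the consumer takes a result. This means the consumer's pace throttles the workers, and at most `n` results are computed ahead of it. The threads are daemons because a partially consumed sequence never reaches the `.shutdown` call. If `f` throws, `.get` rethrows the exception wrapped in an `ExecutionException` when that element is realized.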

Thank you!

Artem.
