Using agents and blocking I/O.

2009-11-09 Thread David Brown

I'm trying to get a better grasp of how Agents are intended to be
used, so let me give an example scenario.

Let's say I have something that keeps track of the state of some I/O
entity, let's say some kind of file-based storage.  There is state
associated with the entity.  It's important that only one thread be
able to read from or write to this storage at a time, since the state has
to match what the external store's state is (say it's a cache or
something).

Write requests seem like a perfect match for agents, since they will
be serialized and will happen asynchronously.  But what about reads?
The reader needs to get the result of the read back; how should it
do that?

I can think of a few ways:

   - The reader passes in an atom to hold the result.  After issuing
     the request, it awaits the agent until the request has been
     processed, and then reads the answer out of the atom.

   - It could use a BlockingQueue of some type to wait for the answer.
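A minimal sketch of the first option, assuming the store's state is mirrored in an agent holding a map. `store`, `write!` and `read-via-atom` are hypothetical names, not from any library:

```clojure
;; Hypothetical agent standing in for the file-based store; its state is a map.
(def store (agent {}))

(defn write! [k v]
  ;; send-off: the action may block on I/O, so it runs on the unbounded pool
  (send-off store assoc k v))

(defn read-via-atom [k]
  (let [result (atom nil)]
    ;; the read action copies the answer into the caller's atom and
    ;; leaves the agent's state unchanged
    (send-off store (fn [state] (reset! result (get state k)) state))
    ;; block until every action this thread has sent so far has run
    (await store)
    @result))
```

As the post says, the caller still blocks in `await`, so this buys little over a plain lock.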

In both cases, the reads run completely synchronously, waiting for
their answer, and the whole thing isn't really any better than just
using locks.

Or should I rethink the whole thing and try to represent my entire
problem reactively?  That essentially means converting my entire
problem into continuation-passing style, and giving some of the
continuations to agents.  Possible, but very pervasive.
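A sketch of what the continuation-passing version of a read might look like, assuming the same kind of map-holding agent. `store` and `read-cps` are hypothetical names:

```clojure
;; Hypothetical agent standing in for the store.
(def store (agent {}))

(defn read-cps
  "Queue a read on the agent and return immediately; the continuation k
  is later called with the value, on the agent's thread."
  [key k]
  (send-off store (fn [state]
                    (k (get state key))
                    state)))   ; a read leaves the agent's state unchanged
```

Nothing blocks, but everything that depends on the value has to live inside `k`, which is the pervasiveness the post worries about. An exception thrown by `k` also fails the agent itself.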

Any suggestions?

Thanks,
David

--~--~-~--~~~---~--~~
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
-~--~~~~--~~--~--~---



Re: Using agents and blocking I/O.

2009-11-09 Thread David Brown

On Mon, Nov 09, 2009 at 08:28:43PM -0800, David Brown wrote:

>In both cases, the reads run completely synchronously, waiting for
>their answer, and really the whole thing isn't really any better than
>just using locks.

I guess a deeper concern is that there seems to be only a single call
for waiting in the entire Clojure concurrency system: 'await'.

One very useful extension GHC adds to STM is the 'retry' call, which
aborts the transaction, blocks until something else modifies one of
the refs it has read, and then runs it again.  It allows arbitrary
concurrency patterns to be implemented, since a thread can now wait
for a result, for example.
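Clojure's STM has no `retry`. A rough approximation of "block until a ref changes", assuming it is acceptable to wait outside any transaction, can be built from `add-watch` and a `promise`. `await-ref` is a hypothetical name, and unlike GHC's `retry` it does not compose with other transactions:

```clojure
(defn await-ref
  "Block until (pred @r) holds, then return that value. Call it outside a
  transaction; a rough stand-in for GHC's retry, not part of the STM."
  [r pred]
  (let [p   (promise)
        key (Object.)]                    ; unique watch key
    (add-watch r key (fn [_ _ _ new-val]
                       (when (pred new-val) (deliver p new-val))))
    ;; check after installing the watch so a change in between isn't missed
    (let [v @r]
      (when (pred v) (deliver p v)))
    (try
      @p
      (finally (remove-watch r key)))))
```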

Should I just be less afraid of using the Java concurrency classes?

David

--~--~-~--~~~---~--~~
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
-~--~~~~--~~--~--~---



Re: Using agents and blocking I/O.

2009-11-09 Thread Sean Devlin

David,
Agent actions are designed to be called only once.  That's why they're
useful for I/O (stuff with side effects).  Code that updates refs,
however, runs inside a transaction and may be retried.
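To illustrate the distinction: a transaction body may run more than once, so side effects belong in agent actions, and an action sent inside `dosync` is held until the transaction commits. The names below are hypothetical:

```clojure
(def balance (ref 100))
;; side effects (here, appending to an audit log) go through an agent
(def audit-log (agent []))

(defn withdraw! [amount]
  (dosync
    ;; this may be re-run if another transaction conflicts
    (alter balance - amount)
    ;; held until commit and discarded if this attempt retries,
    ;; so the entry is logged once per successful withdrawal
    (send-off audit-log conj [:withdraw amount])))
```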

As always, Rich explains it better than me:
http://www.infoq.com/presentations/Value-Identity-State-Rich-Hickey

Sean

On Nov 9, 11:41 pm, David Brown  wrote:
> On Mon, Nov 09, 2009 at 08:28:43PM -0800, David Brown wrote:
> >In both cases, the reads run completely synchronously, waiting for
> >their answer, and really the whole thing isn't really any better than
> >just using locks.
>
> I guess a deeper concern is that there seems to only be a single call
> in the entire Clojure concurrency system: 'await'.
>
> One very useful extension GHC adds to STM is the 'retry' call, which
> causes the transaction to retry, but it blocks until something else
> modifies one of the refs that it has read.  It allows any arbitrary
> concurrency to be implemented, since now a thread can wait for a
> result, for example.
>
> Should I just be less afraid of using the Java concurrency classes?
>
> David
--~--~-~--~~~---~--~~
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
-~--~~~~--~~--~--~---



Re: Using agents and blocking I/O.

2009-11-09 Thread Timothy Pratley

(locking resource (read/write)) sounds appropriate for such a resource
to me.  Maybe you should do locking writes through an agent, and just
rely on locking for blocking reads.  I don't really know how lock
requests are queued; is that why you are looking for more complicated
answers?

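;; ugh this is ugly!!! I think using locking is more obvious
(defn send-off-and-wait
  [a f & args]
  (let [blocker (java.util.concurrent.LinkedBlockingQueue.)
        done-fn (fn [state]
                  (let [new-state (apply f state args)]
                    ;; box the result: LinkedBlockingQueue rejects nil
                    (.add blocker [new-state])
                    ;; return new-state, not .add's boolean, so the agent keeps it
                    new-state))]
    (send-off a done-fn)
    (first (.take blocker))))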
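A sketch of the locking alternative suggested above: writes go through an agent so callers don't wait for them, and both writes and reads hold the same lock. The mutable `java.util.HashMap` is a hypothetical stand-in for the external store, and `write!` and `read-now` are illustrative names:

```clojure
;; Hypothetical stand-in for the external store: a mutable Java map.
(def store (java.util.HashMap.))
;; The agent only serializes writes and runs them off the caller's thread.
(def writer (agent nil))

(defn write! [k v]
  (send-off writer (fn [_]
                     (locking store (.put store k v))
                     nil)))

(defn read-now [k]
  ;; readers take the same lock, so they never see a half-finished write
  (locking store (.get store k)))
```

`read-now` does not wait for writes still queued on `writer`; a caller that must see its own earlier writes would `(await writer)` first.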


--~--~-~--~~~---~--~~
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
-~--~~~~--~~--~--~---



Re: Using agents and blocking I/O.

2009-11-09 Thread Mark Engelberg

On Mon, Nov 9, 2009 at 8:28 PM, David Brown  wrote:
> Let's say I have some thing that keeps track of the state of some I/O
> entity, let's say some kind of file-based storage.  There is state
> associated with the entity.  It's important that only one thread be
> able to read or write from this storage at a time, since the state has
> to match what the external store's state is (say it's a cache or
> something).

Remember that agents also possess some in-memory state.  So, I think
ideally you want to try to code your problem so that your
"writers" send a function to the agent that makes a permanent
transformation to the file-based storage, but also updates the
in-memory state with some representation of what went on in the
file-based storage that will be of use to readers.

Then, your reader just calls await to ensure that any writes sent from
the same thread have been fully processed by the agent, and then calls
deref to access the in-memory state of the agent.  You could do this
await/deref sequence inside a future if you want to be able to
potentially continue getting work done while the read is happening.
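A minimal sketch of this await/deref pattern, assuming the agent's state is a map mirroring what readers need. `store`, `write!`, `read-sync` and `read-async` are hypothetical names:

```clojure
;; Hypothetical agent whose state mirrors what readers need from the store.
(def store (agent {}))

(defn write! [k v]
  (send-off store (fn [state]
                    ;; the real write to the file-based storage would go here
                    (assoc state k v))))

(defn read-sync [k]
  (await store)      ; wait for the writes this thread has already sent
  (get @store k))

(defn read-async [k]
  ;; the same await/deref on another thread: actions the caller queued
  ;; earlier still run first, because agent actions run in queue order
  (future (read-sync k)))
```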

But let's say the agent is responsible for some enormous database, and
it's impractical for the in-memory state to hold all the information
that readers might find useful.  In this case, I think you're right
that the basic agent functionality doesn't map well to this without
some additional work.  It seems like you would need to create a "read
message" which essentially is a function that reads the relevant data
from the database and stores it in some sort of future-like stateful
entity that will block when you deref it until it has been filled by
the agent.
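Clojure's `promise` is one such future-like entity: `deref` blocks until some thread calls `deliver`. A sketch of a "read message" built on it, with `store` and `read-message` as hypothetical names:

```clojure
;; Hypothetical agent guarding the database; its own state is kept small.
(def store (agent {}))

(defn read-message
  "Queue a read behind any pending writes and return a promise that the
  agent fills with the result; deref the promise to block for it."
  [k]
  (let [p (promise)]
    (send-off store (fn [state]
                      ;; the real lookup would query the database here
                      (deliver p (get state k))
                      state))   ; leave the agent's state unchanged
    p))
```

If the action throws before delivering, a plain `@p` blocks forever; `(deref p timeout-ms timeout-val)` bounds the wait.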

--~--~-~--~~~---~--~~
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
-~--~~~~--~~--~--~---



Re: Using agents and blocking I/O.

2009-11-09 Thread David Brown

On Mon, Nov 09, 2009 at 09:42:28PM -0800, Mark Engelberg wrote:

>But let's say the agent is responsible for some enormous database, and
>it's impractical for the in-memory state to hold all the information
>that readers might find useful.  In this case, I think you're right
>that the basic agent functionality doesn't map well to this without
>some additional work.  It seems like you would need to create a "read
>message" which essentially is a function that reads the relevant data
>from the database and stores it in some sort of future-like stateful
>entity that will block when you deref it until it has been filled by
>the agent.

Ok.  So it's this future-like entity, one that blocks on deref until
it has been filled, that is indeed somewhat missing.  It's not
particularly difficult to implement.

This thing could easily produce a lazy sequence; in fact, the code
would look a lot like the code for seque, just with the writer
separated from the reader.  I'll have to think about it to make sure
that it can be used safely.

Making it a full queue instead of just an event handles the common
case where I will have a sequence of reads to make.

Thanks,
David

--~--~-~--~~~---~--~~
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
-~--~~~~--~~--~--~---



Re: Using agents and blocking I/O.

2009-11-09 Thread David Brown

On Mon, Nov 09, 2009 at 10:07:41PM -0800, David Brown wrote:
>On Mon, Nov 09, 2009 at 09:42:28PM -0800, Mark Engelberg wrote:
>>But let's say the agent is responsible for some enormous database, and
>>it's impractical for the in-memory state to hold all the information
>>that readers might find useful.  In this case, I think you're right
>>that the basic agent functionality doesn't map well to this without
>>some additional work.  It seems like you would need to create a "read
>>message" which essentially is a function that reads the relevant data
>>from the database and stores it in some sort of future-like stateful
>>entity that will block when you deref it until it has been filled by
>>the agent.
>
>Making it a full queue instead of just an event handles the common
>case where I will have a sequence of reads to make.

Ok, here's my first attempt.  It seems to work.  It's basically like
send-off, except that it wants a queue size, and it returns a lazy
sequence.  It passes an extra argument to the agent function: a
function that the agent should call with each item it wishes to
queue.  This is largely modelled after seque from core.

(import '(java.util.concurrent BlockingQueue LinkedBlockingQueue))
(defn send-queued
  "Dispatch blocking action to agent.  The state of the agent will be
  set to the value of:
  (apply action-fn state-of-agent enqueue args)
  The agent should call enqueue for each item to return to the caller
  of send-queued.  send-queued returns a lazy sequence of the items the
  agent passes to enqueue (in order).  The agent may enqueue 'n' items
  before blocking on its call to enqueue."
  [a n f & args]
  (let [#^BlockingQueue q (LinkedBlockingQueue. (int n))
        NIL (Object.) ;nil sentinel since LBQ doesn't support nils
        enqueue (fn [x]
                  (.put q (if (nil? x) NIL x)))
        action (fn [state1]
                 (let [state2 (apply f state1 enqueue args)]
                   (.put q q) ; q itself is eos sentinel
                   state2))
        drain (fn drain []
                (lazy-seq
                  (let [x (.take q)]
                    (if (identical? x q) ;q itself is eos sentinel
                      (do @a nil)  ;touch agent just to propagate errors
                      (cons (if (identical? x NIL) nil x) (drain))))))]
    (send-off a action)
    (drain)))

David

--~--~-~--~~~---~--~~
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
-~--~~~~--~~--~--~---



Re: Using agents and blocking I/O.

2009-11-10 Thread pmf
On Nov 10, 7:07 am, David Brown  wrote:
> Ok.  So, it's the existence of this future-like entity that blocks
> upon deref until filled is indeed somewhat missing.  It's not
> particularly difficult to implement.
>
> This thing could easily create a lazy sequence, in fact, the code
> would look a lot like the code for seque, with just a separation of
> the writer from the reader.  I'll have to think about it to make sure
> that it can be used safely.

You might want to look at the (recently added) fill-queue (in
clojure.contrib.seq-utils), which provides a lazy seq that is filled
by another thread and blocks if readers consume faster than the queue
is filled; maybe your problem fits into this mechanism.
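For comparison, a sketch of the same producer/consumer shape using only core `future` and a bounded `LinkedBlockingQueue`. This is not the contrib implementation, and `queue-seq` is a hypothetical name:

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn queue-seq
  "Hypothetical helper: calls (producer put!) on another thread via future
  and returns a lazy seq of the values passed to put!, in order. The
  consumer blocks when it gets ahead; the producer blocks once n items
  are waiting. A producer exception is rethrown (wrapped) to the consumer."
  [n producer]
  (let [q    (LinkedBlockingQueue. (int n))
        NIL  (Object.)                ; LinkedBlockingQueue rejects nil
        EOS  (Object.)                ; end-of-stream marker
        put! (fn [x] (.put q (if (nil? x) NIL x)))
        fut  (future
               (try
                 (producer put!)
                 (finally (.put q EOS))))]  ; always end the stream
    ((fn drain []
       (lazy-seq
         (let [x (.take q)]
           (if (identical? x EOS)
             (do @fut nil)            ; deref rethrows a producer failure
             (cons (if (identical? x NIL) nil x) (drain)))))))))
```

Because the consumer derefs the future only after seeing the end marker, and that deref waits for the future to finish, a failure in the producer cannot be missed. If the consumer stops reading early, the producer stays blocked in `.put`.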

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en


Re: Using agents and blocking I/O.

2009-11-10 Thread David Brown
On Tue, Nov 10, 2009 at 07:41:41AM -0800, pmf wrote:

>> This thing could easily create a lazy sequence, in fact, the code
>> would look a lot like the code for seque, with just a separation of
>> the writer from the reader.  I'll have to think about it to make sure
>> that it can be used safely.
>
>You might want to look at the (recently added) fill-queue (in
>clojure.contrib.seq-utils), which provides a lazy seq that is filled
>by another thread and blocks if readers consume faster than the queue
>is filled; maybe your problem fits into this mechanism.

Well, almost, except that it would create a future that I really
don't have any use for.  Last night, I posted very similar code to do
the same kind of thing with an agent (I have since fixed the exception
handling in the agent so that it terminates the queue and the reader
gets an exception rather than just hanging).

With fill-queue, send-queued, and seque all looking nearly identical,
I wonder if we're missing how this should be abstracted.

I did learn, though, from reading it, that seque stops short if the
computation runs far enough ahead to fill up the queue.  I'm not quite
sure what this would be useful for.

(import '(java.util.concurrent BlockingQueue LinkedBlockingQueue))
(defn send-queued
  "Dispatch blocking action to agent.  The state of the agent will be
  set to the value of:
  (apply action-fn state-of-agent enqueue args)
  The agent should call enqueue for each item to return to the caller
  of send-queued.  send-queued returns a lazy sequence of the items the
  agent passes to enqueue (in order).  The agent may enqueue 'n' items
  before blocking on its call to enqueue.  If the action throws, the
  exception is rethrown to the reader at that point in the sequence."
  [a n f & args]
  (let [#^BlockingQueue q (LinkedBlockingQueue. (int n))
        NIL (Object.) ;nil sentinel since LBQ doesn't support nils
        ERR (Object.) ;tag marking [ERR exception] entries
        enqueue (fn [x]
                  (.put q (if (nil? x) NIL x)))
        action (fn [state1]
                 (try
                   (let [state2 (apply f state1 enqueue args)]
                     (.put q q) ; q itself is eos sentinel
                     state2)
                   (catch Exception e
                     ;; hand the exception to the reader so it doesn't hang
                     ;; (rather than racing the agent to record the error),
                     ;; then rethrow so the agent records the failure too
                     (.put q [ERR e])
                     (throw e))))
        drain (fn drain []
                (lazy-seq
                  (let [x (.take q)]
                    (cond
                      (identical? x q) nil ;q itself is eos sentinel
                      (and (vector? x) (identical? (first x) ERR))
                      (throw (second x))   ;rethrow the action's exception
                      :else (cons (if (identical? x NIL) nil x) (drain))))))]
    (send-off a action)
    (drain)))

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en


Re: Using agents and blocking I/O.

2009-11-10 Thread John Harrop
On Tue, Nov 10, 2009 at 10:41 AM, pmf  wrote:

> On Nov 10, 7:07 am, David Brown  wrote:
> > Ok.  So, it's the existence of this future-like entity that blocks
> > upon deref until filled is indeed somewhat missing.  It's not
> > particularly difficult to implement.
> >
> > This thing could easily create a lazy sequence, in fact, the code
> > would look a lot like the code for seque, with just a separation of
> > the writer from the reader.  I'll have to think about it to make sure
> > that it can be used safely.
>
> You might want to look at the (recently added) fill-queue (in
> clojure.contrib.seq-utils), which provides a lazy seq that is filled
> by another thread and blocks if readers consume faster than the queue
> is filled; maybe your problem fits into this mechanism.


This suggests something else, in turn -- a tunable, somewhat lazy seq
produced using parallelism:

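(defmacro p-lazy-seq [lookahead continue? & body]
  `(let [lzy# (fn lzy# []
                (lazy-seq
                  (when ~continue?
                    (cons (future ~@body) (lzy#)))))
         s# (lzy#)]
     ;; padding the shifted seq with nils keeps the last `lookahead`
     ;; elements when continue? eventually returns false
     (map (fn [x# _#] (deref x#)) s# (concat (drop ~lookahead s#) (repeat nil)))))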

The result for

(p-lazy-seq n continue?
  body)

should be the same as for

(letfn [(f []
          (lazy-seq
            (if continue?
              (cons (do body) (f)))))]
  (f))

i.e. continue? is evaluated and if true body is evaluated to yield the next
element of the lazy sequence; then continue? is evaluated again, etc. etc.

Except that the generation of elements is done on worker threads, possibly
more than one at a time if on multicore hardware, transparently to the
consumer of the seq.

Consumption of the seq blocks if it reaches an element not yet produced. The
tandem map of s# and (drop ~lookahead s#) is used to force the creation of a
future wrapping the element n ahead of the current element, while the
current element's future is dereferenced (which is where the blocking may
occur) and its result returned. Note that the future n elements ahead is NOT
dereferenced; it's just generated (by realizing that element of a lazy
sequence of future objects) which causes it to be enqueued for calculation
on a thread pool without blocking until it has a result.

Thus, up to lookahead futures beyond the current element will be computing or done at any given time. A
lookahead of zero would result in no better than sequential performance as
the consumer of the seq would block until one more element was produced,
process it, block until one more element was produced, etc.; however, a
lookahead of 1 allows the consumer of the seq to be processing one element
while the next is being produced by another thread, and higher lookaheads
can exploit more than two cores for even more parallelism.
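A function version of the same idea may be easier to experiment with than the macro. In this sketch `p-lazy-seq*` is a hypothetical name, `continue?` and `produce` are zero-argument functions, and the shifted sequence is padded so that a finite sequence keeps its last `lookahead` elements:

```clojure
(defn p-lazy-seq*
  "Lazy seq of (produce) results, generated while (continue?) is true.
  Realizing element i also creates the future for element i + lookahead."
  [lookahead continue? produce]
  (let [futs ((fn step []
                (lazy-seq
                  (when (continue?)
                    (cons (future (produce)) (step))))))]
    ;; walk futs alongside a copy shifted by `lookahead`: realizing the
    ;; shifted copy creates (and so starts) the futures ahead of the
    ;; consumer; the nil padding keeps the tail of a finite seq
    (map (fn [f _] @f)
         futs
         (concat (drop lookahead futs) (repeat nil)))))
```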

As for a practical application:

(def *rngs* (atom {}))

(defn thread-local-rand [n]
  (if-let [rng (@*rngs* (Thread/currentThread))]
    (rng n)
    (let [rng-1 (java.util.Random.)
          rng #(.nextInt rng-1 %)]
      (swap! *rngs* assoc (Thread/currentThread) rng)
      (rng n))))

user=> (take 10 (p-lazy-seq 3 true (thread-local-rand 10)))
(1 2 6 1 5 1 7 8 4 3)

This should generate the random numbers on multiple threads, using multiple
RNGs. In the limit, on multicore hardware and with a slow enough RNG
implementation (e.g. SecureRandom), a consumer of that sequence might be
able to obtain and use random numbers faster than with direct calls to the
RNG implementation.

(The mechanism I used to make a function that maintains its own encapsulated
thread-local state, persistent across calls, might be worth a closer look,
too, for situations where binding just can't get the job done.)
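For comparison, the same per-thread encapsulation can be sketched with `java.lang.ThreadLocal`, which avoids the shared atom (whose map, keyed by Thread, never drops entries for threads that have finished). `rng-per-thread` and `thread-local-rand*` are hypothetical names; on Java 7 and later, `java.util.concurrent.ThreadLocalRandom/current` is another option for this particular case:

```clojure
;; One java.util.Random per thread, created lazily by ThreadLocal.initialValue.
(def rng-per-thread
  (proxy [ThreadLocal] []
    (initialValue [] (java.util.Random.))))

(defn thread-local-rand*
  "Random int in [0, n) drawn from the calling thread's own Random."
  [n]
  (.nextInt ^java.util.Random (.get ^ThreadLocal rng-per-thread) (int n)))
```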

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en

Re: Using agents and blocking I/O.

2009-11-10 Thread John Harrop
On Tue, Nov 10, 2009 at 11:41 AM, John Harrop  wrote:

> user=> (take 10 (p-lazy-seq 3 true (thread-local-rand 10)))
> (1 2 6 1 5 1 7 8 4 3)
>
> This should generate the random numbers on multiple threads, using multiple
> RNGs. In the limit, on multicore hardware and with a slow enough RNG
> implementation (e.g. SecureRandom), a consumer of that sequence might be
> able to obtain and use random numbers faster than with direct calls to the
> RNG implementation.
>

Oh, and did I mention the random numbers should also be more *secure*? There
would be multiple SecureRandom (or whatever)  instances and which one is
chosen to produce a given random number will be essentially, well, random,
making it, if possible, even harder to guess the next random number the
consumer will use than if it just used one SecureRandom instance directly.

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en

Re: Using agents and blocking I/O.

2009-11-10 Thread Anne Ogborn
Anybody else getting bombarded by multiple copies of this message?

--- On Tue, 11/10/09, John Harrop  wrote:

> From: John Harrop 
> Subject: Re: Using agents and blocking I/O.
> To: clojure@googlegroups.com
> Date: Tuesday, November 10, 2009, 8:45 AM
> On Tue, Nov 10,
> 2009 at 11:41 AM, John Harrop 
> wrote:
> 
> user=> (take 10
> (p-lazy-seq 3 true (thread-local-rand
> 10)))(1 2 6 1 5 1 7 8 4
> 3)
> This should generate the random numbers on
> multiple threads, using multiple RNGs. In the limit, on
> multicore hardware and with a slow enough RNG implementation
> (e.g. SecureRandom), a consumer of that sequence might be
> able to obtain and use random numbers faster than with
> direct calls to the RNG implementation.
> 
> Oh, and did I mention the random numbers should
> also be more *secure*? There would be multiple SecureRandom
> (or whatever)  instances and which one is chosen to produce
> a given random number will be essentially, well, random,
> making it, if possible, even harder to guess the next random
> number the consumer will use than if it just used one
> SecureRandom instance directly.
> 
> 
> 
> 
> -- 
> 
> You received this message because you are subscribed to the
> Google
> 
> Groups "Clojure" group.
> 
> To post to this group, send email to
> clojure@googlegroups.com
> 
> Note that posts from new members are moderated - please be
> patient with your first post.
> 
> To unsubscribe from this group, send email to
> 
> clojure+unsubscr...@googlegroups.com
> 
> For more options, visit this group at
> 
> http://groups.google.com/group/clojure?hl=en


  

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en