On Mon, Nov 09, 2009 at 10:07:41PM -0800, David Brown wrote:
>On Mon, Nov 09, 2009 at 09:42:28PM -0800, Mark Engelberg wrote:
>>But let's say the agent is responsible some enormous database, and
>>it's impractical for the in-memory state to hold all the information
>>that readers might find useful.  In this case, I think you're right
>>that the basic agent functionality doesn't map well to this without
>>some additional work.  It seems like you would need to create a "read
>>message" which essentially is a function that reads the relevant data
>>from the database and stores it in some sort of future-like stateful
>>entity that will block when you deref it until it has been filled by
>>the agent.
>
>Making it a full queue unstead of just an event handles the common
>case where I will have a sequence of reads to make.

Ok, here's my first attempt.  It seems to work.  It's basically like
send-off, except that it wants a queue size, and it returns a lazy
sequence.  It passes an extra argument to the agent function that the
agent should call with each item it wishes to queue.  This is largely
modelled after seque from core.

(import '(java.util.concurrent BlockingQueue LinkedBlockingQueue))
(defn send-queued
   "Dispatch blocking action to agent.  The state of the agent will be
   set to the value of:
   (apply action-fn state-of-agent enqueue args)
   The agent should call enqueue for each item to return to the caller
   of send-queued.  send-queued returns a lazy sequence of the items the
   agent passes to enqueue (in order).  The agent may enqueue 'n' items
   before blocking on its call to enqueue."
   [a n f & args]
   (let [#^BlockingQueue q (LinkedBlockingQueue. (int n))
         NIL (Object.) ;nil sentinel since LBQ doesn't support nils
         enqueue (fn [x]
                   (.put q (if (nil? x) NIL x)))
         action (fn [state1]
                  (let [state2 (apply f state1 enqueue args)]
                    (.put q q) ; q itself is eos sentinel
                    state2))
         drain (fn drain []
                 (lazy-seq
                   (let [x (.take q)]
                     (if (identical? x q) ;q itself is eos sentinel
                       (do @a nil)  ;touch agent just to propagate errors
                       (cons (if (identical? x NIL) nil x) (drain))))))]
     (send-off a action)
     (drain)))

David

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to