I recently discovered a problem with the seque function. I posted it here,
but the message seems to be in the moderator's queue.
The expression (dorun (seque 3 (seque 3 (range 10))) causes clojure to hang.
The problem is that the (send-off ...) call within the drain function,
which causes the background thread to actually go and do work, does not
take effect until the pending sends (in the nested vector) are released.
In this case the pending sends are not released until the (.take q)
returns, but (.take q) blocks until the background thread pulls the seq and
(.offer)s an element to the LinkedBlockingQueue.
This can be remedied crudely by introducing a call to
(release-pending-sends) before the (.take q) operation:
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index 336be78..aecdfeb 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -4759,6 +4759,7 @@
(.put q q)
(throw e))))
drain (fn drain []
+ (release-pending-sends)
(lazy-seq
(let [x (.take q)]
(if (identical? x q) ;q itself is eos sentinel
--
but this will cause all of the sends in the current agent action, not just
those related to the seque, to be released early, and that seems sloppy.
Perhaps it would be better to introduce (send-now agt f & args) and
(send-off-now agt f & args) which do the same as (send) and (send-off) but
release the action immediately, like so:
---
src/clj/clojure/core.clj | 30
++++++++++++++++++++++---
src/jvm/clojure/lang/Agent.java | 8 +++---
src/jvm/clojure/lang/LockingTransaction.java | 2 +-
3 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index 336be78..94663a1 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -1875,6 +1875,28 @@
(if (:error-handler opts) :continue :fail)))
a)))
+(defn send-now
+ "Dispatch an action to an agent. Returns the agent immediately.
+ Subsequently, in a thread from a thread pool, the state of the agent
+ will be set to the value of:
+
+ (apply action-fn state-of-agent args)"
+ {:added "1.0"
+ :static true}
+ [^clojure.lang.Agent a f & args]
+ (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args false
true))
+
+(defn send-off-now
+ "Dispatch a potentially blocking action to an agent. Returns the
+ agent immediately. Subsequently, in a separate thread, the state of
+ the agent will be set to the value of:
+
+ (apply action-fn state-of-agent args)"
+ {:added "1.0"
+ :static true}
+ [^clojure.lang.Agent a f & args]
+ (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args true
true))
+
(defn send
"Dispatch an action to an agent. Returns the agent immediately.
Subsequently, in a thread from a thread pool, the state of the agent
@@ -1884,7 +1906,7 @@
{:added "1.0"
:static true}
[^clojure.lang.Agent a f & args]
- (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args false))
+ (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args false
false))
(defn send-off
"Dispatch a potentially blocking action to an agent. Returns the
@@ -1895,7 +1917,7 @@
{:added "1.0"
:static true}
[^clojure.lang.Agent a f & args]
- (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args true))
+ (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args true
false))
(defn release-pending-sends
"Normally, actions sent directly or indirectly during another action
@@ -4764,9 +4786,9 @@
(if (identical? x q) ;q itself is eos sentinel
(do @agt nil) ;touch agent just to propagate errors
(do
- (send-off agt fill)
+ (send-off-now agt fill)
(cons (if (identical? x NIL) nil x) (drain)))))))]
- (send-off agt fill)
+ (send-off-now agt fill)
(drain))))
(defn class?
diff --git a/src/jvm/clojure/lang/Agent.java
b/src/jvm/clojure/lang/Agent.java
index e63d060..1fa297b 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -233,23 +233,23 @@ synchronized public Object restart(Object newState,
boolean clearActions){
return newState;
}
-public Object dispatch(IFn fn, ISeq args, boolean solo) {
+public Object dispatch(IFn fn, ISeq args, boolean solo, boolean now) {
Throwable error = getError();
if(error != null)
{
throw Util.runtimeException("Agent is failed, needs
restart", error);
}
Action action = new Action(this, fn, args, solo);
- dispatchAction(action);
+ dispatchAction(action, now);
return this;
}
-static void dispatchAction(Action action){
+static void dispatchAction(Action action, boolean now){
LockingTransaction trans = LockingTransaction.getRunning();
if(trans != null)
trans.enqueue(action);
- else if(nested.get() != null)
+ else if(nested.get() != null && !now)
{
nested.set(nested.get().cons(action));
}
diff --git a/src/jvm/clojure/lang/LockingTransaction.java
b/src/jvm/clojure/lang/LockingTransaction.java
index 44d2de6..7b75f7b 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -366,7 +366,7 @@ Object run(Callable fn) throws Exception{
}
for(Agent.Action action : actions)
{
-
Agent.dispatchAction(action);
+
Agent.dispatchAction(action, false);
}
}
}
--
Thoughts?
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en