Author: chirino
Date: Wed Feb 11 21:44:12 2009
New Revision: 743521
URL: http://svn.apache.org/viewvc?rev=743521&view=rev
Log:
Fixing the 10_1_1 p2p case
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743521&r1=743520&r2=743521&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Wed Feb 11 21:44:12 2009
@@ -225,14 +225,14 @@
if (okToAdd(elem)) {
ok = true;
if (limiter.add(elem)) {
- blockSource(sourceController);
setUnThrottleListener();
+ blockSource(sourceController);
}
} else {
// Add to overflow queue and block source:
overflowQueue.add(elem);
- blockSource(sourceController);
setUnThrottleListener();
+ blockSource(sourceController);
}
}
if (ok) {
@@ -315,8 +315,7 @@
waitForResume();
if (!blockedSources.contains(source)) {
- // System.out.println("BLOCKING : SINK["+this + "], SOURCE[" +
- // source+"]");
+// System.out.println("BLOCKING : SINK[" + this + "], SOURCE[" + source + "]");
blockedSources.add(source);
source.onFlowBlock(this);
}
@@ -341,7 +340,7 @@
// If we've exceeded the the throttle threshold, register
// a listener so we can resume the blocked sources after
// the limiter falls below the threshold:
- if (!overflowQueue.isEmpty()) {
+ if (!overflowQueue.isEmpty() || limiter.getThrottled()) {
setUnThrottleListener();
} else if (notifyUnblock) {
mutex.notifyAll();
@@ -397,8 +396,7 @@
try {
Thread.currentThread().setName(name);
for (ISourceController<E> source : blockedSources) {
- // System.out.println("UNBLOCKING: SINK["+FlowController.this
- // + "], SOURCE[" + source+"]");
+// System.out.println("UNBLOCKING: SINK[" + FlowController.this + "], SOURCE[" + source + "]");
source.onFlowResume(FlowController.this);
}
for (FlowUnblockListener<E> listener : unblockListeners) {
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=743521&r1=743520&r2=743521&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Wed Feb 11 21:44:12 2009
@@ -75,14 +75,15 @@
boolean notify = false;
if (node.cursor == null) {
readyDirectSubs.addLast(node);
- // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+// System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
} else {
if (readyPollingSubs.isEmpty()) {
notify = !store.isEmpty();
}
readyPollingSubs.addLast(node);
- // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+// System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
}
+
if (notify) {
notifyReady();
}
@@ -169,7 +170,7 @@
sub.resumeAt(node);
unreadyPollingSubs.addLast(sub);
matchCount++;
- // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
+// System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
}
sub = next;
}
@@ -181,7 +182,7 @@
subNode.unlink();
subNode.resumeAt(node);
unreadyPollingSubs.addLast(subNode);
- // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+// System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
}
matchCount += matches.size();
@@ -236,6 +237,8 @@
public boolean pollingDispatch() {
+// System.out.println("polling dispatch");
+
// Keep looping until we can find one subscription that we can
// dispatch a message to.
while (true) {
@@ -261,7 +264,7 @@
} else {
// Cursor dried up... this subscriber can now be direct
// dispatched.
- // System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
+// System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
subNode.unlink();
readyDirectSubs.addLast(subNode);
}
@@ -291,7 +294,7 @@
}
return true;
} else {
- // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
+// System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
// Subscription is no longer ready..
subNode.cursorUnPeek(storeNode);
subNode.unlink();