Author: supun
Date: Sat Mar 13 06:22:14 2010
New Revision: 922486
URL: http://svn.apache.org/viewvc?rev=922486&view=rev
Log:
improving documentation and fixing a minor bug
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
---
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
(original)
+++
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
Sat Mar 13 06:22:14 2010
@@ -408,11 +408,19 @@ public class MultiPriorityBlockingQueue<
}
public void clear() {
- while (true) {
- if (poll() == null) break;
- }
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ for (InternalQueue<E> intQueue : queues) {
+ intQueue.clear();
+ }
+ count = 0;
+ } finally {
+ lock.unlock();
+ }
}
+ @SuppressWarnings({"SuspiciousToArrayCall"})
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
---
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
(original)
+++
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
Sat Mar 13 06:22:14 2010
@@ -22,8 +22,8 @@ package org.apache.synapse.commons.execu
import java.util.List;
/**
- * This interface implements the algorith for determining the next internal
- * queue for picking up the message. This class is created onece and
initialized.
+ * This interface abstracts the algorithm for determining the next internal
+ * queue for picking up the message. This class is created once and
initialized.
* This class should capture any runtime information about the queues since the
* MultiPriorityBlockingQueue doesn't hold any runtime state information about
* the queues.
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
---
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
(original)
+++
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
Sat Mar 13 06:22:14 2010
@@ -26,10 +26,10 @@ import java.util.List;
*
* <p>This algorithm works in cycles. Lets say we have queues with following
priorities.
* 7, 5, 2 and assume we name the queues as 1, 2, 3 in the order. </p>
- * <p>Here is how messages are picked in a single cycle
- * 1, 1, 1, 1, 1, 1, 1 // all the messages for the queue with priority 1 are
sent for this cycle
- * 2, 2, 2, 2, 2, // all the messages for the queue with priority 2 are sent
for this cycle
- * 3, 3 // all the messages with priority 2 are sent for this cycle</p>
+ * <p>Here is how messages are picked in a single cycle </p>
+ * <p> 1, 1, 1, 1, 1, 1, 1 all the messages for the queue with priority 1 are
sent for this cycle
+ * 2, 2, 2, 2, 2, all the messages for the queue with priority 2 are sent for
this cycle
+ * 3, 3 all the messages with priority 2 are sent for this cycle</p>
*
* <p>This algorithm chooses the queues in the above order if all the queues
have messages at the
* point of selection. If a queue doesn't have messages it will skip the queue
and move to the
@@ -37,25 +37,16 @@ import java.util.List;
*/
public class PRRNextQueueAlgorithm<E> implements NextQueueAlgorithm<E> {
- /**
- * We hold a reference to the actual queue
- */
+ /** Reference to the actual queue */
private List<InternalQueue<E>> queues;
- /**
- * Number of queues, this is just to avoid the overhead of calculating
- * this again and again
- */
+ /** Number of queues, we keep this to avoid the overhead of calculating
this again and again */
private int size = 0;
- /**
- * Current queue we are operating on
- */
+ /** Current queue we are operating on */
private int currentQueue = 0;
- /**
- * Number of messages sent from the current queue
- */
+ /** Number of messages sent from the current queue */
private int currentCount = 0;
public InternalQueue<E> getNextQueue() {
@@ -80,6 +71,7 @@ public class PRRNextQueueAlgorithm<E> im
// we move forward until we find a non empty queue or
everything is empty
} while (internalQueue.size() == 0 && c < size);
+ // if we come to the initial queue, that means all the queues are
empty.
if (internalQueue.size() == 0) {
currentQueue = 0;
return null;
@@ -88,9 +80,6 @@ public class PRRNextQueueAlgorithm<E> im
currentCount++;
- /*log.info("Get the queue with the priority: " +
- internalQueue.getPriority());*/
-
return internalQueue;
}
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
---
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
(original)
+++
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
Sat Mar 13 06:22:14 2010
@@ -47,6 +47,7 @@ public class PriorityExecutor {
private int max = ExecutorConstants.DEFAULT_MAX;
/** Keep alive time for spare threads */
private int keepAlive = ExecutorConstants.DEFAULT_KEEP_ALIVE;
+
/** This will be executed before the Task is submitted */
private BeforeExecuteHandler beforeExecuteHandler;
/** Queue used by the executor */
@@ -56,12 +57,14 @@ public class PriorityExecutor {
private String fileName;
/**
- * Execute a given task with the priority specified.
+ * Execute a given task with the priority specified. If the task throws an
exception,
+ * it will be captured and logged to prevent the threads from dying.
*
* @param task task to be executed
* @param priority priority of the task
*/
public void execute(final Runnable task, int priority) {
+ // create a dummy worker to execute the task
Worker w = new Worker(task, priority);
if (beforeExecuteHandler != null) {
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
---
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
(original)
+++
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
Sat Mar 13 06:22:14 2010
@@ -168,6 +168,14 @@ public class FixedSizeQueue<E> extends A
return capacity;
}
+ @Override
+ public boolean contains(Object o) {
+ for (E e : array) {
+ if (e.equals(o)) return true;
+ }
+ return false;
+ }
+
private int increment(int i) {
return (++i == array.length)? 0 : i;
}
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
---
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
(original)
+++
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
Sat Mar 13 06:22:14 2010
@@ -59,11 +59,17 @@ public class UnboundedQueue<E> extends A
}
public E poll() {
- return elements.remove(elements.size() - 1);
+ if (elements.size() > 0) {
+ return elements.remove(elements.size() - 1);
+ }
+ return null;
}
public E peek() {
- return elements.get(elements.size() - 1);
+ if (elements.size() > 0) {
+ return elements.get(elements.size() - 1);
+ }
+ return null;
}
public int getPriority() {
@@ -105,4 +111,9 @@ public class UnboundedQueue<E> extends A
public int getCapacity() {
return Integer.MAX_VALUE;
}
+
+ @Override
+ public boolean contains(Object o) {
+ return elements.contains(o);
+ }
}