Hi, Eric,
Thanks a lot for the patch. As it happens, we ran into a similar
problem last week, and I have added a slightly different but equivalent
solution to the most recent snapshot. Instead of specifying a queue
class, you now specify a queue factory: I have added a QueueFactory
interface and implementations for all of the standard JDK 5 queues.
This lets you create any type of bounded queue by choosing the
appropriate factory and setting the queue's constructor parameters as
properties of that factory. Here is an example of the digester config
for DedicatedThreadStageDriverFactory with the new queue factory:
<driverFactory
    className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
    id="f1">
  <property propName="queueFactory"
      className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
      capacity="10" fair="false"/>
</driverFactory>
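
To make the factory idea concrete, here is a minimal sketch in Java. It
is not the code from the snapshot: SketchQueueFactory,
SketchArrayBlockingQueueFactory, and the default capacity of 10 are
hypothetical names and values chosen for illustration, and the real
interface may well differ. It only shows how bean properties such as
capacity and fair could be passed on to the queue constructor:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the QueueFactory idea; not the snapshot's interface.
interface SketchQueueFactory<T> {
    BlockingQueue<T> createQueue();
}

// Bean-style factory: a config loader would call the setters, then createQueue().
class SketchArrayBlockingQueueFactory<T> implements SketchQueueFactory<T> {
    // Assumed default for this sketch only. ArrayBlockingQueue allocates its
    // backing array up front, so Integer.MAX_VALUE would not be a usable default.
    private int capacity = 10;
    private boolean fair = false;

    public int getCapacity() { return capacity; }
    public void setCapacity(int capacity) { this.capacity = capacity; }

    public boolean isFair() { return fair; }
    public void setFair(boolean fair) { this.fair = fair; }

    public BlockingQueue<T> createQueue() {
        return new ArrayBlockingQueue<T>(capacity, fair);
    }
}

class QueueFactorySketch {
    public static void main(String[] args) {
        SketchArrayBlockingQueueFactory<String> factory =
                new SketchArrayBlockingQueueFactory<String>();
        factory.setCapacity(2);

        BlockingQueue<String> queue = factory.createQueue();
        queue.offer("a");
        queue.offer("b");
        // The queue is full, so a non-blocking offer is rejected.
        System.out.println(queue.offer("c")); // prints "false"
    }
}
```

Because each factory exposes only the parameters its queue type
accepts, no reflection is needed to pick a constructor.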
Thanks again!
Kris
Eric wrote:
Here's a patch that adds a "capacity" property to the driver factory so
that you can limit pileups to less than the default Integer.MAX_VALUE.
thanks for pipeline,
eric
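
For reference, the approach the patch takes (looking up the queue
class's int constructor reflectively and passing the capacity to it) can
be sketched as a standalone program. BoundedQueueSketch and
newBoundedQueue are hypothetical names used only here; the pipeline
classes are left out so the example runs on its own:

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper class; illustrates the reflective construction only.
class BoundedQueueSketch {

    // Creates a queue by invoking the queue class's (int) constructor.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static BlockingQueue<Object> newBoundedQueue(
            Class<? extends BlockingQueue> queueClass, int capacity) {
        try {
            Constructor<? extends BlockingQueue> ctor =
                    queueClass.getConstructor(int.class);
            return (BlockingQueue<Object>) ctor.newInstance(capacity);
        } catch (ReflectiveOperationException e) {
            // Mirrors the patch: a failed instantiation means bad factory state.
            throw new IllegalStateException("Could not create queue.", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Object> queue =
                newBoundedQueue(LinkedBlockingQueue.class, 2);
        queue.offer("a");
        queue.offer("b");
        // Capacity is 2, so the third non-blocking offer is rejected.
        System.out.println(queue.offer("c")); // prints "false"
    }
}
```

One caveat with this approach: it assumes every queue class has an int
constructor whose argument is a bound. That holds for
LinkedBlockingQueue and ArrayBlockingQueue, but PriorityBlockingQueue's
int argument is only an initial capacity, and the queue stays unbounded.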
------------------------------------------------------------------------
Index: pipeline/src/driver/DedicatedThreadStageDriverFactory.java
===================================================================
--- pipeline/src/driver/DedicatedThreadStageDriverFactory.java (revision 384)
+++ pipeline/src/driver/DedicatedThreadStageDriverFactory.java (working copy)
@@ -16,6 +16,8 @@
package org.apache.commons.pipeline.driver;
+import java.lang.reflect.*;
+
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.pipeline.Stage;
@@ -29,7 +31,8 @@
*
*/
public class DedicatedThreadStageDriverFactory implements StageDriverFactory {
-
+
+
/** Creates a new instance of DedicatedThreadStageDriverFactory */
public DedicatedThreadStageDriverFactory() {
}
@@ -43,7 +46,8 @@
*/
public StageDriver createStageDriver(Stage stage, StageContext context) {
try {
- return new DedicatedThreadStageDriver(stage, context, queueClass.newInstance(), timeout, faultTolerance);
+ Constructor queueClassConstructor = queueClass.getConstructor(new Class[] {int.class});
+ return new DedicatedThreadStageDriver(stage, context, (BlockingQueue) queueClassConstructor.newInstance(new Object[]{new Integer(capacity)}), timeout, faultTolerance);
} catch (Exception e) {
throw new IllegalStateException("Instantiation of driver failed due to illegal factory state.", e);
}
@@ -70,8 +74,30 @@
if (queueClass == null) throw new IllegalArgumentException("Queue class may not be null.");
this.queueClass = queueClass;
}
+
+ /**
+ * Holds value of property capacity.
+ */
+ private int capacity = Integer.MAX_VALUE;
/**
+ * Queue capacity before blocking.
+ * Default is Integer.MAX_VALUE.
+ * @return Value of property capacity.
+ */
+ public int getCapacity() {
+ return this.capacity;
+ }
+
+ /**
+ * Setter for property capacity.
+ * @param capacity New value of property capacity.
+ */
+ public void setCapacity(int capacity) {
+ this.capacity = capacity;
+ }
+
+ /**
* Holds value of property timeout.
*/
private long timeout = 500;