Hi, Eric,

Thanks a lot for the patch. As it happens, we just ran into a similar problem last week and I have added a slightly different but equivalent solution to the most recent snapshot. Instead of specifying a queue class, I have now added a QueueFactory interface and implementations for all of the standard jdk5 queues. This allows you to generate any type of bounded queue by specifying the appropriate factory and setting the parameters for the constructor as properties for that factory. Here is an example of the digester config for DedicatedThreadStageDriverFactory with the new queue factory:

<driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="f1"> <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
               capacity="10" fair="false"/>
 </driverFactory>

Thanks again!

Kris

Eric wrote:
here's a patch that adds a "capacity" property to the queue factory so
that you can limit pileups to less than the default Integer.MAX_SIZE

thanks for pipeline,
eric

------------------------------------------------------------------------

Index: pipeline/src/driver/DedicatedThreadStageDriverFactory.java
===================================================================
--- pipeline/src/driver/DedicatedThreadStageDriverFactory.java  (revision 384)
+++ pipeline/src/driver/DedicatedThreadStageDriverFactory.java  (working copy)
@@ -16,6 +16,8 @@
package org.apache.commons.pipeline.driver; +import java.lang.reflect.*;
+
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.pipeline.Stage;
@@ -29,7 +31,8 @@
  *
  */
 public class DedicatedThreadStageDriverFactory implements StageDriverFactory {
- + +
     /** Creates a new instance of DedicatedThreadStageDriverFactory */
     public DedicatedThreadStageDriverFactory() {
     }
@@ -43,7 +46,8 @@
      */
     public StageDriver createStageDriver(Stage stage, StageContext context) {
         try {
-            return new DedicatedThreadStageDriver(stage, context, 
queueClass.newInstance(), timeout, faultTolerance);
+                       Constructor queueClassConstructor = 
queueClass.getConstructor(new Class[] {int.class});
+            return new DedicatedThreadStageDriver(stage, context, 
(BlockingQueue) queueClassConstructor.newInstance(new Object[]{new 
Integer(capacity)}), timeout, faultTolerance);
         } catch (Exception e) {
             throw new IllegalStateException("Instantiation of driver failed due to 
illegal factory state.", e);
         }
@@ -70,8 +74,30 @@
         if (queueClass == null) throw new IllegalArgumentException("Queue class may 
not be null.");
         this.queueClass = queueClass;
     }
+
+    /**
+     * Holds value of property timeout.
+     */
+    private int capacity = Integer.MAX_VALUE ;
/**
+     * Que capacity before blocking.
+     * Default is Integer.MAX_VALUE
+     * @return Value of property timeout.
+     */
+    public int getCapacity() {
+        return this.capacity;
+    }
+ + /**
+     * Setter for property capacity.
+     * @param capacity New value of property capacity.
+     */
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+ + /**
      * Holds value of property timeout.
      */
     private long timeout = 500;

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to