Oh, very cool. We're using your latest version now. It works great with the DedicatedThread factory, but it still runs out of memory with the ThreadPool factory, even with threads=1. Any ideas?
thanks,
eric

On Wed, Nov 08, 2006 at 11:01:11AM -0700, Kris Nuttycombe wrote:
> Hi, Eric,
>
> Thanks a lot for the patch. As it happens, we just ran into a similar
> problem last week and I have added a slightly different but equivalent
> solution to the most recent snapshot. Instead of specifying a queue
> class, I have now added a QueueFactory interface and implementations for
> all of the standard jdk5 queues. This allows you to generate any type of
> bounded queue by specifying the appropriate factory and setting the
> parameters for the constructor as properties for that factory. Here is
> an example of the digester config for DedicatedThreadStageDriverFactory
> with the new queue factory:
>
> <driverFactory
> className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
> id="f1">
>   <property propName="queueFactory"
> className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
> capacity="10" fair="false"/>
> </driverFactory>
>
> Thanks again!
>
> Kris
>
> Eric wrote:
> >here's a patch that adds a "capacity" property to the queue factory so
> >that you can limit pileups to less than the default Integer.MAX_SIZE
> >
> >thanks for pipeline,
> >eric
> >
> >
> >------------------------------------------------------------------------
> >
> >Index: pipeline/src/driver/DedicatedThreadStageDriverFactory.java
> >===================================================================
> >--- pipeline/src/driver/DedicatedThreadStageDriverFactory.java (revision
> >384)
> >+++ pipeline/src/driver/DedicatedThreadStageDriverFactory.java (working
> >copy)
> >@@ -16,6 +16,8 @@
> >
> > package org.apache.commons.pipeline.driver;
> >
> >+import java.lang.reflect.*;
> >+
> > import java.util.concurrent.BlockingQueue;
> > import java.util.concurrent.LinkedBlockingQueue;
> > import org.apache.commons.pipeline.Stage;
> >@@ -29,7 +31,8 @@
> >  *
> >  */
> > public class DedicatedThreadStageDriverFactory implements
> > StageDriverFactory {
> >-
> >+
> >+
> >     /** Creates a new instance of DedicatedThreadStageDriverFactory */
> >     public DedicatedThreadStageDriverFactory() {
> >     }
> >@@ -43,7 +46,8 @@
> >      */
> >     public StageDriver createStageDriver(Stage stage, StageContext
> >     context) {
> >         try {
> >-            return new DedicatedThreadStageDriver(stage, context,
> >queueClass.newInstance(), timeout, faultTolerance);
> >+            Constructor queueClassConstructor =
> >queueClass.getConstructor(new Class[] {int.class});
> >+            return new DedicatedThreadStageDriver(stage, context,
> >(BlockingQueue) queueClassConstructor.newInstance(new Object[]{new
> >Integer(capacity)}), timeout, faultTolerance);
> >         } catch (Exception e) {
> >             throw new IllegalStateException("Instantiation of driver
> >             failed due to illegal factory state.", e);
> >         }
> >@@ -70,8 +74,30 @@
> >         if (queueClass == null) throw new IllegalArgumentException("Queue
> >         class may not be null.");
> >         this.queueClass = queueClass;
> >     }
> >+
> >+    /**
> >+     * Holds value of property timeout.
> >+     */
> >+    private int capacity = Integer.MAX_VALUE ;
> >
> >     /**
> >+     * Que capacity before blocking.
> >+     * Default is Integer.MAX_VALUE
> >+     * @return Value of property timeout.
> >+     */
> >+    public int getCapacity() {
> >+        return this.capacity;
> >+    }
> >+
> >+    /**
> >+     * Setter for property capacity.
> >+     * @param capacity New value of property capacity.
> >+     */
> >+    public void setCapacity(int capacity) {
> >+        this.capacity = capacity;
> >+    }
> >+
> >+    /**
> >      * Holds value of property timeout.
> >      */
> >     private long timeout = 500;
> >

-- 
http://ir.iit.edu/~ej
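
P.S. For anyone reading the archive: the core idea of the quoted patch (build the queue through its single-int constructor so that it has a fixed capacity) can be tried on its own, outside of pipeline. The following is only a minimal sketch using plain JDK classes; the class name BoundedQueueSketch and the helper newBoundedQueue are hypothetical names made up for this illustration and are not part of commons-pipeline.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration class; not part of commons-pipeline.
class BoundedQueueSketch {

    /**
     * Reflectively calls the (int capacity) constructor of the given queue
     * class, mirroring the approach taken in the quoted patch.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <T> BlockingQueue<T> newBoundedQueue(
            Class<? extends BlockingQueue> queueClass, int capacity) {
        try {
            Constructor<? extends BlockingQueue> ctor =
                    queueClass.getConstructor(int.class);
            return (BlockingQueue<T>) ctor.newInstance(capacity);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Queue class has no usable (int capacity) constructor.", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = newBoundedQueue(ArrayBlockingQueue.class, 2);

        System.out.println(queue.offer("a")); // true
        System.out.println(queue.offer("b")); // true
        // The queue is full now: offer() refuses the element instead of
        // letting the backlog grow, and put() would block the producer.
        System.out.println(queue.offer("c")); // false
        System.out.println(queue.remainingCapacity()); // 0
    }
}
```

The non-blocking offer() is used here only so the demo terminates; a producer that calls put() on a full bounded queue waits until a consumer takes an element, which is what keeps the backlog from piling up in memory.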