Author: nuttycom
Date: Tue Oct 10 14:21:02 2006
New Revision: 462575

Initial import. Stage implementations contributed by Steve Christensen.

   (with props)
   (with props)
   (with props)

 Tue Oct 10 14:21:02 2006
@@ -0,0 +1,372 @@
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.pipeline.driver;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.AbstractStageDriver;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+ * This is a very simple implementation of a AbstractStageDriver which spawns
+ * a single thread to process a stage.
+ */
+public class ThreadPoolStageDriver extends AbstractStageDriver {
+    private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
+    //wait timeout to ensure deadlock cannot occur on thread termination
+    private long timeout;
+    //flag describing whether or not the driver is fault tolerant
+    private FaultTolerance faultTolerance = FaultTolerance.NONE;
+    //signal telling threads to start polling queue
+    final private CountDownLatch startSignal;
+    //signal threads use to tell driver they have finished
+    final private CountDownLatch doneSignal;
+    // number of threads polling queue
+    private int numThreads = 1;
+    //queue to hold data to be processed
+    private BlockingQueue queue;
+    //current state of thread processing
+    private volatile State currentState = State.STOPPED;
+    //feeder used to feed data to this stage's queue
+    private final Feeder feeder = new Feeder() {
+        public void feed(Object obj) {
+            if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage 
" + stage
+                    + " (" + 
ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in 
+            try {
+                ThreadPoolStageDriver.this.queue.put(obj);
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("Unexpected interrupt while 
waiting for space to become available for object "
+                        + obj + " in queue for stage " + stage, e);
+            }
+            synchronized(ThreadPoolStageDriver.this) {
+                ThreadPoolStageDriver.this.notifyAll();
+            }
+        }
+    };
+    /**
+     * Creates a new ThreadPoolStageDriver with the specified thread wait
+     * timeout and fault tolerance values.
+     * @param stage The stage that the driver will run
+     * @param context the context in which to run the stage
+     * @param queue The object queue to use for storing objects prior to 
processing. The
+     * default is [EMAIL PROTECTED] LinkedBlockingQueue}
+     * @param timeout The amount of time, in milliseconds, that the worker 
+     * will wait before checking the processing state if no objects are 
+     * in the thread's queue.
+     * @param faultTolerance Flag determining the behavior of the driver when
+     * an error is encountered in execution of [EMAIL PROTECTED] 
+     * If this is set to false, any exception thrown during [EMAIL PROTECTED] 
+     * will cause the worker thread to halt without executing [EMAIL 
PROTECTED] Stage#postprocess()}
+     * ([EMAIL PROTECTED] Stage#release()} will be called.)
+     * @param numThreads Number of threads that will be simultaneously reading 
from queue
+     */
+    public ThreadPoolStageDriver(Stage stage, StageContext context,
+            BlockingQueue queue,
+            long timeout,
+            FaultTolerance faultTolerance,
+            int numThreads) {
+        super(stage, context);
+        this.numThreads = numThreads;
+        this.startSignal = new CountDownLatch(1);
+        this.doneSignal = new CountDownLatch(this.numThreads);
+        this.queue = queue;
+        this.timeout = timeout;
+        this.faultTolerance = faultTolerance;
+    }
+    /**
+     * Return the Feeder used to feed data to the queue of objects to be 
+     * @return The feeder for objects processed by this driver's stage.
+     */
+    public Feeder getFeeder() {
+        return this.feeder;
+    }
+    /**
+     * Start the processing of the stage. Creates threads to poll items
+     * from queue.
+     * @throws org.apache.commons.pipeline.StageException Thrown if the driver 
is in an illegal state during startup
+     */
+    public synchronized void start() throws StageException {
+        if (this.currentState == STOPPED) {
+            setState(STARTED);
+            if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage 
+ "...");
+            stage.preprocess();
+            if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + 
stage + " complete.");
+            log.debug("Starting worker threads for stage " + stage + ".");
+            for (int i=0;i<this.numThreads;i++) {
+                new LatchWorkerThread(i).start();
+            }
+            // let threads know they can start
+            testAndSetState(STARTED, RUNNING);
+            startSignal.countDown();
+            log.debug("Worker threads for stage " + stage + " started.");
+            //wait to ensure that the stage starts up correctly
+            try {
+                while ( !(this.currentState == RUNNING || this.currentState == 
ERROR) ) this.wait();
+            } catch (InterruptedException e) {
+                throw new StageException(this.getStage(), "Worker thread 
unexpectedly interrupted while waiting for thread startup.", e);
+            }
+        } else {
+            throw new IllegalStateException("Attempt to start driver in state 
" + this.currentState);
+        }
+    }
+    /**
+     * Causes processing to shut down gracefully. Waits until all worker 
+     * have completed.
+     * @throws org.apache.commons.pipeline.StageException Thrown if the driver 
is in an illegal state for shutdown.
+     */
+    public synchronized void finish() throws StageException {
+        if (currentState == STOPPED) {
+            throw new IllegalStateException("The driver is not currently 
+        }
+        try {
+            while ( !(this.currentState == RUNNING || this.currentState == 
ERROR) ) this.wait();
+            //ask the worker threads to shut down
+            testAndSetState(RUNNING, STOP_REQUESTED);
+            if (log.isDebugEnabled()) log.debug("Waiting for worker threads to 
stop for stage " + stage + ".");
+            doneSignal.await();
+            if (log.isDebugEnabled()) log.debug("Postprocessing stage " + 
stage + "...");
+            ThreadPoolStageDriver.this.stage.postprocess();
+            if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + 
stage + " complete.");
+            //do not transition into finished state if an error has occurred
+            testAndSetState(STOP_REQUESTED, FINISHED);
+            while ( !(this.currentState == FINISHED || this.currentState == 
ERROR) ) this.wait();
+            log.debug("Worker threads for stage " + stage + " halted");
+        } catch (StageException e) {
+            log.error("An error occurred during postprocess for stage " + 
stage , e);
+            recordFatalError(e);
+            setState(ERROR);
+        } catch (InterruptedException e) {
+            throw new StageException(this.getStage(), "StageDriver 
unexpectedly interrupted while waiting for shutdown of worker threads.", e);
+        } finally {
+            if (log.isDebugEnabled()) log.debug("Releasing resources for stage 
" + stage + "...");
+            stage.release();
+            if (log.isDebugEnabled()) log.debug("Stage " + stage + " 
+        }
+        setState(STOPPED);
+    }
+    /**
+     * Return the current state of stage processing.
+     * @return the current state of processing
+     */
+    public StageDriver.State getState() {
+        return this.currentState;
+    }
+    /**
+     * Atomically tests to determine whether or not the driver is in the one of
+     * the specified states.
+     */
+    private synchronized boolean isInState(State... states) {
+        for (State state : states) if (state == currentState) return true;
+        return false;
+    }
+    /**
+     * Set the current state of stage processing and notify any listeners
+     * that may be waiting on a state change.
+     */
+    private synchronized void setState(State nextState) {
+        if (log.isDebugEnabled()) log.debug("State change for " + stage + ": " 
+ this.currentState + " -> " + nextState);
+        this.currentState = nextState;
+        this.notifyAll();
+    }
+    /**
+     * This method performs an atomic conditional state transition change
+     * to the value specified by the nextState parameter if and only if the
+     * current state is equal to the test state.
+     */
+    private synchronized boolean testAndSetState(State testState, State 
nextState) {
+        if (currentState == testState) {
+            setState(nextState);
+            return true;
+        } else {
+            return false;
+        }
+    }
+    /**
+     * Sets the failure tolerance flag for the worker thread. If faultTolerance
+     * is set to CHECKED, [EMAIL PROTECTED] StageException StageException}s 
thrown by
+     * the [EMAIL PROTECTED] Stage#process(Object)} method will not interrupt 
+     * processing, but will simply be logged with a severity of ERROR.
+     * If faultTolerance is set to ALL, runtime exceptions will also be
+     * logged and otherwise ignored.
+     * @param faultTolerance the flag value
+     */
+    public final void setFaultTolerance(String faultTolerance) {
+        this.faultTolerance = FaultTolerance.valueOf(faultTolerance);
+    }
+    /**
+     * Sets the failure tolerance flag for the worker thread. If faultTolerance
+     * is set to CHECKED, [EMAIL PROTECTED] StageException StageException}s 
thrown by
+     * the [EMAIL PROTECTED] Stage#process(Object)} method will not interrupt 
+     * processing, but will simply be logged with a severity of ERROR.
+     * If faultTolerance is set to ALL, runtime exceptions will also be
+     * logged and otherwise ignored.
+     * @param faultTolerance the flag value
+     */
+    public final void setFaultTolerance(FaultTolerance faultTolerance) {
+        this.faultTolerance = faultTolerance;
+    }
+    /**
+     * Getter for property faultTolerant.
+     * @return Value of property faultTolerant.
+     */
+    public FaultTolerance getFaultTolerance() {
+        return this.faultTolerance;
+    }
+    /*********************************
+     *********************************/
+    private UncaughtExceptionHandler workerThreadExceptionHandler = new 
UncaughtExceptionHandler() {
+        public void uncaughtException(Thread t, Throwable e) {
+            setState(ERROR);
+            recordFatalError(e);
+            log.error("Uncaught exception in stage " + stage, e);
+        }
+    };
+    /**
+     * This worker thread removes and processes data objects from the incoming
+     * synchronize queue. It calls the Stage's process() method to process data
+     * from the queue. This loop runs until State has changed to
+     * STOP_REQUESTED. To break the loop the calling code must run the writer's
+     * finish() method to set the running property to false.
+     *
+     * @throws StageException if an error is encountered during data processing
+     * and faultTolerant is set to false.
+     */
+    private class LatchWorkerThread extends Thread {
+        final int threadID;
+        LatchWorkerThread(int threadID) {
+            this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
+            this.threadID = threadID;
+        }        
+        public final void run() {
+            try {
+                ThreadPoolStageDriver.this.startSignal.await();
+                //do not transition into running state if an error has 
occurred or a stop requested
+                running: while (currentState != ERROR) {
+                    try {
+                        Object obj = queue.poll(timeout, 
+                        if (obj == null) {
+                            if (currentState == STOP_REQUESTED) break running;
+                        } else {
+                            try {
+                                stage.process(obj);
+                            } catch (StageException e) {
+                                recordProcessingException(obj, e);
+                                if (faultTolerance == NONE) throw e;
+                            } catch (RuntimeException e) {
+                                recordProcessingException(obj, e);
+                                if (faultTolerance == CHECKED || 
faultTolerance == NONE) throw e;
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Worker thread " + 
this.threadID + " unexpectedly interrupted while waiting on data for stage " + 
stage, e);
+                    }
+                }
+                if (log.isDebugEnabled()) log.debug("Stage " + stage + " 
(threadID: " + this.threadID + ") exited running state.");
+            } catch (StageException e) {
+                log.error("An error occurred in the stage " + stage + " 
(threadID: " + this.threadID + ")", e);
+                recordFatalError(e);
+                setState(ERROR);
+            } catch (InterruptedException e) {
+                log.error("Stage " + stage + " (threadID: " + threadID + ") 
interrupted while waiting for barrier",e);
+                recordFatalError(e);
+                setState(ERROR);
+            } finally {
+                doneSignal.countDown();
+                synchronized (ThreadPoolStageDriver.this) {
+                    ThreadPoolStageDriver.this.notifyAll();
+                }
+            }
+        }
+    }
+    /**
+     * Get the size of the queue used by this StageDriver.
+     * @return the queue capacity
+     */
+    public int getQueueSize() {
+        return this.queue.size() + this.queue.remainingCapacity();
+    }
+    /**
+     * Get the timeout value (in milliseconds) used by this StageDriver on
+     * thread termination.
+     * @return the timeout setting in milliseconds
+     */
+    public long getTimeout() {
+        return this.timeout;
+    }
+    /**
+     * Returns the number of threads allocated to the thread pool.
+     */
+    public int getNumThreads() {
+        return numThreads;
+    }

    svn:eol-style = native

 Tue Oct 10 14:21:02 2006
@@ -0,0 +1,146 @@
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.pipeline.driver;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageDriverFactory;
+ * This factory is used to create ThreadPoolStageDriver instances configured
+ * to run specific stages.
+ */
+public class ThreadPoolStageDriverFactory implements StageDriverFactory {
+    private int numThreads = 1;
+    /** Creates a new instance of ThreadPoolStageDriverFactory */
+    public ThreadPoolStageDriverFactory() {
+    }
+    /**
+     * Creates the new [EMAIL PROTECTED] ThreadPoolStageDriver} based upon the 
+     * of this factory instance
+     * @param stage The stage to be run by the newly created driver
+     * @param context The context in which the stage will be run
+     * @return the newly created driver
+     */
+    public StageDriver createStageDriver(Stage stage, StageContext context) {
+        try {
+            return new ThreadPoolStageDriver(stage, context, 
queueClass.newInstance(), timeout, faultTolerance, numThreads);
+        } catch (Exception e) {
+            throw new IllegalStateException("Instantiation of driver failed 
due to illegal factory state.", e);
+        }
+    }
+    /**
+     * Holds value of property queueClass.
+     */
+    private Class<? extends BlockingQueue> queueClass = 
+    /**
+     * Getter for property queueClass.
+     * @return Value of property queueClass.
+     */
+    public Class<? extends BlockingQueue> getQueueClass() {
+        return this.queueClass;
+    }
+    /**
+     * Setter for property queueClass.
+     * @param queueClass New value of property queueClass.
+     */
+    public void setQueueClass(Class<? extends BlockingQueue> queueClass) {
+        if (queueClass == null) throw new IllegalArgumentException("Queue 
class may not be null.");
+        this.queueClass = queueClass;
+    }
+    /**
+     * Holds value of property timeout.
+     */
+    private long timeout = 500;
+    /**
+     * Timeout for wait to ensure deadlock cannot occur on thread termination.
+     * Default is 500
+     * @return Value of property timeout.
+     */
+    public long getTimeout() {
+        return this.timeout;
+    }
+    /**
+     * Setter for property timeout.
+     * @param timeout New value of property timeout.
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+    /**
+     * Holds value of property faultTolerance.
+     */
+    private FaultTolerance faultTolerance = FaultTolerance.NONE;
+    /**
+     * Getter for property faultTolerance. See [EMAIL PROTECTED] 
FaultTolerance} for valid values
+     * and enumation meanings.
+     * @return Value of property faultTolerance.
+     */
+    public FaultTolerance getFaultTolerance() {
+        return this.faultTolerance;
+    }
+    /**
+     * Setter for property faultTolerance.
+     *
+     * @param faultTolerance New value of property faultTolerance.
+     */
+    public void setFaultTolerance(FaultTolerance faultTolerance) {
+        this.faultTolerance = faultTolerance;
+    }
+    /**
+     * Convenience setter for property faultTolerance for use by Digester.
+     *
+     * @param level New value of property level ("ALL","CHECKED", or "NONE").
+     */
+    public void setFaultToleranceLevel(String level) {
+        this.faultTolerance = FaultTolerance.valueOf(level);
+    }
+    /**
+     * Returns the number of threads that will be allocated to the thread
+     * pool of a driver created by this factory.
+     */
+    public int getNumThreads() {
+        return numThreads;
+    }
+    /**
+     * Sets the number of threads that will be allocated to the thread
+     * pool of a driver created by this factory.
+     */
+    public void setNumThreads(int numThreads) {
+        this.numThreads = numThreads;
+    }

    svn:eol-style = native

 Tue Oct 10 14:21:02 2006
@@ -0,0 +1,98 @@
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.pipeline.driver;
+import junit.framework.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.StageDriver.State;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+ *
+ *
+ */
+public class ThreadPoolStageDriverTest extends AbstractStageDriverTest {
+    private Log log;
+    public ThreadPoolStageDriverTest(String testName) {
+        super(testName);
+        this.log = LogFactory.getLog(ThreadPoolStageDriverTest.class);
+    }
+    public static Test suite() {
+        TestSuite suite = new TestSuite(ThreadPoolStageDriverTest.class);
+        return suite;
+    }
+        /**
+     * Test of getFeeder method, of class 
+     */
+    public void testGetFeeder() {
+        log.debug("testGetFeeder 
+        ThreadPoolStageDriver instance = new ThreadPoolStageDriver(stage, 
context, new LinkedBlockingQueue(), 500, FaultTolerance.NONE, 5);
+        Feeder feeder = instance.getFeeder();
+        assertNotNull(feeder);        
+    }
+    /**
+     * Due to the design of the ThreadPoolStageDriver, it is meaningless
+     * to independently test the start or finish methods; however, testing 
+     * both together is meaningful. This test also provides verification of
+     * proper behavior of the getState() method.
+     */
+    public void testStartFinish() throws Exception {
+        log.debug("testStartFinish 
+        ThreadPoolStageDriver instance = new ThreadPoolStageDriver(stage, 
context, new LinkedBlockingQueue(), 500, FaultTolerance.NONE, 5);
+        assertEquals(State.STOPPED, instance.getState());
+        instance.start();
+        assertTrue(instance.getState() == State.STARTED || instance.getState() 
== State.RUNNING);
+        instance.finish();
+        assertEquals(State.STOPPED, instance.getState());
+    }
+    /*********************
+     *********************/
+    public void testSingleStage() throws Exception {        
+        log.debug("testSingleStage 
+        StageDriverTestUtils.testSingleStage(this, new 
+    }
+    public void testMultiStage() throws Exception {        
+        log.debug("testMultiStage 
+        StageDriverTestUtils.testMultiStage(this, new 
+    }
+    public void testMultiFaultingStage() throws Exception {       
+        log.debug("testMultiFaultingStage 
+        ThreadPoolStageDriverFactory factory = new 
+        factory.setFaultTolerance(FaultTolerance.CHECKED);
+        StageDriverTestUtils.testMultiFaultingStage(this, factory);
+    }

    svn:eol-style = native

To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to