Author: nuttycom Date: Tue Oct 10 14:21:02 2006 New Revision: 462575 URL: http://svn.apache.org/viewvc?view=rev&rev=462575 Log: Initial import. Stage implementations contributed by Steve Christensen.
Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java (with props) jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java (with props) jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java (with props) Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java?view=auto&rev=462575 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java (added) +++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java Tue Oct 10 14:21:02 2006 @@ -0,0 +1,372 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.pipeline.driver; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.pipeline.Feeder; +import org.apache.commons.pipeline.StageDriver; +import org.apache.commons.pipeline.Stage; +import org.apache.commons.pipeline.StageContext; +import org.apache.commons.pipeline.StageException; +import org.apache.commons.pipeline.driver.AbstractStageDriver; +import org.apache.commons.pipeline.driver.FaultTolerance; + +import static org.apache.commons.pipeline.StageDriver.State.*; +import static org.apache.commons.pipeline.driver.FaultTolerance.*; + +/** + * This is a very simple implementation of a AbstractStageDriver which spawns + * a single thread to process a stage. + */ +public class ThreadPoolStageDriver extends AbstractStageDriver { + private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class); + + //wait timeout to ensure deadlock cannot occur on thread termination + private long timeout; + + //flag describing whether or not the driver is fault tolerant + private FaultTolerance faultTolerance = FaultTolerance.NONE; + + //signal telling threads to start polling queue + final private CountDownLatch startSignal; + + //signal threads use to tell driver they have finished + final private CountDownLatch doneSignal; + + // number of threads polling queue + private int numThreads = 1; + + //queue to hold data to be processed + private BlockingQueue queue; + + //current state of thread processing + private volatile State currentState = State.STOPPED; + + //feeder used to feed data to this stage's queue + private final Feeder feeder = new Feeder() { + public void feed(Object obj) { + if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage + + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in queue)"); + try { + ThreadPoolStageDriver.this.queue.put(obj); + } catch (InterruptedException e) { + throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object " + + obj + " in queue for stage " + stage, e); + } + synchronized(ThreadPoolStageDriver.this) { + ThreadPoolStageDriver.this.notifyAll(); + } + } + }; + + /** + * Creates a new ThreadPoolStageDriver with the specified thread wait + * timeout and fault tolerance values. + * @param stage The stage that the driver will run + * @param context the context in which to run the stage + * @param queue The object queue to use for storing objects prior to processing. The + * default is [EMAIL PROTECTED] LinkedBlockingQueue} + * @param timeout The amount of time, in milliseconds, that the worker thread + * will wait before checking the processing state if no objects are available + * in the thread's queue. + * @param faultTolerance Flag determining the behavior of the driver when + * an error is encountered in execution of [EMAIL PROTECTED] Stage#process(Object)}. + * If this is set to false, any exception thrown during [EMAIL PROTECTED] Stage#process(Object)} + * will cause the worker thread to halt without executing [EMAIL PROTECTED] Stage#postprocess()} + * ([EMAIL PROTECTED] Stage#release()} will be called.) + * @param numThreads Number of threads that will be simultaneously reading from queue + */ + public ThreadPoolStageDriver(Stage stage, StageContext context, + BlockingQueue queue, + long timeout, + FaultTolerance faultTolerance, + int numThreads) { + super(stage, context); + this.numThreads = numThreads; + + this.startSignal = new CountDownLatch(1); + this.doneSignal = new CountDownLatch(this.numThreads); + + this.queue = queue; + this.timeout = timeout; + this.faultTolerance = faultTolerance; + } + + /** + * Return the Feeder used to feed data to the queue of objects to be processed. + * @return The feeder for objects processed by this driver's stage. + */ + public Feeder getFeeder() { + return this.feeder; + } + + /** + * Start the processing of the stage. Creates threads to poll items + * from queue. + * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup + */ + public synchronized void start() throws StageException { + if (this.currentState == STOPPED) { + setState(STARTED); + + if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "..."); + stage.preprocess(); + if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete."); + + log.debug("Starting worker threads for stage " + stage + "."); + + for (int i=0;i<this.numThreads;i++) { + new LatchWorkerThread(i).start(); + } + + // let threads know they can start + testAndSetState(STARTED, RUNNING); + startSignal.countDown(); + + log.debug("Worker threads for stage " + stage + " started."); + + //wait to ensure that the stage starts up correctly + try { + while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait(); + } catch (InterruptedException e) { + throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e); + } + } else { + throw new IllegalStateException("Attempt to start driver in state " + this.currentState); + } + } + + /** + * Causes processing to shut down gracefully. Waits until all worker threads + * have completed. + * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown. + */ + public synchronized void finish() throws StageException { + if (currentState == STOPPED) { + throw new IllegalStateException("The driver is not currently running."); + } + + try { + while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait(); + + //ask the worker threads to shut down + testAndSetState(RUNNING, STOP_REQUESTED); + + if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + "."); + doneSignal.await(); + + if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "..."); + ThreadPoolStageDriver.this.stage.postprocess(); + if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete."); + + //do not transition into finished state if an error has occurred + testAndSetState(STOP_REQUESTED, FINISHED); + + while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait(); + + log.debug("Worker threads for stage " + stage + " halted"); + } catch (StageException e) { + log.error("An error occurred during postprocess for stage " + stage , e); + recordFatalError(e); + setState(ERROR); + } catch (InterruptedException e) { + throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e); + } finally { + if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "..."); + stage.release(); + if (log.isDebugEnabled()) log.debug("Stage " + stage + " released."); + } + + setState(STOPPED); + } + + /** + * Return the current state of stage processing. + * @return the current state of processing + */ + public StageDriver.State getState() { + return this.currentState; + } + + /** + * Atomically tests to determine whether or not the driver is in the one of + * the specified states. + */ + private synchronized boolean isInState(State... states) { + for (State state : states) if (state == currentState) return true; + return false; + } + + /** + * Set the current state of stage processing and notify any listeners + * that may be waiting on a state change. + */ + private synchronized void setState(State nextState) { + if (log.isDebugEnabled()) log.debug("State change for " + stage + ": " + this.currentState + " -> " + nextState); + this.currentState = nextState; + this.notifyAll(); + } + + /** + * This method performs an atomic conditional state transition change + * to the value specified by the nextState parameter if and only if the + * current state is equal to the test state. + */ + private synchronized boolean testAndSetState(State testState, State nextState) { + if (currentState == testState) { + setState(nextState); + return true; + } else { + return false; + } + } + + /** + * Sets the failure tolerance flag for the worker thread. If faultTolerance + * is set to CHECKED, [EMAIL PROTECTED] StageException StageException}s thrown by + * the [EMAIL PROTECTED] Stage#process(Object)} method will not interrupt queue + * processing, but will simply be logged with a severity of ERROR. + * If faultTolerance is set to ALL, runtime exceptions will also be + * logged and otherwise ignored. + * @param faultTolerance the flag value + */ + public final void setFaultTolerance(String faultTolerance) { + this.faultTolerance = FaultTolerance.valueOf(faultTolerance); + } + + /** + * Sets the failure tolerance flag for the worker thread. If faultTolerance + * is set to CHECKED, [EMAIL PROTECTED] StageException StageException}s thrown by + * the [EMAIL PROTECTED] Stage#process(Object)} method will not interrupt queue + * processing, but will simply be logged with a severity of ERROR. + * If faultTolerance is set to ALL, runtime exceptions will also be + * logged and otherwise ignored. + * @param faultTolerance the flag value + */ + public final void setFaultTolerance(FaultTolerance faultTolerance) { + this.faultTolerance = faultTolerance; + } + + /** + * Getter for property faultTolerant. + * @return Value of property faultTolerant. + */ + public FaultTolerance getFaultTolerance() { + return this.faultTolerance; + } + + /********************************* + * WORKER THREAD IMPLEMENTATIONS * + *********************************/ + private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + setState(ERROR); + recordFatalError(e); + log.error("Uncaught exception in stage " + stage, e); + } + }; + + /** + * This worker thread removes and processes data objects from the incoming + * synchronize queue. It calls the Stage's process() method to process data + * from the queue. This loop runs until State has changed to + * STOP_REQUESTED. To break the loop the calling code must run the writer's + * finish() method to set the running property to false. + * + * @throws StageException if an error is encountered during data processing + * and faultTolerant is set to false. + */ + private class LatchWorkerThread extends Thread { + final int threadID; + + LatchWorkerThread(int threadID) { + this.setUncaughtExceptionHandler(workerThreadExceptionHandler); + this.threadID = threadID; + } + + public final void run() { + try { + ThreadPoolStageDriver.this.startSignal.await(); + //do not transition into running state if an error has occurred or a stop requested + running: while (currentState != ERROR) { + try { + Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS); + if (obj == null) { + if (currentState == STOP_REQUESTED) break running; + } else { + try { + stage.process(obj); + } catch (StageException e) { + recordProcessingException(obj, e); + if (faultTolerance == NONE) throw e; + } catch (RuntimeException e) { + recordProcessingException(obj, e); + if (faultTolerance == CHECKED || faultTolerance == NONE) throw e; + } + } + } catch (InterruptedException e) { + throw new RuntimeException("Worker thread " + this.threadID + " unexpectedly interrupted while waiting on data for stage " + stage, e); + } + } + if (log.isDebugEnabled()) log.debug("Stage " + stage + " (threadID: " + this.threadID + ") exited running state."); + + } catch (StageException e) { + log.error("An error occurred in the stage " + stage + " (threadID: " + this.threadID + ")", e); + recordFatalError(e); + setState(ERROR); + } catch (InterruptedException e) { + log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier",e); + recordFatalError(e); + setState(ERROR); + } finally { + doneSignal.countDown(); + synchronized (ThreadPoolStageDriver.this) { + ThreadPoolStageDriver.this.notifyAll(); + } + } + } + } + + /** + * Get the size of the queue used by this StageDriver. + * @return the queue capacity + */ + public int getQueueSize() { + return this.queue.size() + this.queue.remainingCapacity(); + } + + /** + * Get the timeout value (in milliseconds) used by this StageDriver on + * thread termination. + * @return the timeout setting in milliseconds + */ + public long getTimeout() { + return this.timeout; + } + + /** + * Returns the number of threads allocated to the thread pool. + */ + public int getNumThreads() { + return numThreads; + } +} Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java?view=auto&rev=462575 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java (added) +++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java Tue Oct 10 14:21:02 2006 @@ -0,0 +1,146 @@ +/* + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.pipeline.driver; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.pipeline.Stage; +import org.apache.commons.pipeline.StageContext; +import org.apache.commons.pipeline.StageDriver; +import org.apache.commons.pipeline.StageDriverFactory; + +/** + * This factory is used to create ThreadPoolStageDriver instances configured + * to run specific stages. + */ +public class ThreadPoolStageDriverFactory implements StageDriverFactory { + + private int numThreads = 1; + + /** Creates a new instance of ThreadPoolStageDriverFactory */ + public ThreadPoolStageDriverFactory() { + } + + /** + * Creates the new [EMAIL PROTECTED] ThreadPoolStageDriver} based upon the configuration + * of this factory instance + * @param stage The stage to be run by the newly created driver + * @param context The context in which the stage will be run + * @return the newly created driver + */ + public StageDriver createStageDriver(Stage stage, StageContext context) { + try { + return new ThreadPoolStageDriver(stage, context, queueClass.newInstance(), timeout, faultTolerance, numThreads); + } catch (Exception e) { + throw new IllegalStateException("Instantiation of driver failed due to illegal factory state.", e); + } + } + + /** + * Holds value of property queueClass. + */ + private Class<? extends BlockingQueue> queueClass = LinkedBlockingQueue.class; + + /** + * Getter for property queueClass. + * @return Value of property queueClass. + */ + public Class<? extends BlockingQueue> getQueueClass() { + return this.queueClass; + } + + /** + * Setter for property queueClass. + * @param queueClass New value of property queueClass. + */ + public void setQueueClass(Class<? extends BlockingQueue> queueClass) { + if (queueClass == null) throw new IllegalArgumentException("Queue class may not be null."); + this.queueClass = queueClass; + } + + /** + * Holds value of property timeout. + */ + private long timeout = 500; + + /** + * Timeout for wait to ensure deadlock cannot occur on thread termination. + * Default is 500 + * @return Value of property timeout. + */ + public long getTimeout() { + return this.timeout; + } + + /** + * Setter for property timeout. + * @param timeout New value of property timeout. + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + /** + * Holds value of property faultTolerance. + */ + private FaultTolerance faultTolerance = FaultTolerance.NONE; + + /** + * Getter for property faultTolerance. See [EMAIL PROTECTED] FaultTolerance} for valid values + * and enumation meanings. + * @return Value of property faultTolerance. + */ + public FaultTolerance getFaultTolerance() { + return this.faultTolerance; + } + + /** + * Setter for property faultTolerance. + * + * @param faultTolerance New value of property faultTolerance. + */ + public void setFaultTolerance(FaultTolerance faultTolerance) { + this.faultTolerance = faultTolerance; + } + + /** + * Convenience setter for property faultTolerance for use by Digester. + * + * @param level New value of property level ("ALL","CHECKED", or "NONE"). + */ + public void setFaultToleranceLevel(String level) { + this.faultTolerance = FaultTolerance.valueOf(level); + } + + /** + * Returns the number of threads that will be allocated to the thread + * pool of a driver created by this factory. + */ + public int getNumThreads() { + return numThreads; + } + + /** + * Sets the number of threads that will be allocated to the thread + * pool of a driver created by this factory. + */ + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + +} Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java?view=auto&rev=462575 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java (added) +++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java Tue Oct 10 14:21:02 2006 @@ -0,0 +1,98 @@ +/* + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.pipeline.driver; + +import junit.framework.*; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.pipeline.Feeder; +import org.apache.commons.pipeline.StageDriver.State; +import static org.apache.commons.pipeline.StageDriver.State.*; + +/** + * + * + */ +public class ThreadPoolStageDriverTest extends AbstractStageDriverTest { + private Log log; + + public ThreadPoolStageDriverTest(String testName) { + super(testName); + this.log = LogFactory.getLog(ThreadPoolStageDriverTest.class); + } + + public static Test suite() { + TestSuite suite = new TestSuite(ThreadPoolStageDriverTest.class); + + return suite; + } + /** + * Test of getFeeder method, of class org.apache.commons.pipeline.driver.SynchronousStageDriver. + */ + public void testGetFeeder() { + log.debug("testGetFeeder ---------------------------------------------"); + ThreadPoolStageDriver instance = new ThreadPoolStageDriver(stage, context, new LinkedBlockingQueue(), 500, FaultTolerance.NONE, 5); + + Feeder feeder = instance.getFeeder(); + assertNotNull(feeder); + } + + /** + * Due to the design of the ThreadPoolStageDriver, it is meaningless + * to independently test the start or finish methods; however, testing + * both together is meaningful. This test also provides verification of + * proper behavior of the getState() method. + */ + public void testStartFinish() throws Exception { + log.debug("testStartFinish -------------------------------------------"); + ThreadPoolStageDriver instance = new ThreadPoolStageDriver(stage, context, new LinkedBlockingQueue(), 500, FaultTolerance.NONE, 5); + + assertEquals(State.STOPPED, instance.getState()); + + instance.start(); + + assertTrue(instance.getState() == State.STARTED || instance.getState() == State.RUNNING); + + instance.finish(); + + assertEquals(State.STOPPED, instance.getState()); + } + + + /********************* + * INTEGRATION TESTS * + *********************/ + + public void testSingleStage() throws Exception { + log.debug("testSingleStage -------------------------------------------"); + StageDriverTestUtils.testSingleStage(this, new ThreadPoolStageDriverFactory()); + } + + public void testMultiStage() throws Exception { + log.debug("testMultiStage --------------------------------------------"); + StageDriverTestUtils.testMultiStage(this, new ThreadPoolStageDriverFactory()); + } + + public void testMultiFaultingStage() throws Exception { + log.debug("testMultiFaultingStage ------------------------------------"); + ThreadPoolStageDriverFactory factory = new ThreadPoolStageDriverFactory(); + factory.setFaultTolerance(FaultTolerance.CHECKED); + + StageDriverTestUtils.testMultiFaultingStage(this, factory); + } +} Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]