Author: nuttycom Date: Thu Sep 7 14:46:02 2006 New Revision: 441243 URL: http://svn.apache.org/viewvc?view=rev&rev=441243 Log: Fixed problem where events were not properly propagated to sibling branches
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java (with props) Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java?view=diff&rev=441243&r1=441242&r2=441243 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java (original) +++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java Thu Sep 7 14:46:02 2006 @@ -19,10 +19,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EventObject; import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.ListIterator; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,6 +61,9 @@ // Map of pipeline branches where the keys are branch names. private final Map<String,Pipeline> branches; + // Used to store a reference to the parent pipeline, if this is a branch + private Pipeline parent; + // The list of listeners registered with the pipeline. private final List<StageEventListener> listeners; @@ -100,25 +103,35 @@ } /** - * Notifies each registered listener of an event and propagates - * the event to any attached branches + * Asynchronously notifies each registered listener of an event and propagates + * the event to any attached branches and the parent pipeline. 
+ * * @param ev The event to be sent to registered listeners */ - public void raise(final java.util.EventObject ev) { + public void raise(final EventObject ev) { new Thread() { public void run() { - for (StageEventListener listener : listeners) { - listener.notify(ev); - } + //first, recursively find the root pipeline + Pipeline root = Pipeline.this; + while (root.parent != null) root = root.parent; - for (Pipeline branch : branches.values()) { - if (branch != Pipeline.this) branch.raise(ev); - } - } + //notify the listeners from the root pipeline + root.notifyListeners(ev); + } }.start(); } /** + * Notify all listeners and recursively notify child branches of the + * specified event. This method does not propagate events to the + * parent pipeline. + */ + private void notifyListeners(EventObject ev) { + for (StageEventListener listener : listeners) listener.notify(ev); + for (Pipeline branch : branches.values()) branch.notifyListeners(ev); + } + + /** * This method is used by a stage driver to pass data from one stage to the next. * @return the feeder for the downstream stage, or null if no downstream * stage exists. @@ -135,7 +148,7 @@ for (int i = drivers.size() - 2; i >= 0; i--) { if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder(); } - + throw new IllegalStateException("Unable to find stage " + stage + " in pipeline."); } } @@ -155,6 +168,7 @@ * be used to validate that the appended Stage can consume the output of the * previous stage of the pipeline. It does NOT validate the ability or availability * of branches to consume data produced by the appended stage. + * * @param stage the stage to be added to the pipeline * @param driverFactory the factory that will be used to create a {@link StageDriver} that will run the stage * @throws ValidationException if there is a non-null validator set for this pipeline and an error is @@ -189,6 +203,7 @@ /** * Return the StageDriver for the specified Stage. 
+ * * @return the StageDriver for the specified Stage. */ public final StageDriver getStageDriver(Stage stage) { @@ -211,24 +226,25 @@ * @throws org.apache.commons.pipeline.validation.ValidationException if the pipeline has a non-null {@link PipelineValidator} and the branch * cannot consume the data produced for the branch by stages in the pipeline. */ - public void addBranch(String key, Pipeline pipeline) throws ValidationException { + public void addBranch(String key, Pipeline branch) throws ValidationException { if (key == null) throw new IllegalArgumentException("Branch key may not be null."); if (MAIN_BRANCH.equalsIgnoreCase(key)) throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved."); - if (pipeline == null) + if (branch == null) throw new IllegalArgumentException("Illegal attempt to set reference to null branch."); - if (pipeline == this || pipeline.hasBranch(this)) + if (branch == this || branch.hasBranch(this)) throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)"); if (validator != null) { - List<ValidationFailure> errors = validator.validateAddBranch(this, key, pipeline); + List<ValidationFailure> errors = validator.validateAddBranch(this, key, branch); if (errors != null && !errors.isEmpty()) { - throw new ValidationException("An error occurred adding branch pipeline " + pipeline, errors); + throw new ValidationException("An error occurred adding branch pipeline " + branch, errors); } } - this.branches.put(key, pipeline); + branch.parent = this; + this.branches.put(key, branch); } /** @@ -250,19 +266,19 @@ if (branches.containsValue(pipeline)) return true; for (Pipeline branch : branches.values()) { if (branch.hasBranch(pipeline)) return true; - } - - return false; } + return false; + } + /** * Returns a feeder for the first stage if the pipeline is not empty * @return the feeder to feed the first stage of the pipeline */ public Feeder 
getSourceFeeder() { - if (drivers.isEmpty()) return null; + if (drivers.isEmpty()) return this.terminalFeeder; return drivers.peek().getFeeder(); - } + } /** * Gets the feeder that receives output from the final stage. @@ -286,6 +302,7 @@ * Startups may occur sequentially or in parallel, depending upon the stage driver * used. If a the stage has not been configured with a {@link StageDriver}, * we will use the default, non-threaded {@link SynchronousStageDriver}. + * * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during pipeline startup */ public void start() throws StageException { @@ -300,6 +317,7 @@ * method will block until the stage's queue is exhausted, so this method * may be used to safely finalize all stages without the risk of * losing data in the queues. + * * @throws org.apache.commons.pipeline.StageException Thrown if there is an unhandled error during stage shutdown */ public void finish() throws StageException { Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java?view=auto&rev=441243 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java (added) +++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java Thu Sep 7 14:46:02 2006 @@ -0,0 +1,348 @@ +/* + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.pipeline; + +import junit.framework.*; +import java.util.Collection; +import java.util.Collections; +import java.util.EventObject; +import org.apache.commons.pipeline.driver.SynchronousStageDriverFactory; +import org.apache.commons.pipeline.event.ObjectProcessedEvent; +import org.apache.commons.pipeline.listener.ObjectProcessedEventCounter; +import org.apache.commons.pipeline.testFramework.TestStage; + +/** + * Test cases + */ +public class PipelineTest extends TestCase { + + public PipelineTest(String testName) { + super(testName); + } + + public static Test suite() { + TestSuite suite = new TestSuite(PipelineTest.class); + + return suite; + } + + /** + * Test of registerListener method, of class org.apache.commons.pipeline.Pipeline. + */ + public void testRegisterListener() { + StageEventListener listener = new ObjectProcessedEventCounter(); + Pipeline instance = new Pipeline(); + + instance.registerListener(listener); + + assertEquals(1, instance.getRegisteredListeners().size()); + } + + /** + * Test of getRegisteredListeners method, of class org.apache.commons.pipeline.Pipeline. + */ + public void testGetRegisteredListeners() { + Pipeline instance = new Pipeline(); + + Collection<StageEventListener> expResult = Collections.EMPTY_LIST; + Collection<StageEventListener> result = instance.getRegisteredListeners(); + assertEquals(expResult, result); + } + + /** + * Test of raise method, of class org.apache.commons.pipeline.Pipeline. 
+ */ + public void testRaise() { + Stage testStage = new TestStage(0); + EventObject ev = new ObjectProcessedEvent(testStage, "Hello, World!"); + Pipeline instance = new Pipeline(); + ObjectProcessedEventCounter counter = new ObjectProcessedEventCounter(); + instance.registerListener(counter); + + instance.raise(ev); + + Thread.currentThread().yield(); //give the notifier thread created by raise priority + + assertNotNull(counter.getCounts().get(testStage)); + assertEquals(1, counter.getCounts().get(testStage).intValue()); + } + + public void testRaiseOnBranch() throws Exception { + Pipeline root = new Pipeline(); + + Pipeline branch1 = new Pipeline(); + root.addBranch("b1", branch1); + + Pipeline branch2 = new Pipeline(); + root.addBranch("b2", branch2); + + ObjectProcessedEventCounter counter = new ObjectProcessedEventCounter(); + branch2.registerListener(counter); + + Stage testStage = new TestStage(0); + EventObject ev = new ObjectProcessedEvent(testStage, "Hello, World!"); + branch1.raise(ev); + + Thread.currentThread().yield(); //give the notifier thread created by raise priority + + assertNotNull(counter.getCounts().get(testStage)); + assertEquals(1, counter.getCounts().get(testStage).intValue()); + } + + /** + * Test of getDownstreamFeeder method, of class org.apache.commons.pipeline.Pipeline. + */ + public void testGetDownstreamFeeder() throws Exception { + Stage stage1 = new TestStage(0); + Stage stage2 = new TestStage(1); + StageDriverFactory sdf = new SynchronousStageDriverFactory(); + + Pipeline instance = new Pipeline(); + instance.addStage(stage1, sdf); + instance.addStage(stage2, sdf); + + Feeder expResult = instance.getStageDriver(stage2).getFeeder(); + Feeder result = instance.getDownstreamFeeder(stage1); + assertSame(expResult, result); + } + + /** + * Test of getBranchFeeder method, of class org.apache.commons.pipeline.Pipeline. 
+ */ + public void testGetBranchFeeder() throws Exception { + String branchKey = "b1"; + Pipeline root = new Pipeline(); + Pipeline branch = new Pipeline(); + root.addBranch(branchKey, branch); + + Feeder expResult = branch.getTerminalFeeder(); //no feeders registered + Feeder result = root.getBranchFeeder(branchKey); + assertSame(expResult, result); + + StageDriverFactory sdf = new SynchronousStageDriverFactory(); + Stage testStage = new TestStage(0); + branch.addStage(testStage, sdf); + + expResult = branch.getStageDriver(testStage).getFeeder(); + result = root.getBranchFeeder(branchKey); + assertSame(expResult, result); + } + +// /** +// * Test of addStage method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testAddStage() throws Exception { +// System.out.println("addStage"); +// +// Stage stage = null; +// StageDriverFactory driverFactory = null; +// Pipeline instance = new Pipeline(); +// +// instance.addStage(stage, driverFactory); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of getStages method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testGetStages() { +// System.out.println("getStages"); +// +// Pipeline instance = new Pipeline(); +// +// List<Stage> expResult = null; +// List<Stage> result = instance.getStages(); +// assertEquals(expResult, result); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of getStageDriver method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testGetStageDriver() { +// System.out.println("getStageDriver"); +// +// Stage stage = null; +// Pipeline instance = new Pipeline(); +// +// StageDriver expResult = null; +// StageDriver result = instance.getStageDriver(stage); +// assertEquals(expResult, result); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of getStageDrivers method, of class org.apache.commons.pipeline.Pipeline. 
+// */ +// public void testGetStageDrivers() { +// System.out.println("getStageDrivers"); +// +// Pipeline instance = new Pipeline(); +// +// List<StageDriver> expResult = null; +// List<StageDriver> result = instance.getStageDrivers(); +// assertEquals(expResult, result); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of addBranch method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testAddBranch() throws Exception { +// System.out.println("addBranch"); +// +// String key = ""; +// Pipeline branch = null; +// Pipeline instance = new Pipeline(); +// +// instance.addBranch(key, branch); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of getBranches method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testGetBranches() { +// System.out.println("getBranches"); +// +// Pipeline instance = new Pipeline(); +// +// Map<String, Pipeline> expResult = null; +// Map<String, Pipeline> result = instance.getBranches(); +// assertEquals(expResult, result); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of getSourceFeeder method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testGetSourceFeeder() { +// System.out.println("getSourceFeeder"); +// +// Pipeline instance = new Pipeline(); +// +// Feeder expResult = null; +// Feeder result = instance.getSourceFeeder(); +// assertEquals(expResult, result); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of getTerminalFeeder method, of class org.apache.commons.pipeline.Pipeline. 
+// */ +// public void testGetTerminalFeeder() { +// System.out.println("getTerminalFeeder"); +// +// Pipeline instance = new Pipeline(); +// +// Feeder expResult = null; +// Feeder result = instance.getTerminalFeeder(); +// assertEquals(expResult, result); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of setTerminalFeeder method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testSetTerminalFeeder() { +// System.out.println("setTerminalFeeder"); +// +// Feeder terminalFeeder = null; +// Pipeline instance = new Pipeline(); +// +// instance.setTerminalFeeder(terminalFeeder); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of start method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testStart() throws Exception { +// System.out.println("start"); +// +// Pipeline instance = new Pipeline(); +// +// instance.start(); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of finish method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testFinish() throws Exception { +// System.out.println("finish"); +// +// Pipeline instance = new Pipeline(); +// +// instance.finish(); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of run method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testRun() { +// System.out.println("run"); +// +// Pipeline instance = new Pipeline(); +// +// instance.run(); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of getValidator method, of class org.apache.commons.pipeline.Pipeline. 
+// */ +// public void testGetValidator() { +// System.out.println("getValidator"); +// +// Pipeline instance = new Pipeline(); +// +// PipelineValidator expResult = null; +// PipelineValidator result = instance.getValidator(); +// assertEquals(expResult, result); +// +// fail("The test case is a prototype."); +// } +// +// /** +// * Test of setValidator method, of class org.apache.commons.pipeline.Pipeline. +// */ +// public void testSetValidator() { +// System.out.println("setValidator"); +// +// PipelineValidator validator = null; +// Pipeline instance = new Pipeline(); +// +// instance.setValidator(validator); +// +// fail("The test case is a prototype."); +// } + +} Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]