Author: nuttycom
Date: Thu Sep  7 14:46:02 2006
New Revision: 441243

URL: http://svn.apache.org/viewvc?view=rev&rev=441243
Log:
Fixed problem where events were not properly propagated to sibling branches

Added:
    
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
   (with props)
Modified:
    
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java

Modified: 
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
URL: 
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java?view=diff&rev=441243&r1=441242&r2=441243
==============================================================================
--- 
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
 (original)
+++ 
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
 Thu Sep  7 14:46:02 2006
@@ -19,10 +19,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EventObject;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,6 +61,9 @@
     // Map of pipeline branches where the keys are branch names.
     private final Map<String,Pipeline> branches;
     
+    // Used to store a reference to the parent pipeline, if this is a branch
+    private Pipeline parent;
+    
     // The list of listeners registered with the pipeline.
     private final List<StageEventListener> listeners;
     
@@ -100,25 +103,35 @@
     }
     
     /**
-     * Notifies each registered listener of an event and propagates
-     * the event to any attached branches
+     * Asynchronously notifies each registered listener of an event and 
propagates
+     * the event to any attached branches and the parent pipeline.
+     *
      * @param ev The event to be sent to registered listeners
      */
-    public void raise(final java.util.EventObject ev) {
+    public void raise(final EventObject ev) {
         new Thread() {
             public void run() {
-                for (StageEventListener listener : listeners) {
-                    listener.notify(ev);
-                }
+                //first, recursively find the root pipeline
+                Pipeline root = Pipeline.this;
+                while (root.parent != null) root = root.parent;
                 
-                for (Pipeline branch : branches.values()) {
-                    if (branch != Pipeline.this) branch.raise(ev);
-                }
-        }
+                //notify the listeners from the root pipeline
+                root.notifyListeners(ev);
+            }
         }.start();
     }
     
     /**
+     * Notify all listeners and recursively notify child branches of the
+     * specified event. This method does not propagate events to the
+     * parent pipeline.
+     */
+    private void notifyListeners(EventObject ev) {
+        for (StageEventListener listener : listeners) listener.notify(ev);
+        for (Pipeline branch : branches.values()) branch.notifyListeners(ev);
+    }
+    
+    /**
      * This method is used by a stage driver to pass data from one stage to 
the next.
      * @return the feeder for the downstream stage, or null if no downstream
      * stage exists.
@@ -135,7 +148,7 @@
             for (int i = drivers.size() - 2; i >= 0; i--) {
                 if (stage == drivers.get(i).getStage()) return 
drivers.get(i+1).getFeeder();
             }
-        
+            
             throw new IllegalStateException("Unable to find stage " + stage + 
" in pipeline.");
         }
     }
@@ -155,6 +168,7 @@
      * be used to validate that the appended Stage can consume the output of 
the
      * previous stage of the pipeline. It does NOT validate the ability or 
availability
      * of branches to consume data produced by the appended stage.
+     *
      * @param stage the stage to be added to the pipeline
      * @param driverFactory the factory that will be used to create a [EMAIL 
PROTECTED] StageDriver} that will run the stage
      * @throws ValidationException if there is a non-null validator set for 
this pipeline and an error is
@@ -189,6 +203,7 @@
     
     /**
      * Return the StageDriver for the specified Stage.
+     *
      * @return the StageDriver for the specified Stage.
      */
     public final StageDriver getStageDriver(Stage stage) {
@@ -211,24 +226,25 @@
      * @throws org.apache.commons.pipeline.validation.ValidationException if 
the pipeline has a non-null [EMAIL PROTECTED] PipelineValidator} and the branch
      * cannot consume the data produced for the branch by stages in the 
pipeline.
      */
-    public void addBranch(String key, Pipeline pipeline) throws 
ValidationException {
+    public void addBranch(String key, Pipeline branch) throws 
ValidationException {
         if (key == null)
             throw new IllegalArgumentException("Branch key may not be null.");
         if (MAIN_BRANCH.equalsIgnoreCase(key))
             throw new IllegalArgumentException("Branch key name \"" + 
MAIN_BRANCH + "\" is reserved.");
-        if (pipeline == null)
+        if (branch == null)
             throw new IllegalArgumentException("Illegal attempt to set 
reference to null branch.");
-        if (pipeline == this || pipeline.hasBranch(this))
+        if (branch == this || branch.hasBranch(this))
             throw new IllegalArgumentException("Illegal attempt to set 
reference to self as a branch (infinite recursion potential)");
         
         if (validator != null) {
-            List<ValidationFailure> errors = validator.validateAddBranch(this, 
key, pipeline);
+            List<ValidationFailure> errors = validator.validateAddBranch(this, 
key, branch);
             if (errors != null && !errors.isEmpty()) {
-                throw new ValidationException("An error occurred adding branch 
pipeline " + pipeline, errors);
+                throw new ValidationException("An error occurred adding branch 
pipeline " + branch, errors);
             }
         }
         
-        this.branches.put(key, pipeline);
+        branch.parent = this;
+        this.branches.put(key, branch);
     }
     
     /**
@@ -250,19 +266,19 @@
         if (branches.containsValue(pipeline)) return true;
         for (Pipeline branch : branches.values()) {
             if (branch.hasBranch(pipeline)) return true;
-            }
-            
-        return false;
         }
         
+        return false;
+    }
+    
     /**
      * Returns a feeder for the first stage if the pipeline is not empty
      * @return the feeder to feed the first stage of the pipeline
      */
     public Feeder getSourceFeeder() {
-        if (drivers.isEmpty()) return null;
+        if (drivers.isEmpty()) return this.terminalFeeder;
         return drivers.peek().getFeeder();
-        }
+    }
     
     /**
      * Gets the feeder that receives output from the final stage.
@@ -286,6 +302,7 @@
      * Startups may occur sequentially or in parallel, depending upon the 
stage driver
      * used.  If a the stage has not been configured with a [EMAIL PROTECTED] 
StageDriver},
      * we will use the default, non-threaded [EMAIL PROTECTED] 
SynchronousStageDriver}.
+     *
      * @throws org.apache.commons.pipeline.StageException Thrown if there is 
an error during pipeline startup
      */
     public void start() throws StageException {
@@ -300,6 +317,7 @@
      * method will block until the stage's queue is exhausted, so this method
      * may be used to safely finalize all stages without the risk of
      * losing data in the queues.
+     *
      * @throws org.apache.commons.pipeline.StageException Thrown if there is 
an unhandled error during stage shutdown
      */
     public void finish() throws StageException {

Added: 
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
URL: 
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java?view=auto&rev=441243
==============================================================================
--- 
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
 (added)
+++ 
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
 Thu Sep  7 14:46:02 2006
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline;
+
+import junit.framework.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EventObject;
+import org.apache.commons.pipeline.driver.SynchronousStageDriverFactory;
+import org.apache.commons.pipeline.event.ObjectProcessedEvent;
+import org.apache.commons.pipeline.listener.ObjectProcessedEventCounter;
+import org.apache.commons.pipeline.testFramework.TestStage;
+
+/**
+ * Test cases
+ */
+public class PipelineTest extends TestCase {
+    
+    public PipelineTest(String testName) {
+        super(testName);
+    }
+    
+    public static Test suite() {
+        TestSuite suite = new TestSuite(PipelineTest.class);
+        
+        return suite;
+    }
+    
+    /**
+     * Test of registerListener method, of class 
org.apache.commons.pipeline.Pipeline.
+     */
+    public void testRegisterListener() {
+        StageEventListener listener = new ObjectProcessedEventCounter();
+        Pipeline instance = new Pipeline();
+        
+        instance.registerListener(listener);
+        
+        assertEquals(1, instance.getRegisteredListeners().size());
+    }
+    
+    /**
+     * Test of getRegisteredListeners method, of class 
org.apache.commons.pipeline.Pipeline.
+     */
+    public void testGetRegisteredListeners() {
+        Pipeline instance = new Pipeline();
+        
+        Collection<StageEventListener> expResult = Collections.EMPTY_LIST;
+        Collection<StageEventListener> result = 
instance.getRegisteredListeners();
+        assertEquals(expResult, result);
+    }
+    
+    /**
+     * Test of raise method, of class org.apache.commons.pipeline.Pipeline.
+     */
+    public void testRaise() {
+        Stage testStage = new TestStage(0);
+        EventObject ev = new ObjectProcessedEvent(testStage, "Hello, World!");
+        Pipeline instance = new Pipeline();
+        ObjectProcessedEventCounter counter = new 
ObjectProcessedEventCounter();
+        instance.registerListener(counter);
+        
+        instance.raise(ev);
+        
+        Thread.currentThread().yield(); //give the notifier thread created by 
raise priority
+        
+        assertNotNull(counter.getCounts().get(testStage));
+        assertEquals(1, counter.getCounts().get(testStage).intValue());
+    }
+    
+    public void testRaiseOnBranch() throws Exception {
+        Pipeline root = new Pipeline();
+        
+        Pipeline branch1 = new Pipeline();
+        root.addBranch("b1", branch1);
+        
+        Pipeline branch2 = new Pipeline();
+        root.addBranch("b2", branch2);
+        
+        ObjectProcessedEventCounter counter = new 
ObjectProcessedEventCounter();
+        branch2.registerListener(counter);
+        
+        Stage testStage = new TestStage(0);
+        EventObject ev = new ObjectProcessedEvent(testStage, "Hello, World!");
+        branch1.raise(ev);
+        
+        Thread.currentThread().yield(); //give the notifier thread created by 
raise priority
+        
+        assertNotNull(counter.getCounts().get(testStage));
+        assertEquals(1, counter.getCounts().get(testStage).intValue());
+    }
+    
+    /**
+     * Test of getDownstreamFeeder method, of class 
org.apache.commons.pipeline.Pipeline.
+     */
+    public void testGetDownstreamFeeder() throws Exception {
+        Stage stage1 = new TestStage(0);
+        Stage stage2 = new TestStage(1);
+        StageDriverFactory sdf = new SynchronousStageDriverFactory();
+
+        Pipeline instance = new Pipeline();
+        instance.addStage(stage1, sdf);
+        instance.addStage(stage2, sdf);
+
+        Feeder expResult = instance.getStageDriver(stage2).getFeeder();
+        Feeder result = instance.getDownstreamFeeder(stage1);
+        assertSame(expResult, result);
+    }
+    
+    /**
+     * Test of getBranchFeeder method, of class 
org.apache.commons.pipeline.Pipeline.
+     */
+    public void testGetBranchFeeder() throws Exception {
+        String branchKey = "b1";
+        Pipeline root = new Pipeline();
+        Pipeline branch = new Pipeline();
+        root.addBranch(branchKey, branch);
+        
+        Feeder expResult = branch.getTerminalFeeder(); //no feeders registered
+        Feeder result = root.getBranchFeeder(branchKey);
+        assertSame(expResult, result);
+        
+        StageDriverFactory sdf = new SynchronousStageDriverFactory();
+        Stage testStage = new TestStage(0);
+        branch.addStage(testStage, sdf);
+        
+        expResult = branch.getStageDriver(testStage).getFeeder();
+        result = root.getBranchFeeder(branchKey);
+        assertSame(expResult, result);        
+    }
+    
+//    /**
+//     * Test of addStage method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testAddStage() throws Exception {
+//        System.out.println("addStage");
+//
+//        Stage stage = null;
+//        StageDriverFactory driverFactory = null;
+//        Pipeline instance = new Pipeline();
+//
+//        instance.addStage(stage, driverFactory);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of getStages method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testGetStages() {
+//        System.out.println("getStages");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        List<Stage> expResult = null;
+//        List<Stage> result = instance.getStages();
+//        assertEquals(expResult, result);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of getStageDriver method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testGetStageDriver() {
+//        System.out.println("getStageDriver");
+//
+//        Stage stage = null;
+//        Pipeline instance = new Pipeline();
+//
+//        StageDriver expResult = null;
+//        StageDriver result = instance.getStageDriver(stage);
+//        assertEquals(expResult, result);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of getStageDrivers method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testGetStageDrivers() {
+//        System.out.println("getStageDrivers");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        List<StageDriver> expResult = null;
+//        List<StageDriver> result = instance.getStageDrivers();
+//        assertEquals(expResult, result);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of addBranch method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testAddBranch() throws Exception {
+//        System.out.println("addBranch");
+//
+//        String key = "";
+//        Pipeline branch = null;
+//        Pipeline instance = new Pipeline();
+//
+//        instance.addBranch(key, branch);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of getBranches method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testGetBranches() {
+//        System.out.println("getBranches");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        Map<String, Pipeline> expResult = null;
+//        Map<String, Pipeline> result = instance.getBranches();
+//        assertEquals(expResult, result);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of getSourceFeeder method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testGetSourceFeeder() {
+//        System.out.println("getSourceFeeder");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        Feeder expResult = null;
+//        Feeder result = instance.getSourceFeeder();
+//        assertEquals(expResult, result);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of getTerminalFeeder method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testGetTerminalFeeder() {
+//        System.out.println("getTerminalFeeder");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        Feeder expResult = null;
+//        Feeder result = instance.getTerminalFeeder();
+//        assertEquals(expResult, result);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of setTerminalFeeder method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testSetTerminalFeeder() {
+//        System.out.println("setTerminalFeeder");
+//
+//        Feeder terminalFeeder = null;
+//        Pipeline instance = new Pipeline();
+//
+//        instance.setTerminalFeeder(terminalFeeder);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of start method, of class org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testStart() throws Exception {
+//        System.out.println("start");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        instance.start();
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of finish method, of class org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testFinish() throws Exception {
+//        System.out.println("finish");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        instance.finish();
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of run method, of class org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testRun() {
+//        System.out.println("run");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        instance.run();
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of getValidator method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testGetValidator() {
+//        System.out.println("getValidator");
+//
+//        Pipeline instance = new Pipeline();
+//
+//        PipelineValidator expResult = null;
+//        PipelineValidator result = instance.getValidator();
+//        assertEquals(expResult, result);
+//
+//        fail("The test case is a prototype.");
+//    }
+//
+//    /**
+//     * Test of setValidator method, of class 
org.apache.commons.pipeline.Pipeline.
+//     */
+//    public void testSetValidator() {
+//        System.out.println("setValidator");
+//
+//        PipelineValidator validator = null;
+//        Pipeline instance = new Pipeline();
+//
+//        instance.setValidator(validator);
+//
+//        fail("The test case is a prototype.");
+//    }
+    
+}

Propchange: 
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to