Author: chirino
Date: Fri Feb 13 12:19:15 2009
New Revision: 744094

URL: http://svn.apache.org/viewvc?rev=744094&view=rev
Log:
- The ISinkController looks and smells like an IFlowSink, so let's let it be one.
- Can now replace the exclusive queue used for connection outbound messages with 
just a FlowController.


Modified:
    
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
    
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
    
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
    
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
    
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
    
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java

Modified: 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
 Fri Feb 13 12:19:15 2009
@@ -438,4 +438,34 @@
     public IFlowSink<E> getFlowSink() {
         return controllable.getFlowSink();
     }
+
+    public long getResourceId() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceId();
+        }
+        return 0;
+    }
+
+    public String getResourceName() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceName();
+        }
+        return null;
+    }
+
+    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.addFlowLifeCycleListener(listener);
+        }
+    }
+    
+    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.removeFlowLifeCycleListener(listener);
+        }
+    }
 }

Modified: 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
 Fri Feb 13 12:19:15 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.flow;
 
-public interface ISinkController<E> {
+public interface ISinkController<E> extends IFlowSink<E> {
     /**
      * Defines required attributes for an entity that can be flow controlled.
      * 

Modified: 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
 Fri Feb 13 12:19:15 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.flow;
 
+
 public class NoOpFlowController<E> implements ISinkController<E> {
     private final IFlowSource<E> source;
     private final Flow flow;
@@ -89,4 +90,33 @@
         return null;
     }
 
+    public long getResourceId() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceId();
+        }
+        return 0;
+    }
+
+    public String getResourceName() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceName();
+        }
+        return null;
+    }
+
+    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.addFlowLifeCycleListener(listener);
+        }
+    }
+    
+    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.removeFlowLifeCycleListener(listener);
+        }
+    }
 }

Modified: 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
 Fri Feb 13 12:19:15 2009
@@ -16,48 +16,40 @@
  */
 package org.apache.activemq.flow;
 
-import org.apache.activemq.queue.Mapper;
+import java.util.ArrayList;
 
 public class PriorityFlowController<E> implements ISourceController<E>, 
ISinkController<E> {
 
     private final Object mutex;
-    private final FlowController<E> controllers[];
+    private final ArrayList<FlowController<E>> controllers;
     private final PrioritySizeLimiter<E> limiter;
 
-    private Mapper<Integer, E> priorityMapper;
-
     private final Flow flow;
     private final FlowControllable<E> controllable;
 
-    public PriorityFlowController(int priorities, FlowControllable<E> 
controllable, Flow flow, Object mutex, int capacity, int resume) {
+    public PriorityFlowController(FlowControllable<E> controllable, Flow flow, 
PrioritySizeLimiter<E> limiter, Object mutex) {
         this.controllable = controllable;
         this.flow = flow;
         this.mutex = mutex;
-        this.limiter = new PrioritySizeLimiter<E>(capacity, resume, 
priorities);
-        this.limiter.setPriorityMapper(priorityMapper);
-        this.controllers = createControlerArray(priorities);
-        for (int i = 0; i < priorities; i++) {
-            this.controllers[i] = new FlowController<E>(controllable, flow, 
limiter.getPriorityLimter(i), mutex);
+        this.limiter =  limiter;
+        this.controllers = new 
ArrayList<FlowController<E>>(limiter.getPriorities());
+        for (int i = 0; i < limiter.getPriorities(); i++) {
+            controllers.add(new FlowController<E>(controllable, flow, 
limiter.getPriorityLimter(i), mutex));
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private FlowController<E>[] createControlerArray(int priorities) {
-        return new FlowController[priorities];
-    }
-
     // /////////////////////////////////////////////////////////////////
     // ISinkController interface impl.
     // /////////////////////////////////////////////////////////////////
 
     public boolean offer(E elem, ISourceController<E> controller) {
-        int prio = priorityMapper.map(elem);
-        return controllers[prio].offer(elem, controller);
+        int prio = limiter.getPriorityMapper().map(elem);
+        return controllers.get(prio).offer(elem, controller);
     }
 
     public void add(E elem, ISourceController<E> controller) {
-        int prio = priorityMapper.map(elem);
-        controllers[prio].add(elem, controller);
+        int prio = limiter.getPriorityMapper().map(elem);
+        controllers.get(prio).add(elem, controller);
     }
 
     public boolean isSinkBlocked() {
@@ -68,8 +60,8 @@
 
     public boolean 
addUnblockListener(org.apache.activemq.flow.ISinkController.FlowUnblockListener<E>
 listener) {
         boolean rc = false;
-        for (int i = 0; i < controllers.length; i++) {
-            rc |= this.controllers[i].addUnblockListener(listener);
+        for (int i = 0; i < controllers.size(); i++) {
+            rc |= this.controllers.get(i).addUnblockListener(listener);
         }
         return rc;
     }
@@ -77,13 +69,45 @@
     public void waitForFlowUnblock() throws InterruptedException {
         throw new UnsupportedOperationException();
     }
+    
+    public long getResourceId() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceId();
+        }
+        return 0;
+    }
+
+    public String getResourceName() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceName();
+        }
+        return null;
+    }
+
+    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.addFlowLifeCycleListener(listener);
+        }
+    }
+    
+    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.removeFlowLifeCycleListener(listener);
+        }
+    }
+
 
     // /////////////////////////////////////////////////////////////////
     // ISourceController interface impl.
     // /////////////////////////////////////////////////////////////////
 
     public void elementDispatched(E elem) {
-        FlowController<E> controler = controllers[priorityMapper.map(elem)];
+        Integer prio = limiter.getPriorityMapper().map(elem);
+        FlowController<E> controler = controllers.get(prio);
         controler.elementDispatched(elem);
     }
 
@@ -96,14 +120,14 @@
     }
 
     public void onFlowBlock(ISinkController<E> sink) {
-        for (int i = 0; i < controllers.length; i++) {
-            controllers[i].onFlowBlock(sink);
+        for (int i = 0; i < controllers.size(); i++) {
+            controllers.get(i).onFlowBlock(sink);
         }
     }
 
     public void onFlowResume(ISinkController<E> sink) {
-        for (int i = 0; i < controllers.length; i++) {
-            controllers[i].onFlowBlock(sink);
+        for (int i = 0; i < controllers.size(); i++) {
+            controllers.get(i).onFlowBlock(sink);
         }
     }
 
@@ -115,15 +139,6 @@
     // Getters and Setters
     // /////////////////////////////////////////////////////////////////
 
-    public Mapper<Integer, E> getPriorityMapper() {
-        return priorityMapper;
-    }
-
-    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
-        this.priorityMapper = priorityMapper;
-        limiter.setPriorityMapper(priorityMapper);
-    }
-
     public IFlowSink<E> getFlowSink() {
         return controllable.getFlowSink();
     }

Modified: 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
 Fri Feb 13 12:19:15 2009
@@ -21,6 +21,7 @@
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PriorityFlowController;
+import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.kahadb.util.LinkedNode;
 
 /**
@@ -28,7 +29,6 @@
 public class ExclusivePriorityQueue<E> extends AbstractFlowQueue<E> implements 
IFlowQueue<E> {
 
     private final PriorityLinkedList<PriorityNode> queue;
-    private Mapper<Integer, E> priorityMapper;
 
     private class PriorityNode extends LinkedNode<PriorityNode> {
         E elem;
@@ -36,6 +36,7 @@
     }
 
     private final PriorityFlowController<E> controller;
+    private final PrioritySizeLimiter<E> limiter;
 
     /**
      * Creates a flow queue that can handle multiple flows.
@@ -48,10 +49,11 @@
      * @param controller
      *            The FlowController if this queue is flow controlled:
      */
-    public ExclusivePriorityQueue(int priority, Flow flow, String name, int 
capacity, int resume) {
+    public ExclusivePriorityQueue(Flow flow, String name, 
PrioritySizeLimiter<E> limiter) {
         super(name);
+        this.limiter = limiter;
         this.queue = new PriorityLinkedList<PriorityNode>(10);
-        this.controller = new PriorityFlowController<E>(priority, 
getFlowControllableHook(), flow, this, capacity, resume);
+        this.controller = new 
PriorityFlowController<E>(getFlowControllableHook(), flow, limiter, this);
 
     }
 
@@ -72,7 +74,7 @@
     public synchronized void flowElemAccepted(ISourceController<E> controller, 
E elem) {
         PriorityNode node = new PriorityNode();
         node.elem = elem;
-        node.prio = priorityMapper.map(elem);
+        node.prio = limiter.getPriorityMapper().map(elem);
 
         queue.add(node, node.prio);
         notifyReady();
@@ -107,15 +109,6 @@
         }
     }
 
-    public Mapper<Integer, E> getPriorityMapper() {
-        return priorityMapper;
-    }
-
-    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
-        this.priorityMapper = priorityMapper;
-        controller.setPriorityMapper(priorityMapper);
-    }
-
     @Override
     public String toString() {
         return getResourceName();

Modified: 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
 Fri Feb 13 12:19:15 2009
@@ -65,8 +65,9 @@
         if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
             this.output = TestFlowManager.createFlowQueue(flow, name + 
"-OUTPUT", outputQueueSize, resumeThreshold);
         } else {
-            ExclusivePriorityQueue<Message> t = new 
ExclusivePriorityQueue<Message>(MockBrokerTest.PRIORITY_LEVELS, flow, name + 
"-OUTPUT", outputQueueSize, resumeThreshold);
-            t.setPriorityMapper(Message.PRIORITY_MAPPER);
+            PrioritySizeLimiter<Message> pl = new 
PrioritySizeLimiter<Message>(outputQueueSize, resumeThreshold, 
MockBrokerTest.PRIORITY_LEVELS);
+            pl.setPriorityMapper(Message.PRIORITY_MAPPER);
+            ExclusivePriorityQueue<Message> t = new 
ExclusivePriorityQueue<Message>(flow, name + "-OUTPUT", pl);
             this.output = t;
         }
 

Modified: 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
 Fri Feb 13 12:19:15 2009
@@ -9,21 +9,20 @@
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
-import org.apache.activemq.queue.ExclusivePriorityQueue;
-import org.apache.activemq.queue.ExclusiveQueue;
-import org.apache.activemq.queue.IFlowQueue;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 
 public class RemoteConnection implements TransportListener, DeliveryTarget {
 
-    protected final Object mutex = new Object();
 
     protected Transport transport;
     protected MockBroker broker;
-    protected IFlowQueue<Message> output;
 
+    protected final Object inboundMutex = new Object();
     protected FlowController<Message> inboundController;
+
+    protected final Object outboundMutex = new Object();
+    protected IFlowSink<Message> outboundController;
     protected String name;
 
     private int priorityLevels;
@@ -106,26 +105,12 @@
             public IFlowSource<Message> getFlowSource() {
                 return null;
             }
-        }, flow, limiter, mutex);
+        }, flow, limiter, inboundMutex);
 
         // Setup output processing
-        if (priorityLevels <= 1) {
-            limiter = new SizeLimiter<Message>(outputWindowSize, 
outputResumeThreshold);
-            flow = new Flow(name + "-outbound", false);
-            ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(flow, 
flow.getFlowName(), limiter);
-            this.output = queue;
-        } else {
-            ExclusivePriorityQueue<Message> t = new 
ExclusivePriorityQueue<Message>(priorityLevels, flow, name + "-outbound", 
outputWindowSize, outputResumeThreshold);
-            t.setPriorityMapper(Message.PRIORITY_MAPPER);
-            this.output = t;
-        }
-
-        // Use an async thread to drain the output queue.
-        // Personally I think it would be better if we polled messages out of 
the output queue.
         writer = Executors.newSingleThreadExecutor();
-        output.setDispatcher(dispatcher);
-        output.setDrain(new IFlowDrain<Message>() {
-            public void drain(final Message elem, final 
ISourceController<Message> controller) {
+        FlowControllable<Message> controllable = new 
FlowControllable<Message>(){
+            public void flowElemAccepted(final ISourceController<Message> 
controller, final Message elem) {
                 writer.execute(new Runnable() {
                     public void run() {
                         if (!stopping.get()) {
@@ -139,7 +124,24 @@
                     }
                 });
             }
-        });
+            public IFlowSink<Message> getFlowSink() {
+                return null;
+            }
+            public IFlowSource<Message> getFlowSource() {
+                return null;
+            }
+        };
+
+        flow = new Flow(name + "-outbound", false);
+        if (priorityLevels <= 1) {
+            limiter = new SizeLimiter<Message>(outputWindowSize, 
outputResumeThreshold);
+            outboundController = new FlowController<Message>(controllable, 
flow, limiter,  outboundMutex);
+        } else {
+            PrioritySizeLimiter<Message> pl = new 
PrioritySizeLimiter<Message>(outputWindowSize, outputResumeThreshold, 
priorityLevels);
+            pl.setPriorityMapper(Message.PRIORITY_MAPPER);
+            outboundController = new 
PriorityFlowController<Message>(controllable, flow, pl,  outboundMutex);
+        }
+
     }
 
     public void onException(IOException error) {
@@ -200,7 +202,7 @@
     }
 
     public IFlowSink<Message> getSink() {
-        return output;
+        return outboundController;
     }
 
     public boolean match(Message message) {


Reply via email to