Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=744939&r1=744938&r2=744939&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Mon Feb 16 15:32:50 2009 @@ -9,6 +9,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.dispatch.IDispatcher.DispatchContext; +import org.apache.activemq.dispatch.IDispatcher.Dispatchable; +import org.apache.activemq.flow.Pipe.ReadReadyListener; +import org.apache.activemq.transport.DispatchableTransport; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; @@ -20,56 +25,97 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; public class PipeTransportFactory extends TransportFactory { - - private final HashMap<String,PipeTransportServer> servers = new HashMap<String, PipeTransportServer>(); + + private final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>(); static final AtomicInteger connectionCounter = new AtomicInteger(); - - private static class PipeTransport implements Transport, Runnable { + + private static class PipeTransport implements DispatchableTransport, Dispatchable, Runnable, ReadReadyListener<Object> { private final Pipe<Object> pipe; private TransportListener listener; private String remoteAddress; private AtomicBoolean stopping = new AtomicBoolean(); private Thread thread; + private DispatchContext 
readContext; + private String name; public PipeTransport(Pipe<Object> pipe) { this.pipe = pipe; } public void start() throws Exception { - thread = new Thread(this, getRemoteAddress()); - thread.start(); + if (readContext != null) { + pipe.setMode(Pipe.ASYNC); + readContext.requestDispatch(); + } else { + thread = new Thread(this, getRemoteAddress()); + thread.start(); + } } public void stop() throws Exception { - stopping.set(true); - thread.join(); + if (readContext != null) { + readContext.close(); + } else { + stopping.set(true); + thread.join(); + } + } + + public void setDispatcher(IDispatcher dispatcher) { + readContext = dispatcher.register(this, name); } - + + public void onReadReady(Pipe<Object> pipe) { + if (readContext != null) { + readContext.requestDispatch(); + } + } + + public void setName(String name) { + this.name = name; + } + public void oneway(Object command) throws IOException { + try { - while( !stopping.get() ) { - if( pipe.offer(command, 500, TimeUnit.MILLISECONDS) ) { - break; - } - } + pipe.write(command); } catch (InterruptedException e) { throw new InterruptedIOException(); } + /* + * try { while( !stopping.get() ) { if( pipe.offer(command, 500, + * TimeUnit.MILLISECONDS) ) { break; } } } catch + * (InterruptedException e) { throw new InterruptedIOException(); } + */ + } + + public boolean dispatch() { + while (true) { + + Object o = pipe.poll(); + if (o == null) { + pipe.setReadReadyListener(this); + return true; + } else { + listener.onCommand(o); + } + } } public void run() { + try { - while( !stopping.get() ) { + while (!stopping.get()) { Object value = pipe.poll(500, TimeUnit.MILLISECONDS); - if( value!=null ) { + if (value != null) { listener.onCommand(value); } } } catch (InterruptedException e) { } } - + public String getRemoteAddress() { return remoteAddress; } @@ -100,12 +146,11 @@ public void reconnect(URI uri) throws IOException { throw new UnsupportedOperationException(); } - + public FutureResponse asyncRequest(Object 
command, ResponseCallback responseCallback) throws IOException { throw new UnsupportedOperationException(); } - public Object request(Object command) throws IOException { throw new UnsupportedOperationException(); } @@ -120,10 +165,12 @@ public void setRemoteAddress(String remoteAddress) { this.remoteAddress = remoteAddress; + if (name == null) { + name = remoteAddress; + } } - } - + private class PipeTransportServer implements TransportServer { private URI connectURI; private TransportAcceptListener listener; @@ -165,9 +212,9 @@ public Transport connect() { int connectionId = connectionCounter.incrementAndGet(); - String remoteAddress = connectURI.toString()+"#"+connectionId; - assert listener!= null: "Server does not have an accept listener"; - Pipe<Object> pipe = new Pipe<Object>(10); + String remoteAddress = connectURI.toString() + "#" + connectionId; + assert listener != null : "Server does not have an accept listener"; + Pipe<Object> pipe = new Pipe<Object>(); PipeTransport rc = new PipeTransport(pipe); rc.setRemoteAddress(remoteAddress); PipeTransport serverSide = new PipeTransport(pipe.connect()); @@ -176,12 +223,12 @@ return rc; } } - + @Override public synchronized TransportServer doBind(URI uri) throws IOException { String node = uri.getHost(); - if( servers.containsKey(node) ) { - throw new IOException("Server allready bound: "+node); + if (servers.containsKey(node)) { + throw new IOException("Server allready bound: " + node); } PipeTransportServer server = new PipeTransportServer(); server.setConnectURI(uri); @@ -189,7 +236,7 @@ servers.put(node, server); return server; } - + private synchronized void unbind(PipeTransportServer server) { servers.remove(server.getName()); } @@ -197,9 +244,9 @@ @Override public synchronized Transport doCompositeConnect(URI location) throws Exception { String name = location.getHost(); - PipeTransportServer server = servers.get(name ); - if( server==null ) { - throw new IOException("Server is not bound: "+name); + 
PipeTransportServer server = servers.get(name); + if (server == null) { + throw new IOException("Server is not bound: " + name); } return server.connect(); }
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=744939&r1=744938&r2=744939&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Mon Feb 16 15:32:50 2009 @@ -5,16 +5,26 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import org.apache.activemq.flow.Commands.Destination; +import org.apache.activemq.flow.Commands.FlowControl; import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.wireformat.StatefulWireFormat; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; public class ProtoWireFormatFactory implements WireFormatFactory { - static public class TestWireFormat implements WireFormat { - + public class TestWireFormat implements StatefulWireFormat { + private ByteBuffer currentOut; + private byte outType; + + private ByteBuffer currentIn; + private byte inType; + public void marshal(Object value, DataOutput out) throws IOException { if( value.getClass() == Message.class ) { out.writeByte(0); @@ -25,6 +35,9 @@ } else if( value.getClass() == Destination.class ) { out.writeByte(2); ((Destination)value).writeFramed((OutputStream)out); + }else if( value.getClass() == FlowControl.class ) { + out.writeByte(3); + ((FlowControl)value).writeFramed((OutputStream)out); } else { throw new IOException("Unsupported type: "+value.getClass()); } @@ -43,11 +56,160 @@ Destination d = new Destination(); 
d.mergeFramed((InputStream)in); return d; + case 3: + FlowControl fc = new FlowControl(); + fc.mergeFramed((InputStream)in); + return fc; default: throw new IOException("Unknonw type byte: "); } } + public boolean marshal(Object value, ByteBuffer target) throws IOException + { + if(currentOut == null) + { + //Ensure room for the type byte and the 4-byte length: + if(target.remaining() < 5) + { + return false; + } + + if( value.getClass() == Message.class ) { + + currentOut = ByteBuffer.wrap(((Message)value).getProto().toFramedByteArray()); + outType = 0; + } else if( value.getClass() == String.class ) { + outType = 1; + try { + currentOut = ByteBuffer.wrap(((String)value).getBytes("utf-8")); + } catch (UnsupportedEncodingException e) { + //Shouldn't happen. + throw IOExceptionSupport.create(e); + } + } else if( value.getClass() == Destination.class ) { + outType = 2; + currentOut = ByteBuffer.wrap(((Destination)value).toFramedByteArray()); + }else if( value.getClass() == FlowControl.class ) { + outType = 3; + currentOut = ByteBuffer.wrap(((FlowControl)value).toFramedByteArray()); + }else { + throw new IOException("Unsupported type: "+value.getClass()); + } + + //Write type: + target.put(outType); + //Write length: + target.putInt(currentOut.remaining()); + if(currentOut.remaining() > 1024*1024) + { + throw new IOException("Packet exceeded max memory size!"); + } + } + + //Avoid overflow: + if(currentOut.remaining() > target.remaining()) + { + int limit = currentOut.limit(); + currentOut.limit(currentOut.position() + target.remaining()); + target.put(currentOut); + currentOut.limit(limit); + } + else + { + target.put(currentOut); + } + + if(!currentOut.hasRemaining()) + { + currentOut = null; + return true; + } + return false; + } + + /** + * Unmarshals an object. When the object is read it is returned.
+ * @param source + * @return The object when unmarshalled, null otherwise + */ + public Object unMarshal(ByteBuffer source) throws IOException + { + if(currentIn == null) + { + if(source.remaining() < 5) + { + return null; + } + + inType = source.get(); + int length = source.getInt(); + if(length > 1024*1024) + { + throw new IOException("Packet exceeded max memory size!"); + } + currentIn = ByteBuffer.wrap(new byte[length]); + + } + + if(!source.hasRemaining()) + { + return null; + } + + if(source.remaining() > currentIn.remaining()) + { + int limit = source.limit(); + source.limit(source.position() + currentIn.remaining()); + currentIn.put(source); + source.limit(limit); + } + else + { + currentIn.put(source); + } + + //If we haven't finished the packet return to get more data: + if(currentIn.hasRemaining()) + { + return null; + } + + Object ret = null; + switch(inType) { + case 0: + Commands.Message m = new Commands.Message(); + try + { + m.mergeFramed(currentIn.array()); + } + catch(Exception e) + { + e.printStackTrace(); + } + ret = new Message(m); + break; + case 1: + ret = new String(currentIn.array(), "utf-8"); + break; + case 2: + Destination d = new Destination(); + d.mergeFramed(currentIn.array()); + ret = d; + break; + case 3: + FlowControl c = new FlowControl(); + c.mergeFramed(currentIn.array()); + ret = c; + break; + default: + throw new IOException("Unknown type byte: " + inType); + } + + currentIn = null; + return ret; + } + public int getVersion() { return 0; } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=744939&r1=744938&r2=744939&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original) +++ 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Mon Feb 16 15:32:50 2009 @@ -7,36 +7,42 @@ import org.apache.activemq.dispatch.IDispatcher; import org.apache.activemq.flow.Commands.Destination; +import org.apache.activemq.flow.Commands.FlowControl; import org.apache.activemq.flow.ISinkController.FlowControllable; import org.apache.activemq.flow.MockBroker.DeliveryTarget; +import org.apache.activemq.queue.SingleFlowRelay; +import org.apache.activemq.transport.DispatchableTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; public class RemoteConnection implements TransportListener, DeliveryTarget { - protected Transport transport; protected MockBroker broker; protected final Object inboundMutex = new Object(); - protected FlowController<Message> inboundController; + protected IFlowController<Message> inboundController; + + protected SingleFlowRelay<Message> outputQueue; + protected IFlowController<Message> outboundController; + protected ProtocolLimiter<Message> outboundLimiter; + protected Flow ouboundFlow; - protected final Object outboundMutex = new Object(); - protected IFlowSink<Message> outboundController; protected String name; private int priorityLevels; private final int outputWindowSize = 1000; - private final int outputResumeThreshold = 500; + private final int outputResumeThreshold = 900; private final int inputWindowSize = 1000; private final int inputResumeThreshold = 900; private IDispatcher dispatcher; - private ExecutorService writer; - private final AtomicBoolean stopping = new AtomicBoolean(); + protected Flow outputFlow; + protected boolean blockingTransport = false; + ExecutorService blockingWriter; public void setBroker(MockBroker broker) { this.broker = broker; @@ -53,51 +59,58 @@ public void stop() throws Exception { stopping.set(true); - writer.shutdown(); if (transport != null) { transport.stop(); } + if (blockingWriter != null) { + 
blockingWriter.shutdown(); + } } public void onCommand(Object command) { try { + // System.out.println("Got Command: " + command); // First command in should be the name of the connection - if( name==null ) { + if (name == null) { name = (String) command; initialize(); } else if (command.getClass() == Message.class) { Message msg = (Message) command; - // Use the flow controller to send the message on so that we do - // not overflow - // the broker. - while (!inboundController.offer(msg, null)) { - inboundController.waitForFlowUnblock(); - } + inboundController.add(msg, null); } else if (command.getClass() == Destination.class) { // This is a subscription request Destination destination = (Destination) command; + broker.subscribe(destination, this); + } else if (command.getClass() == FlowControl.class) { + // This is a flow-control credit update from the peer + FlowControl fc = (FlowControl) command; + synchronized (outputQueue) { + outboundLimiter.onProtocolMessage(fc); + } + } else { + onException(new Exception("Unrecognized command: " + command)); } } catch (Exception e) { onException(e); } } - private void initialize() { + protected void initialize() { // Setup the input processing..
- SizeLimiter<Message> limiter = new SizeLimiter<Message>(inputWindowSize, inputResumeThreshold); - Flow flow = new Flow(name + "-inbound", false); + Flow flow = new Flow(name, false); + WindowLimiter<Message> limiter = new WindowLimiter<Message>(false, flow, inputWindowSize, inputResumeThreshold); + inboundController = new FlowController<Message>(new FlowControllable<Message>() { public void flowElemAccepted(ISourceController<Message> controller, Message elem) { - broker.router.route(controller, elem); - inboundController.elementDispatched(elem); + messageReceived(controller, elem); } @Override public String toString() { return name; } - + public IFlowSink<Message> getFlowSink() { return null; } @@ -107,16 +120,69 @@ } }, flow, limiter, inboundMutex); - // Setup output processing - writer = Executors.newSingleThreadExecutor(); - FlowControllable<Message> controllable = new FlowControllable<Message>(){ - public void flowElemAccepted(final ISourceController<Message> controller, final Message elem) { - writer.execute(new Runnable() { + ouboundFlow = new Flow(name, false); + outboundLimiter = new WindowLimiter<Message>(true, ouboundFlow, outputWindowSize, outputResumeThreshold); + outputQueue = new SingleFlowRelay<Message>(ouboundFlow, name + "-outbound", outboundLimiter); + outboundController = outputQueue.getFlowController(ouboundFlow); + + if (transport instanceof DispatchableTransport) { + outputQueue.setDrain(new IFlowDrain<Message>() { + + public void drain(Message message, ISourceController<Message> controller) { + write(message); + } + }); + + } else { + blockingTransport = true; + blockingWriter = Executors.newSingleThreadExecutor(); + outputQueue.setDrain(new IFlowDrain<Message>() { + public void drain(final Message message, ISourceController<Message> controller) { + write(message); + }; + }); + /* + * // Setup output processing final Executor writer = + * Executors.newSingleThreadExecutor(); FlowControllable<Message> + * controllable = new 
FlowControllable<Message>() { public void + * flowElemAccepted( final ISourceController<Message> controller, + * final Message elem) { writer.execute(new Runnable() { public void + * run() { if (!stopping.get()) { try { transport.oneway(elem); + * controller.elementDispatched(elem); } catch (IOException e) { + * onException(e); } } } }); } + * + * public IFlowSink<Message> getFlowSink() { return null; } + * + * public IFlowSource<Message> getFlowSource() { return null; } }; + * + * if (priorityLevels <= 1) { outboundController = new + * FlowController<Message>(controllable, flow, limiter, + * outboundMutex); } else { PrioritySizeLimiter<Message> pl = new + * PrioritySizeLimiter<Message>( outputWindowSize, + * outputResumeThreshold, priorityLevels); + * pl.setPriorityMapper(Message.PRIORITY_MAPPER); outboundController + * = new PriorityFlowController<Message>( controllable, flow, pl, + * outboundMutex); } + */ + } + // outputQueue.setDispatcher(dispatcher); + + } + + private final void write(final Object o) { + synchronized (outputQueue) { + if (!blockingTransport) { + try { + transport.oneway(o); + } catch (IOException e) { + onException(e); + } + } else { + blockingWriter.execute(new Runnable() { public void run() { if (!stopping.get()) { try { - transport.oneway(elem); - controller.elementDispatched(elem); + transport.oneway(o); } catch (IOException e) { onException(e); } @@ -124,33 +190,21 @@ } }); } - public IFlowSink<Message> getFlowSink() { - return null; - } - public IFlowSource<Message> getFlowSource() { - return null; - } - }; - - flow = new Flow(name + "-outbound", false); - if (priorityLevels <= 1) { - limiter = new SizeLimiter<Message>(outputWindowSize, outputResumeThreshold); - outboundController = new FlowController<Message>(controllable, flow, limiter, outboundMutex); - } else { - PrioritySizeLimiter<Message> pl = new PrioritySizeLimiter<Message>(outputWindowSize, outputResumeThreshold, priorityLevels); - 
pl.setPriorityMapper(Message.PRIORITY_MAPPER); - outboundController = new PriorityFlowController<Message>(controllable, flow, pl, outboundMutex); } + } + protected void messageReceived(ISourceController<Message> controller, Message elem) { + broker.router.route(controller, elem); + inboundController.elementDispatched(elem); } public void onException(IOException error) { - onException((Exception)error); + onException((Exception) error); } public void onException(Exception error) { if (!stopping.get() && !broker.isStopping()) { - System.out.println("RemoteConnection error: "+error); + System.out.println("RemoteConnection error: " + error); error.printStackTrace(); } } @@ -179,6 +233,13 @@ public void setDispatcher(IDispatcher dispatcher) { this.dispatcher = dispatcher; + if (transport instanceof DispatchableTransport) { + DispatchableTransport dt = ((DispatchableTransport) transport); + if (name != null) { + dt.setName(name); + } + dt.setDispatcher(getDispatcher()); + } } public MockBroker getBroker() { @@ -202,11 +263,66 @@ } public IFlowSink<Message> getSink() { - return outboundController; + return outputQueue; } public boolean match(Message message) { return true; } + private interface ProtocolLimiter<E> extends IFlowLimiter<E> { + public void onProtocolMessage(FlowControl m); + } + + private class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> { + final Flow flow; + final boolean clientMode; + private int available; + + public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) { + super(capacity, resumeThreshold); + this.clientMode = clientMode; + this.flow = flow; + } + + public void reserve(E elem) { + super.reserve(elem); + if (!clientMode) { + // System.out.println(RemoteConnection.this.name + " Reserved " + // + this); + } + } + + public void releaseReserved(E elem) { + super.reserve(elem); + if (!clientMode) { + // System.out.println(RemoteConnection.this.name + + // " Released Reserved " + this); + } + } 
+ + protected void remove(int size) { + super.remove(size); + if (!clientMode) { + available += size; + if (available >= capacity - resumeThreshold) { + FlowControl fc = new FlowControl(); + fc.setCredit(available); + write(fc); + // System.out.println(RemoteConnection.this.name + + // " Send Release " + available + this); + available = 0; + } + } + } + + public void onProtocolMessage(FlowControl m) { + remove(m.getCredit()); + } + + public int getElementSize(Message m) { + return m.getFlowLimiterSize(); + } + } + } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=744939&r1=744938&r2=744939&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Mon Feb 16 15:32:50 2009 @@ -2,23 +2,20 @@ import java.io.IOException; import java.net.URI; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.flow.Commands.Destination; import org.apache.activemq.metric.MetricAggregator; import org.apache.activemq.metric.MetricCounter; +import org.apache.activemq.transport.DispatchableTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.transport.TransportListener; -public class RemoteConsumer implements TransportListener { +public class RemoteConsumer extends RemoteConnection{ - private final AtomicBoolean stopping = new AtomicBoolean(); private final MetricCounter consumerRate = new MetricCounter(); - private Transport transport; - private MockBroker broker; - private String name; private MetricAggregator totalConsumerRate; private 
long thinkTime; private Destination destination; @@ -30,6 +27,12 @@ URI uri = broker.getConnectURI(); transport = TransportFactory.compositeConnect(uri); + if(transport instanceof DispatchableTransport) + { + DispatchableTransport dt = ((DispatchableTransport)transport); + dt.setName(name); + dt.setDispatcher(getDispatcher()); + } transport.setTransportListener(this); transport.start(); @@ -37,58 +40,31 @@ transport.oneway(name); // Sending the destination acts as the subscribe. transport.oneway(destination); + super.initialize(); } - public void stop() throws Exception { - stopping.set(true); - if( transport!=null ) { - transport.stop(); - transport=null; - } - } - - public void onCommand(Object command) { - if( command.getClass() == Message.class ) { - - if (thinkTime > 0) { - try { - Thread.sleep(thinkTime); - } catch (InterruptedException e) { + protected void messageReceived(final ISourceController<Message> controller, final Message elem) { + if (thinkTime > 0) { + getDispatcher().schedule(new Runnable(){ + + public void run() { + consumerRate.increment(); + controller.elementDispatched(elem); } - } - consumerRate.increment(); + + }, thinkTime, TimeUnit.MILLISECONDS); - } else { - System.out.println("Unhandled command: "+command); - } - } - - public void onException(IOException error) { - if( !stopping.get() ) { - System.out.println("RemoteConsumer error: "+error); - error.printStackTrace(); } - } - - public void transportInterupted() { - } - public void transportResumed() { + else + { + consumerRate.increment(); + controller.elementDispatched(elem); + } } public void setName(String name) { this.name = name; } - public void setBroker(MockBroker broker) { - this.broker = broker; - } - - public MockBroker getBroker() { - return broker; - } - - public String getName() { - return name; - } public MetricAggregator getTotalConsumerRate() { return totalConsumerRate; Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java 
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=744939&r1=744938&r2=744939&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Mon Feb 16 15:32:50 2009 @@ -5,22 +5,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.dispatch.IDispatcher.DispatchContext; +import org.apache.activemq.dispatch.IDispatcher.Dispatchable; import org.apache.activemq.flow.Commands.Destination; +import org.apache.activemq.flow.ISinkController.FlowUnblockListener; import org.apache.activemq.metric.MetricAggregator; import org.apache.activemq.metric.MetricCounter; +import org.apache.activemq.transport.DispatchableTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; -public class RemoteProducer implements TransportListener, Runnable { +public class RemoteProducer extends RemoteConnection implements Dispatchable, FlowUnblockListener<Message>{ - private final AtomicBoolean stopping = new AtomicBoolean(); private final MetricCounter rate = new MetricCounter(); - private Transport transport; - private MockBroker broker; - private String name; - private Thread thread; private AtomicLong messageIdGenerator; private int priority; private int priorityMod; @@ -29,6 +28,8 @@ private Destination destination; private String property; private MetricAggregator totalProducerRate; + Message next; + private DispatchContext dispatchContext; public void start() throws Exception { rate.name("Producer " + name + " Rate"); @@ -37,68 +38,67 @@ URI uri = broker.getConnectURI(); transport = 
TransportFactory.compositeConnect(uri); transport.setTransportListener(this); + if(transport instanceof DispatchableTransport) + { + DispatchableTransport dt = ((DispatchableTransport)transport); + dt.setName(name); + dt.setDispatcher(getDispatcher()); + } + super.setTransport(transport); + + super.initialize(); transport.start(); - // Let the remote side know our name. transport.oneway(name); - - thread = new Thread(this, name); - thread.start(); + dispatchContext = getDispatcher().register(this, name + "-producer"); + dispatchContext.requestDispatch(); } - public void stop() throws Exception { - stopping.set(true); - if( transport!=null ) { - transport.stop(); - } - thread.join(); - transport=null; - } - - public void run() { - try { - while( !stopping.get() ) { - - int priority = this.priority; - if (priorityMod > 0) { - priority = counter % priorityMod == 0 ? 0 : priority; - } - - Message next = new Message(messageIdGenerator.getAndIncrement(), producerId, name + ++counter, null, destination, priority); - if (property != null) { - next.setProperty(property); - } - - transport.oneway(next); - rate.increment(); - } - } catch (IOException e) { - onException(e); - } - } - - public void onCommand(Object command) { - System.out.println("Unhandled command: "+command); - } - - public void onException(IOException error) { - if( !stopping.get() ) { - System.out.println("RemoteProducer error: "+error); - error.printStackTrace(); - } - } - - public void transportInterupted() { - } - public void transportResumed() { - } - - public void setName(String name) { + public void stop() throws Exception + { + dispatchContext.close(); + super.stop(); + } + + public void onFlowUnblocked(ISinkController<Message> controller) { + dispatchContext.requestDispatch(); + } + + public boolean dispatch() { + while(true) + { + + if(next == null) + { + int priority = this.priority; + if (priorityMod > 0) { + priority = counter % priorityMod == 0 ? 
0 : priority; + } + + next = new Message(messageIdGenerator.getAndIncrement(), producerId, name + ++counter, null, destination, priority); + if (property != null) { + next.setProperty(property); + } + } + + //If flow controlled stop until flow control is lifted. + if(outboundController.isSinkBlocked()) + { + if(outboundController.addUnblockListener(this)) + { + return true; + } + } + + getSink().add(next, null); + rate.increment(); + next = null; + } + } + + public void setName(String name) { this.name = name; } - public void setBroker(MockBroker broker) { - this.broker = broker; - } public AtomicLong getMessageIdGenerator() { return messageIdGenerator; @@ -148,14 +148,6 @@ this.property = property; } - public MockBroker getBroker() { - return broker; - } - - public String getName() { - return name; - } - public MetricAggregator getTotalProducerRate() { return totalProducerRate; } @@ -166,4 +158,6 @@ public MetricCounter getRate() { return rate; - }} + } +} +
