Author: chirino
Date: Thu Feb 12 16:48:44 2009
New Revision: 743802
URL: http://svn.apache.org/viewvc?rev=743802&view=rev
Log:
Getting closer to being able to test real IO
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
Removed:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockConsumerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockProducerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockTransportConnection.java
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
Thu Feb 12 16:48:44 2009
@@ -16,12 +16,18 @@
*/
package org.apache.activemq.flow;
-public class Destination {
+import java.io.Serializable;
+public class Destination implements Serializable{
+
+ private static final long serialVersionUID = 4020372477810069873L;
+
String name;
+ boolean ptp;
- Destination(String name) {
+ Destination(String name, boolean ptp) {
this.name = name;
+ this.ptp=ptp;
}
public final String getName() {
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java?rev=743802&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java
Thu Feb 12 16:48:44 2009
@@ -0,0 +1,109 @@
+/**
+ *
+ */
+package org.apache.activemq.flow;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.metric.MetricCounter;
+
+class LocalConsumer extends AbstractTestConnection implements
MockBrokerTest.DeliveryTarget {
+
+ /**
+ *
+ */
+ private final MockBrokerTest mockBrokerTest;
+ MetricCounter consumerRate = new MetricCounter();
+ private final Destination destination;
+ String selector;
+ private boolean autoRelease = false;
+
+ private long thinkTime = 0;
+
+ public LocalConsumer(MockBrokerTest mockBrokerTest, String name,
MockBroker broker, Destination destination) {
+ super(broker, name,
broker.getFlowManager().createFlow(destination.getName()), null);
+ this.mockBrokerTest = mockBrokerTest;
+ this.destination = destination;
+ output.setAutoRelease(autoRelease);
+ consumerRate.name("Consumer " + name + " Rate");
+ this.mockBrokerTest.totalConsumerRate.add(consumerRate);
+
+ }
+
+ public Destination getDestination() {
+ return destination;
+ }
+
+ public String getSelector() {
+ return selector;
+ }
+
+ public void setThinkTime(long time) {
+ thinkTime = time;
+ }
+
+ @Override
+ protected synchronized Message getNextMessage() throws
InterruptedException {
+ wait();
+ return null;
+ }
+
+ @Override
+ protected void addReadReadyListener(final ReadReadyListener listener) {
+ return;
+ }
+
+ public Message pollNextMessage() {
+ return null;
+ }
+
+ @Override
+ protected void messageReceived(Message m, ISourceController<Message>
controller) {
+ }
+
+ @Override
+ protected void write(final Message m, final ISourceController<Message>
controller) throws InterruptedException {
+ if (!m.isSystem()) {
+ // /IF we are async don't tie up the calling thread
+ // schedule dispatch complete for later.
+ if (this.mockBrokerTest.dispatchMode == ASYNC && thinkTime > 0) {
+ Runnable acker = new Runnable() {
+ public void run() {
+ if (thinkTime > 0) {
+ try {
+ Thread.sleep(thinkTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ simulateEncodingWork();
+ if (!autoRelease) {
+ controller.elementDispatched(m);
+ }
+ consumerRate.increment();
+ }
+ };
+
+ broker.dispatcher.schedule(acker, thinkTime,
TimeUnit.MILLISECONDS);
+ } else {
+ simulateEncodingWork();
+ if (!autoRelease) {
+ controller.elementDispatched(m);
+ }
+ consumerRate.increment();
+ }
+ }
+ }
+
+ public IFlowSink<Message> getSink() {
+ return output;
+ }
+
+ public boolean match(Message message) {
+ if (selector == null)
+ return true;
+ return message.match(selector);
+ }
+
+}
\ No newline at end of file
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java?rev=743802&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java
Thu Feb 12 16:48:44 2009
@@ -0,0 +1,113 @@
+/**
+ *
+ */
+package org.apache.activemq.flow;
+
+import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
+import org.apache.activemq.metric.MetricCounter;
+
+class LocalProducer extends AbstractTestConnection {
+
+ /**
+ *
+ */
+ private final MockBrokerTest mockBrokerTest;
+
+ MetricCounter producerRate = new MetricCounter();
+
+ private final Destination destination;
+ private int msgCounter;
+ private String name;
+ String property;
+ private Message next;
+ int msgPriority = 0;
+ int priorityMod = 0;
+ int producerId;
+
+ public LocalProducer(MockBrokerTest mockBrokerTest, String name,
MockBroker broker, Destination destination) {
+
+ super(broker, name, broker.getFlowManager().createFlow(name), null);
+ this.mockBrokerTest = mockBrokerTest;
+ this.destination = destination;
+ this.producerId =
this.mockBrokerTest.prodcuerIdGenerator.getAndIncrement();
+
+ producerRate.name("Producer " + name + " Rate");
+ this.mockBrokerTest.totalProducerRate.add(producerRate);
+
+ }
+
+ /*
+ * Gets the next message blocking until space is available for it.
+ * (non-Javadoc)
+ *
+ * @see com.progress.flow.AbstractTestConnection#getNextMessage()
+ */
+ public Message getNextMessage() throws InterruptedException {
+
+ Message m = new
Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name
+ ++msgCounter, flow, destination, msgPriority);
+ if (property != null) {
+ m.setProperty(property);
+ }
+ simulateEncodingWork();
+ input.getFlowController(m.getFlow()).waitForFlowUnblock();
+ return m;
+ }
+
+ @Override
+ protected void addReadReadyListener(final ReadReadyListener listener) {
+ if (next == null) {
+ next = new
Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name
+ ++msgCounter, flow, destination, msgPriority);
+ if (property != null) {
+ next.setProperty(property);
+ }
+ simulateEncodingWork();
+ }
+
+ if (!input.getFlowController(next.getFlow()).addUnblockListener(new
ISinkController.FlowUnblockListener<Message>() {
+ public void onFlowUnblocked(ISinkController<Message> controller) {
+ listener.onReadReady();
+ }
+ })) {
+ // Return value of false means that the controller didn't
+ // register the listener because it was not blocked.
+ listener.onReadReady();
+ }
+ }
+
+ @Override
+ public final Message pollNextMessage() {
+ if (next == null) {
+ int priority = msgPriority;
+ if (priorityMod > 0) {
+ priority = msgCounter % priorityMod == 0 ? 0 : msgPriority;
+ }
+
+ next = new
Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name
+ ++msgCounter, flow, destination, priority);
+ if (property != null) {
+ next.setProperty(property);
+ }
+ simulateEncodingWork();
+ }
+
+ if (input.getFlowController(next.getFlow()).isSinkBlocked()) {
+ return null;
+ }
+
+ Message m = next;
+ next = null;
+ return m;
+ }
+
+ @Override
+ public void messageReceived(Message m, ISourceController<Message>
controller) {
+
+ broker.router.route(controller, m);
+ producerRate.increment();
+ }
+
+ @Override
+ public void write(Message m, ISourceController<Message> controller) {
+ // Noop
+ }
+
+}
\ No newline at end of file
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
Thu Feb 12 16:48:44 2009
@@ -16,12 +16,13 @@
*/
package org.apache.activemq.flow;
+import java.io.Serializable;
import java.util.HashSet;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.queue.Mapper;
-public class Message {
+public class Message implements Serializable {
public static final Mapper<Integer, Message> PRIORITY_MAPPER = new
Mapper<Integer, Message>() {
public Integer map(Message element) {
@@ -39,7 +40,7 @@
public static final short TYPE_FLOW_CLOSE = 3;
final String msg;
- final Flow flow;
+ transient final Flow flow;
final Destination dest;
int hopCount;
HashSet<String> matchProps;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
Thu Feb 12 16:48:44 2009
@@ -11,6 +11,7 @@
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.MockBrokerTest.BrokerConnection;
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
import org.apache.activemq.flow.MockBrokerTest.Router;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
@@ -22,9 +23,9 @@
private final MockBrokerTest mockBrokerTest;
private final TestFlowManager flowMgr;
- final ArrayList<MockTransportConnection> connections = new
ArrayList<MockTransportConnection>();
- final ArrayList<MockProducerConnection> producers = new
ArrayList<MockProducerConnection>();
- final ArrayList<MockConsumerConnection> consumers = new
ArrayList<MockConsumerConnection>();
+ final ArrayList<RemoteConnection> connections = new
ArrayList<RemoteConnection>();
+ final ArrayList<LocalProducer> producers = new ArrayList<LocalProducer>();
+ final ArrayList<LocalConsumer> consumers = new ArrayList<LocalConsumer>();
private final ArrayList<BrokerConnection> brokerConns = new
ArrayList<BrokerConnection>();
private final HashMap<Destination, MockQueue> queues = new
HashMap<Destination, MockQueue>();
@@ -38,7 +39,7 @@
public final int priorityLevels = MockBrokerTest.PRIORITY_LEVELS;
public final int ioWorkAmount = MockBrokerTest.IO_WORK_AMOUNT;
public final boolean useInputQueues = MockBrokerTest.USE_INPUT_QUEUES;
- private TransportServer transportServer;
+ public TransportServer transportServer;
MockBroker(MockBrokerTest mockBrokerTest, String name) throws IOException,
URISyntaxException {
this.mockBrokerTest = mockBrokerTest;
@@ -58,23 +59,26 @@
}
public void createProducerConnection(Destination destination) {
- MockProducerConnection c = new
MockProducerConnection(this.mockBrokerTest, "producer" + ++pCount, this,
destination);
+ LocalProducer c = new LocalProducer(this.mockBrokerTest, "producer" +
++pCount, this, destination);
producers.add(c);
}
- public void createConsumerConnection(Destination destination, boolean ptp)
{
- MockConsumerConnection c = new
MockConsumerConnection(this.mockBrokerTest, "consumer" + ++cCount, this,
destination);
+ public void createConsumerConnection(Destination destination) {
+ LocalConsumer c = new LocalConsumer(this.mockBrokerTest, "consumer" +
++cCount, this, destination);
consumers.add(c);
- if (ptp) {
- queues.get(destination).addConsumer(c);
+ subscribe(destination, c);
+ }
+
+ public void subscribe(Destination destination, DeliveryTarget
deliveryTarget) {
+ if (destination.ptp) {
+ queues.get(destination).addConsumer(deliveryTarget);
} else {
- router.bind(c, destination);
+ router.bind(deliveryTarget, destination);
}
-
}
public void createClusterConnection(Destination destination) {
- MockConsumerConnection c = new
MockConsumerConnection(this.mockBrokerTest, "consumer" + ++cCount, this,
destination);
+ LocalConsumer c = new LocalConsumer(this.mockBrokerTest, "consumer" +
++cCount, this, destination);
consumers.add(c);
router.bind(c, destination);
}
@@ -100,13 +104,13 @@
final void stopServices() throws Exception {
transportServer.stop();
- for (MockTransportConnection connection : connections) {
+ for (RemoteConnection connection : connections) {
connection.stop();
}
- for (MockProducerConnection connection : producers) {
+ for (LocalProducer connection : producers) {
connection.stop();
}
- for (MockConsumerConnection connection : consumers) {
+ for (LocalConsumer connection : consumers) {
connection.stop();
}
for (BrokerConnection connection : brokerConns) {
@@ -127,7 +131,7 @@
dispatcher.start();
- for (MockConsumerConnection connection : consumers) {
+ for (LocalConsumer connection : consumers) {
connection.start();
}
@@ -135,7 +139,7 @@
queue.start();
}
- for (MockProducerConnection connection : producers) {
+ for (LocalProducer connection : producers) {
connection.start();
}
@@ -145,7 +149,8 @@
}
public void onAccept(final Transport transport) {
- MockTransportConnection connection = new MockTransportConnection();
+ RemoteConnection connection = new RemoteConnection();
+ connection.setBroker(this);
connection.setTransport(transport);
try {
connection.start();
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
Thu Feb 12 16:48:44 2009
@@ -50,7 +50,7 @@
private boolean multibroker = false;
// Set to mockup up ptp:
- private boolean ptp = true;
+ boolean ptp = true;
// Can be set to BLOCKING, POLLING or ASYNC
final int dispatchMode = AbstractTestConnection.ASYNC;
@@ -195,7 +195,7 @@
consumerCount = 1;
createConnections();
- MockProducerConnection producer = sendBroker.producers.get(0);
+ LocalProducer producer = sendBroker.producers.get(0);
producer.msgPriority = 1;
producer.producerRate.setName("High Priority Producer Rate");
@@ -233,7 +233,7 @@
consumerCount = 1;
createConnections();
- MockProducerConnection producer = sendBroker.producers.get(0);
+ LocalProducer producer = sendBroker.producers.get(0);
producer.msgPriority = 1;
producer.priorityMod = 3;
producer.producerRate.setName("High Priority Producer Rate");
@@ -421,7 +421,7 @@
Destination[] dests = new Destination[destCount];
for (int i = 0; i < destCount; i++) {
- dests[i] = new Destination("dest" + (i + 1));
+ dests[i] = new Destination("dest" + (i + 1), ptp);
if (ptp) {
sendBroker.createQueue(dests[i]);
if (multibroker) {
@@ -434,7 +434,7 @@
sendBroker.createProducerConnection(dests[i % destCount]);
}
for (int i = 0; i < consumerCount; i++) {
- rcvBroker.createConsumerConnection(dests[i % destCount], ptp);
+ rcvBroker.createConsumerConnection(dests[i % destCount]);
}
// Create MultiBroker connections:
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
Thu Feb 12 16:48:44 2009
@@ -15,7 +15,7 @@
class MockQueue implements MockBrokerTest.DeliveryTarget {
private final MockBrokerTest mockBrokerTest;
- HashMap<MockConsumerConnection, Subscription<Message>> subs = new
HashMap<MockConsumerConnection, Subscription<Message>>();
+ HashMap<DeliveryTarget, Subscription<Message>> subs = new
HashMap<DeliveryTarget, Subscription<Message>>();
private final Destination destination;
private final IQueue<Long, Message> queue;
private final MockBroker broker;
@@ -77,7 +77,7 @@
return destination;
}
- public final void addConsumer(final MockConsumerConnection dt) {
+ public final void addConsumer(final DeliveryTarget dt) {
Subscription<Message> sub = new Subscription<Message>() {
public boolean isPreAcquired() {
return true;
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=743802&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
Thu Feb 12 16:48:44 2009
@@ -0,0 +1,204 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.queue.ExclusivePriorityQueue;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IFlowQueue;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+
+public class RemoteConnection implements TransportListener, DeliveryTarget {
+
+ protected final Object mutex = new Object();
+
+ protected Transport transport;
+ protected MockBroker broker;
+ protected IFlowQueue<Message> output;
+
+ protected FlowController<Message> inboundController;
+ protected String name;
+
+ private int priorityLevels;
+
+ private final int outputWindowSize = 1000;
+ private final int outputResumeThreshold = 500;
+
+ private final int inputWindowSize = 1000;
+ private final int inputResumeThreshold = 900;
+
+ private IDispatcher dispatcher;
+ private ExecutorService writer;
+
+ private final AtomicBoolean stopping = new AtomicBoolean();
+
+ public void setBroker(MockBroker broker) {
+ this.broker = broker;
+
+ }
+
+ public void setTransport(Transport transport) {
+ this.transport = transport;
+ }
+
+ public void start() throws Exception {
+
+ // Setup the input processing..
+ SizeLimiter<Message> limiter = new
SizeLimiter<Message>(inputWindowSize, inputResumeThreshold);
+ Flow flow = new Flow(name + "-inbound", false);
+ inboundController = new FlowController<Message>(new
FlowControllable<Message>() {
+ public void flowElemAccepted(ISourceController<Message>
controller, Message elem) {
+ broker.router.route(controller, elem);
+ }
+
+ public IFlowSink<Message> getFlowSink() {
+ return null;
+ }
+
+ public IFlowSource<Message> getFlowSource() {
+ return null;
+ }
+ }, flow, limiter, mutex);
+
+ // Setup output processing
+ if (priorityLevels <= 1) {
+ limiter = new SizeLimiter<Message>(outputWindowSize,
outputResumeThreshold);
+ flow = new Flow(name + "-outbound", false);
+ ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(flow,
flow.getFlowName(), limiter);
+ this.output = queue;
+ } else {
+ ExclusivePriorityQueue<Message> t = new
ExclusivePriorityQueue<Message>(broker.priorityLevels, flow, name +
"-outbound", outputWindowSize, outputResumeThreshold);
+ t.setPriorityMapper(Message.PRIORITY_MAPPER);
+ this.output = t;
+ }
+
+ // Use an async thread to drain the output queue.
+ // Personally I think it would be better if we polled messages out of
the output queue.
+ writer = Executors.newSingleThreadExecutor();
+ output.setDispatcher(dispatcher);
+ output.setDrain(new IFlowDrain<Message>() {
+ public void drain(final Message elem, final
ISourceController<Message> controller) {
+ writer.execute(new Runnable() {
+ public void run() {
+ if (!stopping.get()) {
+ try {
+ transport.oneway(elem);
+ controller.elementDispatched(elem);
+ } catch (IOException e) {
+ onException(e);
+ }
+ }
+ }
+ });
+ }
+ });
+
+ transport.setTransportListener(this);
+ transport.start();
+ }
+
+ public void stop() throws Exception {
+ stopping.set(true);
+ writer.shutdown();
+ if (transport != null) {
+ transport.stop();
+ }
+ }
+
+ public void onCommand(Object command) {
+ try {
+ if (command.getClass() == Message.class) {
+ Message msg = (Message) command;
+ // Use the flow controller to send the message on so that we do
+ // not overflow
+ // the broker.
+ while (!inboundController.offer(msg, null)) {
+ inboundController.waitForFlowUnblock();
+ }
+ } else if (command.getClass() == Destination.class) {
+ // This is a subscription request
+ Destination destination = (Destination) command;
+ broker.subscribe(destination, this);
+ }
+ } catch (Exception e) {
+ onException(e);
+ }
+ }
+
+ public void onException(IOException error) {
+ if (!stopping.get()) {
+ error.printStackTrace();
+ }
+ }
+
+ public void onException(Exception error) {
+ if (!stopping.get()) {
+ error.printStackTrace();
+ }
+ }
+
+ public void transportInterupted() {
+ }
+
+ public void transportResumed() {
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getPriorityLevels() {
+ return priorityLevels;
+ }
+
+ public void setPriorityLevels(int priorityLevels) {
+ this.priorityLevels = priorityLevels;
+ }
+
+ public IDispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setDispatcher(IDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public MockBroker getBroker() {
+ return broker;
+ }
+
+ public int getOutputWindowSize() {
+ return outputWindowSize;
+ }
+
+ public int getOutputResumeThreshold() {
+ return outputResumeThreshold;
+ }
+
+ public int getInputWindowSize() {
+ return inputWindowSize;
+ }
+
+ public int getInputResumeThreshold() {
+ return inputResumeThreshold;
+ }
+
+ public IFlowSink<Message> getSink() {
+ return output;
+ }
+
+ public boolean match(Message message) {
+ return true;
+ }
+
+}
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=743802&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
Thu Feb 12 16:48:44 2009
@@ -0,0 +1,111 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+
+public class RemoteConsumer implements TransportListener {
+
+ private final AtomicBoolean stopping = new AtomicBoolean();
+ private final MetricCounter consumerRate = new MetricCounter();
+
+ private Transport transport;
+ private MockBroker broker;
+ private String name;
+ private MetricAggregator totalConsumerRate;
+ private long thinkTime;
+ private Destination destination;
+
+ public void start() throws Exception {
+ consumerRate.name("Consumer " + name + " Rate");
+ totalConsumerRate.add(consumerRate);
+
+ URI uri = broker.transportServer.getConnectURI();
+ transport = TransportFactory.connect(uri);
+ transport.setTransportListener(this);
+ transport.start();
+
+ // Sending the destination acts as the subscribe.
+ transport.oneway(destination);
+ }
+
+ public void stop() throws Exception {
+ stopping.set(true);
+ if( transport!=null ) {
+ transport.stop();
+ transport=null;
+ }
+ }
+
+ public void onCommand(Object command) {
+ if( command.getClass() == Message.class ) {
+
+ if (thinkTime > 0) {
+ try {
+ Thread.sleep(thinkTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ consumerRate.increment();
+
+ } else {
+ System.out.println("Unhandled command: "+command);
+ }
+ }
+
+ public void onException(IOException error) {
+ if( !stopping.get() ) {
+ error.printStackTrace();
+ }
+ }
+
+ public void transportInterupted() {
+ }
+ public void transportResumed() {
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ public void setBroker(MockBroker broker) {
+ this.broker = broker;
+ }
+
+ public Transport getTransport() {
+ return transport;
+ }
+
+ public void setTransport(Transport transport) {
+ this.transport = transport;
+ }
+
+
+ public MockBroker getBroker() {
+ return broker;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public MetricAggregator getTotalConsumerRate() {
+ return totalConsumerRate;
+ }
+
+ public void setTotalConsumerRate(MetricAggregator totalProducerRate) {
+ this.totalConsumerRate = totalProducerRate;
+ }
+
+ public Destination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }}
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=743802&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
Thu Feb 12 16:48:44 2009
@@ -0,0 +1,175 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+
+public class RemoteProducer implements TransportListener, Runnable {
+
+ private final AtomicBoolean stopping = new AtomicBoolean();
+ private final MetricCounter producerRate = new MetricCounter();
+
+ private Transport transport;
+ private MockBroker broker;
+ private String name;
+ private Thread thread;
+ private AtomicLong messageIdGenerator;
+ private int priority;
+ private int priorityMod;
+ private int counter;
+ private int producerId;
+ private Destination destination;
+ private String property;
+ private MetricAggregator totalProducerRate;
+
+ public void start() throws Exception {
+ producerRate.name("Producer " + name + " Rate");
+ totalProducerRate.add(producerRate);
+
+ URI uri = broker.transportServer.getConnectURI();
+ transport = TransportFactory.connect(uri);
+ transport.setTransportListener(this);
+ transport.start();
+
+ thread = new Thread(this, name);
+ thread.start();
+ }
+
+ public void stop() throws Exception {
+ stopping.set(true);
+ if( transport!=null ) {
+ transport.stop();
+ transport=null;
+ }
+ thread.join();
+ }
+
+ public void run() {
+ try {
+ while( !stopping.get() ) {
+
+ int priority = this.priority;
+ if (priorityMod > 0) {
+ priority = counter % priorityMod == 0 ? 0 : priority;
+ }
+
+ Message next = new
Message(messageIdGenerator.getAndIncrement(), producerId, name + ++counter,
null, destination, priority);
+ if (property != null) {
+ next.setProperty(property);
+ }
+
+ transport.oneway(next);
+ }
+ } catch (IOException e) {
+ onException(e);
+ }
+ }
+
+ public void onCommand(Object command) {
+ System.out.println("Unhandled command: "+command);
+ }
+
+ public void onException(IOException error) {
+ if( !stopping.get() ) {
+ error.printStackTrace();
+ }
+ }
+
+ public void transportInterupted() {
+ }
+ public void transportResumed() {
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ public void setBroker(MockBroker broker) {
+ this.broker = broker;
+ }
+
+ public Transport getTransport() {
+ return transport;
+ }
+
+ public void setTransport(Transport transport) {
+ this.transport = transport;
+ }
+
+ public AtomicLong getMessageIdGenerator() {
+ return messageIdGenerator;
+ }
+
+ public void setMessageIdGenerator(AtomicLong msgIdGenerator) {
+ this.messageIdGenerator = msgIdGenerator;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int msgPriority) {
+ this.priority = msgPriority;
+ }
+
+ public int getPriorityMod() {
+ return priorityMod;
+ }
+
+ public void setPriorityMod(int priorityMod) {
+ this.priorityMod = priorityMod;
+ }
+
+ public int getCounter() {
+ return counter;
+ }
+
+ public void setCounter(int msgCounter) {
+ this.counter = msgCounter;
+ }
+
+ public int getProducerId() {
+ return producerId;
+ }
+
+ public void setProducerId(int producerId) {
+ this.producerId = producerId;
+ }
+
+ public Destination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }
+
+ public String getProperty() {
+ return property;
+ }
+
+ public void setProperty(String property) {
+ this.property = property;
+ }
+
+ public MockBroker getBroker() {
+ return broker;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public MetricAggregator getTotalProducerRate() {
+ return totalProducerRate;
+ }
+
+ public void setTotalProducerRate(MetricAggregator totalProducerRate) {
+ this.totalProducerRate = totalProducerRate;
+ }}