Author: chirino
Date: Thu Feb 12 00:13:28 2009
New Revision: 743578
URL: http://svn.apache.org/viewvc?rev=743578&view=rev
Log:
Breaking out the nested classes of the MockBrokerTest in preparation for hooking
in a version of the test that uses some real IO.
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockConsumerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockProducerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockTransportConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
activemq/sandbox/activemq-flow/src/test/resources/
activemq/sandbox/activemq-flow/src/test/resources/META-INF/
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test
activemq/sandbox/activemq-flow/src/test/resources/log4j.properties (with
props)
Modified:
activemq/sandbox/activemq-flow/ (props changed)
activemq/sandbox/activemq-flow/pom.xml
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
Propchange: activemq/sandbox/activemq-flow/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Feb 12 00:13:28 2009
@@ -0,0 +1,8 @@
+.project
+.classpath
+.settings
+.wtpmodules
+*.iml
+junit*.properties
+eclipse-classes
+target
Modified: activemq/sandbox/activemq-flow/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=743578&r1=743577&r2=743578&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/pom.xml Thu Feb 12 00:13:28 2009
@@ -1,21 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under
+ the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -24,29 +23,38 @@
<artifactId>activemq-parent</artifactId>
<version>5.3-SNAPSHOT</version>
</parent>
+
+ <groupId>org.apache.activemq.flow</groupId>
+ <artifactId>activemq-flow</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-SNAPSHOT</version>
+
+ <name>ActiveMQ :: Flow</name>
+
+ <dependencies>
- <groupId>org.apache.activemq.flow</groupId>
- <artifactId>activemq-flow</artifactId>
- <packaging>jar</packaging>
- <version>1.0-SNAPSHOT</version>
-
- <name>ActiveMQ :: Flow</name>
-
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>colt</groupId>
- <artifactId>colt</artifactId>
- <version>1.2.0</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-core</artifactId>
- </dependency>
- </dependencies>
-
+ <dependency>
+ <groupId>colt</groupId>
+ <artifactId>colt</artifactId>
+ <version>1.2.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
</project>
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=743578&r1=743577&r2=743578&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
Thu Feb 12 00:13:28 2009
@@ -31,7 +31,6 @@
import org.apache.activemq.flow.IFlowSource;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.flow.MockBrokerTest.MockBroker;
import org.apache.activemq.flow.IFlowResource.FlowLifeCycleListener;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.queue.ExclusivePriorityQueue;
@@ -42,6 +41,7 @@
import org.apache.activemq.queue.SingleFlowPriorityQueue;
public abstract class AbstractTestConnection implements Service {
+
protected final IFlowQueue<Message> output;
protected final NetworkSource input;
protected final MockBroker broker;
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743578&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
Thu Feb 12 00:13:28 2009
@@ -0,0 +1,160 @@
+/**
+ *
+ */
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.MockBrokerTest.BrokerConnection;
+import org.apache.activemq.flow.MockBrokerTest.Router;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+
+class MockBroker implements TransportAcceptListener {
+
+ private final MockBrokerTest mockBrokerTest;
+ private final TestFlowManager flowMgr;
+
+ final ArrayList<MockTransportConnection> connections = new
ArrayList<MockTransportConnection>();
+ final ArrayList<MockProducerConnection> producers = new
ArrayList<MockProducerConnection>();
+ final ArrayList<MockConsumerConnection> consumers = new
ArrayList<MockConsumerConnection>();
+ private final ArrayList<BrokerConnection> brokerConns = new
ArrayList<BrokerConnection>();
+
+ private final HashMap<Destination, MockQueue> queues = new
HashMap<Destination, MockQueue>();
+ final Router router;
+ private int pCount;
+ private int cCount;
+ private final String name;
+ public final int dispatchMode;
+
+ public final IDispatcher dispatcher;
+ public final int priorityLevels = MockBrokerTest.PRIORITY_LEVELS;
+ public final int ioWorkAmount = MockBrokerTest.IO_WORK_AMOUNT;
+ public final boolean useInputQueues = MockBrokerTest.USE_INPUT_QUEUES;
+ private TransportServer transportServer;
+
+ MockBroker(MockBrokerTest mockBrokerTest, String name) throws IOException,
URISyntaxException {
+ this.mockBrokerTest = mockBrokerTest;
+ this.flowMgr = new TestFlowManager();
+ this.router = this.mockBrokerTest.new Router();
+ this.name = name;
+ this.dispatchMode = this.mockBrokerTest.dispatchMode;
+ this.dispatcher = this.mockBrokerTest.dispatcher;
+ }
+
+ TestFlowManager getFlowManager() {
+ return flowMgr;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void createProducerConnection(Destination destination) {
+ MockProducerConnection c = new
MockProducerConnection(this.mockBrokerTest, "producer" + ++pCount, this,
destination);
+ producers.add(c);
+ }
+
+ public void createConsumerConnection(Destination destination, boolean ptp)
{
+ MockConsumerConnection c = new
MockConsumerConnection(this.mockBrokerTest, "consumer" + ++cCount, this,
destination);
+ consumers.add(c);
+ if (ptp) {
+ queues.get(destination).addConsumer(c);
+ } else {
+ router.bind(c, destination);
+ }
+
+ }
+
+ public void createClusterConnection(Destination destination) {
+ MockConsumerConnection c = new
MockConsumerConnection(this.mockBrokerTest, "consumer" + ++cCount, this,
destination);
+ consumers.add(c);
+ router.bind(c, destination);
+ }
+
+ public void createQueue(Destination destination) {
+ MockQueue queue = new MockQueue(this.mockBrokerTest, this,
destination);
+ queues.put(destination, queue);
+ }
+
+ public void createBrokerConnection(MockBroker target, Pipe<Message> pipe) {
+ BrokerConnection bc = this.mockBrokerTest.new BrokerConnection(this,
target, pipe);
+ // Set up the pipe for polled access
+ if (dispatchMode != AbstractTestConnection.BLOCKING) {
+ pipe.setMode(Pipe.POLLING);
+ }
+ // Add subscriptions for the target's destinations:
+ for (Destination d : target.router.lookupTable.keySet()) {
+ router.bind(bc, d);
+ }
+ brokerConns.add(bc);
+ }
+
+ final void stopServices() throws Exception {
+ transportServer.stop();
+
+ for (MockTransportConnection connection : connections) {
+ connection.stop();
+ }
+ for (MockProducerConnection connection : producers) {
+ connection.stop();
+ }
+ for (MockConsumerConnection connection : consumers) {
+ connection.stop();
+ }
+ for (BrokerConnection connection : brokerConns) {
+ connection.stop();
+ }
+ for (MockQueue queue : queues.values()) {
+ queue.stop();
+ }
+ dispatcher.shutdown();
+
+ }
+
+ final void startServices() throws Exception {
+
+ transportServer = TransportFactory.bind(new
URI("tcp://localhost:61616?wireFormat=test"));
+ transportServer.setAcceptListener(this);
+ transportServer.start();
+
+ dispatcher.start();
+
+ for (MockConsumerConnection connection : consumers) {
+ connection.start();
+ }
+
+ for (MockQueue queue : queues.values()) {
+ queue.start();
+ }
+
+ for (MockProducerConnection connection : producers) {
+ connection.start();
+ }
+
+ for (BrokerConnection connection : brokerConns) {
+ connection.start();
+ }
+ }
+
+ public void onAccept(final Transport transport) {
+ MockTransportConnection connection = new MockTransportConnection();
+ connection.setTransport(transport);
+ try {
+ connection.start();
+ } catch (Exception e1) {
+ onAcceptError(e1);
+ }
+ }
+
+ public void onAcceptError(Exception error) {
+ error.printStackTrace();
+ }
+}
\ No newline at end of file
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743578&r1=743577&r2=743578&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
Thu Feb 12 00:13:28 2009
@@ -16,10 +16,11 @@
*/
package org.apache.activemq.flow;
+import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,30 +30,21 @@
import org.apache.activemq.dispatch.PriorityPooledDispatcher;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.ISinkController;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.PrioritySizeLimiter;
-import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.metric.MetricAggregator;
-import org.apache.activemq.metric.MetricCounter;
import org.apache.activemq.metric.Period;
-import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.Mapper;
-import org.apache.activemq.queue.PartitionedQueue;
-import org.apache.activemq.queue.SharedPriorityQueue;
-import org.apache.activemq.queue.SharedQueue;
-import org.apache.activemq.queue.Subscription;
public class MockBrokerTest extends TestCase {
private static final int PERFORMANCE_SAMPLES = 3;
- private static final int IO_WORK_AMOUNT = 0;
+ static final int IO_WORK_AMOUNT = 0;
private static final int FANIN_COUNT = 10;
private static final int FANOUT_COUNT = 10;
- private static final int PRIORITY_LEVELS = 10;
- private static final boolean USE_INPUT_QUEUES = true;
+ static final int PRIORITY_LEVELS = 10;
+ static final boolean USE_INPUT_QUEUES = true;
// Set to put senders and consumers on separate brokers.
private boolean multibroker = false;
@@ -61,10 +53,10 @@
private boolean ptp = true;
// Can be set to BLOCKING, POLLING or ASYNC
- private final int dispatchMode = AbstractTestConnection.ASYNC;
+ final int dispatchMode = AbstractTestConnection.ASYNC;
// Set's the number of threads to use:
private final int asyncThreadPoolSize =
Runtime.getRuntime().availableProcessors();
- private boolean usePartitionedQueue = false;
+ boolean usePartitionedQueue = false;
private int producerCount;
private int consumerCount;
@@ -76,7 +68,7 @@
MockBroker sendBroker;
MockBroker rcvBroker;
private ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
- private IDispatcher dispatcher;
+ IDispatcher dispatcher;
public interface DeliveryTarget {
public IFlowSink<Message> getSink();
@@ -84,312 +76,8 @@
public boolean match(Message message);
}
- class MockBroker {
- private final TestFlowManager flowMgr;
- private final ArrayList<MockProducerConnection> producers = new
ArrayList<MockProducerConnection>();
- private final ArrayList<MockConsumerConnection> consumers = new
ArrayList<MockConsumerConnection>();
- private final ArrayList<BrokerConnection> brokerConns = new
ArrayList<BrokerConnection>();
-
- private final HashMap<Destination, MockQueue> queues = new
HashMap<Destination, MockQueue>();
- private final Router router;
- private int pCount;
- private int cCount;
- private final String name;
- public final int dispatchMode;
-
- public final IDispatcher dispatcher;
- public final int priorityLevels = PRIORITY_LEVELS;
- public final int ioWorkAmount = IO_WORK_AMOUNT;
- public final boolean useInputQueues = USE_INPUT_QUEUES;
-
- MockBroker(String name) {
- this.flowMgr = new TestFlowManager();
- this.router = new Router();
- this.name = name;
- this.dispatchMode = MockBrokerTest.this.dispatchMode;
- this.dispatcher = MockBrokerTest.this.dispatcher;
- }
-
- TestFlowManager getFlowManager() {
- return flowMgr;
- }
-
- public String getName() {
- return name;
- }
-
- public void createProducerConnection(Destination destination) {
- MockProducerConnection c = new MockProducerConnection("producer" +
++pCount, this, destination);
- producers.add(c);
- }
-
- public void createConsumerConnection(Destination destination, boolean
ptp) {
- MockConsumerConnection c = new MockConsumerConnection("consumer" +
++cCount, this, destination);
- consumers.add(c);
- if (ptp) {
- queues.get(destination).addConsumer(c);
- } else {
- router.bind(c, destination);
- }
-
- }
-
- public void createClusterConnection(Destination destination) {
- MockConsumerConnection c = new MockConsumerConnection("consumer" +
++cCount, this, destination);
- consumers.add(c);
- router.bind(c, destination);
- }
-
- public void createQueue(Destination destination) {
- MockQueue queue = new MockQueue(this, destination);
- queues.put(destination, queue);
- }
-
- public void createBrokerConnection(MockBroker target, Pipe<Message>
pipe) {
- BrokerConnection bc = new BrokerConnection(this, target, pipe);
- // Set up the pipe for polled access
- if (dispatchMode != AbstractTestConnection.BLOCKING) {
- pipe.setMode(Pipe.POLLING);
- }
- // Add subscriptions for the target's destinations:
- for (Destination d : target.router.lookupTable.keySet()) {
- router.bind(bc, d);
- }
- brokerConns.add(bc);
- }
-
- final void stopServices() throws Exception {
- for (MockProducerConnection connection : producers) {
- connection.stop();
- }
- for (MockConsumerConnection connection : consumers) {
- connection.stop();
- }
- for (BrokerConnection connection : brokerConns) {
- connection.stop();
- }
- for (MockQueue queue : queues.values()) {
- queue.stop();
- }
- dispatcher.shutdown();
-
- }
-
- final void startServices() throws Exception {
- dispatcher.start();
- for (MockConsumerConnection connection : consumers) {
- connection.start();
- }
-
- for (MockQueue queue : queues.values()) {
- queue.start();
- }
-
- for (MockProducerConnection connection : producers) {
- connection.start();
- }
-
- for (BrokerConnection connection : brokerConns) {
- connection.start();
- }
- }
- }
-
- private final AtomicLong msgIdGenerator = new AtomicLong();
- private final AtomicInteger prodcuerIdGenerator = new AtomicInteger();
-
- class MockProducerConnection extends AbstractTestConnection {
-
- MetricCounter producerRate = new MetricCounter();
-
- private final Destination destination;
- private int msgCounter;
- private String name;
- private String property;
- private Message next;
- private int msgPriority = 0;
- private int priorityMod = 0;
- int producerId = prodcuerIdGenerator.getAndIncrement();
-
- public MockProducerConnection(String name, MockBroker broker,
Destination destination) {
-
- super(broker, name, broker.getFlowManager().createFlow(name),
null);
- this.destination = destination;
-
- producerRate.name("Producer " + name + " Rate");
- totalProducerRate.add(producerRate);
-
- }
-
- /*
- * Gets the next message blocking until space is available for it.
- * (non-Javadoc)
- *
- * @see com.progress.flow.AbstractTestConnection#getNextMessage()
- */
- public Message getNextMessage() throws InterruptedException {
-
- Message m = new Message(msgIdGenerator.getAndIncrement(),
producerId, name + ++msgCounter, flow, destination, msgPriority);
- if (property != null) {
- m.setProperty(property);
- }
- simulateEncodingWork();
- input.getFlowController(m.getFlow()).waitForFlowUnblock();
- return m;
- }
-
- @Override
- protected void addReadReadyListener(final ReadReadyListener listener) {
- if (next == null) {
- next = new Message(msgIdGenerator.getAndIncrement(),
producerId, name + ++msgCounter, flow, destination, msgPriority);
- if (property != null) {
- next.setProperty(property);
- }
- simulateEncodingWork();
- }
-
- if
(!input.getFlowController(next.getFlow()).addUnblockListener(new
ISinkController.FlowUnblockListener<Message>() {
- public void onFlowUnblocked(ISinkController<Message>
controller) {
- listener.onReadReady();
- }
- })) {
- // Return value of false means that the controller didn't
- // register the listener because it was not blocked.
- listener.onReadReady();
- }
- }
-
- @Override
- public final Message pollNextMessage() {
- if (next == null) {
- int priority = msgPriority;
- if (priorityMod > 0) {
- priority = msgCounter % priorityMod == 0 ? 0 : msgPriority;
- }
-
- next = new Message(msgIdGenerator.getAndIncrement(),
producerId, name + ++msgCounter, flow, destination, priority);
- if (property != null) {
- next.setProperty(property);
- }
- simulateEncodingWork();
- }
-
- if (input.getFlowController(next.getFlow()).isSinkBlocked()) {
- return null;
- }
-
- Message m = next;
- next = null;
- return m;
- }
-
- @Override
- public void messageReceived(Message m, ISourceController<Message>
controller) {
-
- broker.router.route(controller, m);
- producerRate.increment();
- }
-
- @Override
- public void write(Message m, ISourceController<Message> controller) {
- // Noop
- }
-
- }
-
- class MockConsumerConnection extends AbstractTestConnection implements
DeliveryTarget {
-
- MetricCounter consumerRate = new MetricCounter();
- private final Destination destination;
- private String selector;
- private boolean autoRelease = false;
-
- private long thinkTime = 0;
-
- public MockConsumerConnection(String name, MockBroker broker,
Destination destination) {
- super(broker, name,
broker.getFlowManager().createFlow(destination.getName()), null);
- this.destination = destination;
- output.setAutoRelease(autoRelease);
- consumerRate.name("Consumer " + name + " Rate");
- totalConsumerRate.add(consumerRate);
-
- }
-
- public Destination getDestination() {
- return destination;
- }
-
- public String getSelector() {
- return selector;
- }
-
- public void setThinkTime(long time) {
- thinkTime = time;
- }
-
- @Override
- protected synchronized Message getNextMessage() throws
InterruptedException {
- wait();
- return null;
- }
-
- @Override
- protected void addReadReadyListener(final ReadReadyListener listener) {
- return;
- }
-
- public Message pollNextMessage() {
- return null;
- }
-
- @Override
- protected void messageReceived(Message m, ISourceController<Message>
controller) {
- }
-
- @Override
- protected void write(final Message m, final ISourceController<Message>
controller) throws InterruptedException {
- if (!m.isSystem()) {
- // /IF we are async don't tie up the calling thread
- // schedule dispatch complete for later.
- if (dispatchMode == ASYNC && thinkTime > 0) {
- Runnable acker = new Runnable() {
- public void run() {
- if (thinkTime > 0) {
- try {
- Thread.sleep(thinkTime);
- } catch (InterruptedException e) {
- }
- }
- simulateEncodingWork();
- if (!autoRelease) {
- controller.elementDispatched(m);
- }
- consumerRate.increment();
- }
- };
-
- broker.dispatcher.schedule(acker, thinkTime,
TimeUnit.MILLISECONDS);
- } else {
- simulateEncodingWork();
- if (!autoRelease) {
- controller.elementDispatched(m);
- }
- consumerRate.increment();
- }
- }
- }
-
- public IFlowSink<Message> getSink() {
- return output;
- }
-
- public boolean match(Message message) {
- if (selector == null)
- return true;
- return message.match(selector);
- }
-
- }
+ final AtomicLong msgIdGenerator = new AtomicLong();
+ final AtomicInteger prodcuerIdGenerator = new AtomicInteger();
class BrokerConnection extends AbstractTestConnection implements
DeliveryTarget {
private final Pipe<Message> pipe;
@@ -447,12 +135,12 @@
}
}
- final private Mapper<Long, Message> keyExtractor = new Mapper<Long,
Message>() {
+ final Mapper<Long, Message> keyExtractor = new Mapper<Long, Message>() {
public Long map(Message element) {
return element.getMsgId();
}
};
- final private Mapper<Integer, Message> partitionMapper = new
Mapper<Integer, Message>() {
+ final Mapper<Integer, Message> partitionMapper = new Mapper<Integer,
Message>() {
public Integer map(Message element) {
// we modulo 10 to have at most 10 partitions which the producers
// gets split across.
@@ -460,121 +148,8 @@
}
};
- private class MockQueue implements DeliveryTarget {
- HashMap<MockConsumerConnection, Subscription<Message>> subs = new
HashMap<MockConsumerConnection, Subscription<Message>>();
- private final Destination destination;
- private final IQueue<Long, Message> queue;
- private final MockBroker broker;
-
- MockQueue(MockBroker broker, Destination destination) {
- this.broker = broker;
- this.destination = destination;
- this.queue = createQueue();
- broker.router.bind(this, destination);
- }
-
- private IQueue<Long, Message> createQueue() {
-
- if (usePartitionedQueue) {
- PartitionedQueue<Integer, Long, Message> queue = new
PartitionedQueue<Integer, Long, Message>() {
- @Override
- protected IQueue<Long, Message> cratePartition(Integer
partitionKey) {
- return createSharedFlowQueue();
- }
- };
- queue.setPartitionMapper(partitionMapper);
- queue.setResourceName(destination.getName());
- return queue;
- } else {
- return createSharedFlowQueue();
- }
- }
-
- private IQueue<Long, Message> createSharedFlowQueue() {
- if (broker.priorityLevels > 1) {
- PrioritySizeLimiter<Message> limiter = new
PrioritySizeLimiter<Message>(100, 1, broker.priorityLevels);
- limiter.setPriorityMapper(Message.PRIORITY_MAPPER);
- SharedPriorityQueue<Long, Message> queue = new
SharedPriorityQueue<Long, Message>(destination.getName(), limiter);
- queue.setKeyMapper(keyExtractor);
- queue.setAutoRelease(true);
- queue.setDispatcher(broker.dispatcher);
- return queue;
- } else {
- SizeLimiter<Message> limiter = new SizeLimiter<Message>(100,
1);
- SharedQueue<Long, Message> queue = new SharedQueue<Long,
Message>(destination.getName(), limiter);
- queue.setKeyMapper(keyExtractor);
- queue.setAutoRelease(true);
- queue.setDispatcher(broker.dispatcher);
- return queue;
- }
- }
-
- public final void deliver(ISourceController<Message> source, Message
msg) {
-
- queue.add(msg, source);
- }
-
- public String getSelector() {
- return null;
- }
-
- public final Destination getDestination() {
- return destination;
- }
-
- public final void addConsumer(final MockConsumerConnection dt) {
- Subscription<Message> sub = new Subscription<Message>() {
- public boolean isPreAcquired() {
- return true;
- }
-
- public boolean matches(Message message) {
- return dt.match(message);
- }
-
- public boolean isRemoveOnDispatch() {
- return true;
- }
-
- public IFlowSink<Message> getSink() {
- return dt.getSink();
- }
-
- @Override
- public String toString() {
- return getSink().toString();
- }
- };
- subs.put(dt, sub);
- queue.addSubscription(sub);
- }
-
- public boolean removeSubscirption(final DeliveryTarget dt) {
- Subscription<Message> sub = subs.remove(dt);
- if (sub != null) {
- return queue.removeSubscription(sub);
- }
- return false;
- }
-
- public void start() throws Exception {
- }
-
- public void stop() throws Exception {
- }
-
- public IFlowSink<Message> getSink() {
- return queue;
- }
-
- public boolean match(Message message) {
- return true;
- }
-
- }
-
- private class Router {
- private final HashMap<Destination, Collection<DeliveryTarget>>
lookupTable = new HashMap<Destination, Collection<DeliveryTarget>>();
+ class Router {
+ final HashMap<Destination, Collection<DeliveryTarget>> lookupTable =
new HashMap<Destination, Collection<DeliveryTarget>>();
final synchronized void bind(DeliveryTarget dt, Destination
destination) {
Collection<DeliveryTarget> targets = lookupTable.get(destination);
@@ -826,7 +401,7 @@
}
}
- private void createConnections() {
+ private void createConnections() throws IOException, URISyntaxException {
if (dispatchMode == AbstractTestConnection.ASYNC || dispatchMode ==
AbstractTestConnection.POLLING) {
dispatcher = new PriorityPooledDispatcher("BrokerDispatcher",
asyncThreadPoolSize, Message.MAX_PRIORITY);
@@ -834,12 +409,12 @@
}
if (multibroker) {
- sendBroker = new MockBroker("SendBroker");
- rcvBroker = new MockBroker("RcvBroker");
+ sendBroker = new MockBroker(this, "SendBroker");
+ rcvBroker = new MockBroker(this, "RcvBroker");
brokers.add(sendBroker);
brokers.add(rcvBroker);
} else {
- sendBroker = rcvBroker = new MockBroker("Broker");
+ sendBroker = rcvBroker = new MockBroker(this, "Broker");
brokers.add(sendBroker);
}
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockConsumerConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockConsumerConnection.java?rev=743578&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockConsumerConnection.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockConsumerConnection.java
Thu Feb 12 00:13:28 2009
@@ -0,0 +1,109 @@
+/**
+ *
+ */
+package org.apache.activemq.flow;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.metric.MetricCounter;
+
+class MockConsumerConnection extends AbstractTestConnection implements
MockBrokerTest.DeliveryTarget {
+
+ /**
+ *
+ */
+ private final MockBrokerTest mockBrokerTest;
+ MetricCounter consumerRate = new MetricCounter();
+ private final Destination destination;
+ String selector;
+ private boolean autoRelease = false;
+
+ private long thinkTime = 0;
+
+ public MockConsumerConnection(MockBrokerTest mockBrokerTest, String name,
MockBroker broker, Destination destination) {
+ super(broker, name,
broker.getFlowManager().createFlow(destination.getName()), null);
+ this.mockBrokerTest = mockBrokerTest;
+ this.destination = destination;
+ output.setAutoRelease(autoRelease);
+ consumerRate.name("Consumer " + name + " Rate");
+ this.mockBrokerTest.totalConsumerRate.add(consumerRate);
+
+ }
+
+ public Destination getDestination() {
+ return destination;
+ }
+
+ public String getSelector() {
+ return selector;
+ }
+
+ public void setThinkTime(long time) {
+ thinkTime = time;
+ }
+
+ @Override
+ protected synchronized Message getNextMessage() throws
InterruptedException {
+ wait();
+ return null;
+ }
+
+ @Override
+ protected void addReadReadyListener(final ReadReadyListener listener) {
+ return;
+ }
+
+ public Message pollNextMessage() {
+ return null;
+ }
+
+ @Override
+ protected void messageReceived(Message m, ISourceController<Message>
controller) {
+ }
+
+ @Override
+ protected void write(final Message m, final ISourceController<Message>
controller) throws InterruptedException {
+ if (!m.isSystem()) {
+ // /IF we are async don't tie up the calling thread
+ // schedule dispatch complete for later.
+ if (this.mockBrokerTest.dispatchMode == ASYNC && thinkTime > 0) {
+ Runnable acker = new Runnable() {
+ public void run() {
+ if (thinkTime > 0) {
+ try {
+ Thread.sleep(thinkTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ simulateEncodingWork();
+ if (!autoRelease) {
+ controller.elementDispatched(m);
+ }
+ consumerRate.increment();
+ }
+ };
+
+ broker.dispatcher.schedule(acker, thinkTime,
TimeUnit.MILLISECONDS);
+ } else {
+ simulateEncodingWork();
+ if (!autoRelease) {
+ controller.elementDispatched(m);
+ }
+ consumerRate.increment();
+ }
+ }
+ }
+
+ public IFlowSink<Message> getSink() {
+ return output;
+ }
+
+ public boolean match(Message message) {
+ if (selector == null)
+ return true;
+ return message.match(selector);
+ }
+
+}
\ No newline at end of file
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockProducerConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockProducerConnection.java?rev=743578&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockProducerConnection.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockProducerConnection.java
Thu Feb 12 00:13:28 2009
@@ -0,0 +1,113 @@
+/**
+ *
+ */
+package org.apache.activemq.flow;
+
+import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
+import org.apache.activemq.metric.MetricCounter;
+
+class MockProducerConnection extends AbstractTestConnection {
+
+ /**
+ *
+ */
+ private final MockBrokerTest mockBrokerTest;
+
+ MetricCounter producerRate = new MetricCounter();
+
+ private final Destination destination;
+ private int msgCounter;
+ private String name;
+ String property;
+ private Message next;
+ int msgPriority = 0;
+ int priorityMod = 0;
+ int producerId;
+
+ public MockProducerConnection(MockBrokerTest mockBrokerTest, String name,
MockBroker broker, Destination destination) {
+
+ super(broker, name, broker.getFlowManager().createFlow(name), null);
+ this.mockBrokerTest = mockBrokerTest;
+ this.destination = destination;
+ this.producerId =
this.mockBrokerTest.prodcuerIdGenerator.getAndIncrement();
+
+ producerRate.name("Producer " + name + " Rate");
+ this.mockBrokerTest.totalProducerRate.add(producerRate);
+
+ }
+
+ /*
+ * Gets the next message blocking until space is available for it.
+ * (non-Javadoc)
+ *
+ * @see com.progress.flow.AbstractTestConnection#getNextMessage()
+ */
+ public Message getNextMessage() throws InterruptedException {
+
+ Message m = new
Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name
+ ++msgCounter, flow, destination, msgPriority);
+ if (property != null) {
+ m.setProperty(property);
+ }
+ simulateEncodingWork();
+ input.getFlowController(m.getFlow()).waitForFlowUnblock();
+ return m;
+ }
+
+ @Override
+ protected void addReadReadyListener(final ReadReadyListener listener) {
+ if (next == null) {
+ next = new
Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name
+ ++msgCounter, flow, destination, msgPriority);
+ if (property != null) {
+ next.setProperty(property);
+ }
+ simulateEncodingWork();
+ }
+
+ if (!input.getFlowController(next.getFlow()).addUnblockListener(new
ISinkController.FlowUnblockListener<Message>() {
+ public void onFlowUnblocked(ISinkController<Message> controller) {
+ listener.onReadReady();
+ }
+ })) {
+ // Return value of false means that the controller didn't
+ // register the listener because it was not blocked.
+ listener.onReadReady();
+ }
+ }
+
+ @Override
+ public final Message pollNextMessage() {
+ if (next == null) {
+ int priority = msgPriority;
+ if (priorityMod > 0) {
+ priority = msgCounter % priorityMod == 0 ? 0 : msgPriority;
+ }
+
+ next = new
Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name
+ ++msgCounter, flow, destination, priority);
+ if (property != null) {
+ next.setProperty(property);
+ }
+ simulateEncodingWork();
+ }
+
+ if (input.getFlowController(next.getFlow()).isSinkBlocked()) {
+ return null;
+ }
+
+ Message m = next;
+ next = null;
+ return m;
+ }
+
+ @Override
+ public void messageReceived(Message m, ISourceController<Message>
controller) {
+
+ broker.router.route(controller, m);
+ producerRate.increment();
+ }
+
+ @Override
+ public void write(Message m, ISourceController<Message> controller) {
+ // Noop
+ }
+
+}
\ No newline at end of file
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743578&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
Thu Feb 12 00:13:28 2009
@@ -0,0 +1,129 @@
+/**
+ *
+ */
+package org.apache.activemq.flow;
+
+import java.util.HashMap;
+
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.SharedPriorityQueue;
+import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.Subscription;
+
+class MockQueue implements MockBrokerTest.DeliveryTarget {
+
+ private final MockBrokerTest mockBrokerTest;
+ HashMap<MockConsumerConnection, Subscription<Message>> subs = new
HashMap<MockConsumerConnection, Subscription<Message>>();
+ private final Destination destination;
+ private final IQueue<Long, Message> queue;
+ private final MockBroker broker;
+
+ MockQueue(MockBrokerTest mockBrokerTest, MockBroker broker, Destination
destination) {
+ this.mockBrokerTest = mockBrokerTest;
+ this.broker = broker;
+ this.destination = destination;
+ this.queue = createQueue();
+ broker.router.bind(this, destination);
+ }
+
+ private IQueue<Long, Message> createQueue() {
+
+ if (this.mockBrokerTest.usePartitionedQueue) {
+ PartitionedQueue<Integer, Long, Message> queue = new
PartitionedQueue<Integer, Long, Message>() {
+ @Override
+ protected IQueue<Long, Message> cratePartition(Integer
partitionKey) {
+ return createSharedFlowQueue();
+ }
+ };
+ queue.setPartitionMapper(this.mockBrokerTest.partitionMapper);
+ queue.setResourceName(destination.getName());
+ return queue;
+ } else {
+ return createSharedFlowQueue();
+ }
+ }
+
+ private IQueue<Long, Message> createSharedFlowQueue() {
+ if (broker.priorityLevels > 1) {
+ PrioritySizeLimiter<Message> limiter = new
PrioritySizeLimiter<Message>(100, 1, broker.priorityLevels);
+ limiter.setPriorityMapper(Message.PRIORITY_MAPPER);
+ SharedPriorityQueue<Long, Message> queue = new
SharedPriorityQueue<Long, Message>(destination.getName(), limiter);
+ queue.setKeyMapper(this.mockBrokerTest.keyExtractor);
+ queue.setAutoRelease(true);
+ queue.setDispatcher(broker.dispatcher);
+ return queue;
+ } else {
+ SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
+ SharedQueue<Long, Message> queue = new SharedQueue<Long,
Message>(destination.getName(), limiter);
+ queue.setKeyMapper(this.mockBrokerTest.keyExtractor);
+ queue.setAutoRelease(true);
+ queue.setDispatcher(broker.dispatcher);
+ return queue;
+ }
+ }
+
+ public final void deliver(ISourceController<Message> source, Message msg) {
+
+ queue.add(msg, source);
+ }
+
+ public String getSelector() {
+ return null;
+ }
+
+ public final Destination getDestination() {
+ return destination;
+ }
+
+ public final void addConsumer(final MockConsumerConnection dt) {
+ Subscription<Message> sub = new Subscription<Message>() {
+ public boolean isPreAcquired() {
+ return true;
+ }
+
+ public boolean matches(Message message) {
+ return dt.match(message);
+ }
+
+ public boolean isRemoveOnDispatch() {
+ return true;
+ }
+
+ public IFlowSink<Message> getSink() {
+ return dt.getSink();
+ }
+
+ @Override
+ public String toString() {
+ return getSink().toString();
+ }
+ };
+ subs.put(dt, sub);
+ queue.addSubscription(sub);
+ }
+
+ public boolean removeSubscirption(final DeliveryTarget dt) {
+ Subscription<Message> sub = subs.remove(dt);
+ if (sub != null) {
+ return queue.removeSubscription(sub);
+ }
+ return false;
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ }
+
+ public IFlowSink<Message> getSink() {
+ return queue;
+ }
+
+ public boolean match(Message message) {
+ return true;
+ }
+
+}
\ No newline at end of file
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockTransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockTransportConnection.java?rev=743578&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockTransportConnection.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockTransportConnection.java
Thu Feb 12 00:13:28 2009
@@ -0,0 +1,37 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+
+/**
+ * Minimal {@link TransportListener} that owns a {@link Transport} and ignores
+ * every event the transport reports.
+ */
+public class MockTransportConnection implements TransportListener {
+
+    private Transport transport;
+
+    /** @param transport the transport that {@link #start()} and {@link #stop()} operate on */
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    /** Registers this object as the transport's listener, then starts the transport. */
+    public void start() throws Exception {
+        final Transport t = this.transport;
+        t.setTransportListener(this);
+        t.start();
+    }
+
+    /** Stops the transport. */
+    public void stop() throws Exception {
+        this.transport.stop();
+    }
+
+    // ---- TransportListener callbacks: all ignored ----
+
+    public void onCommand(Object command) {
+    }
+
+    public void onException(IOException error) {
+    }
+
+    public void transportInterupted() {
+    }
+
+    public void transportResumed() {
+    }
+
+}
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java?rev=743578&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
Thu Feb 12 00:13:28 2009
@@ -0,0 +1,58 @@
+package org.apache.activemq.flow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Factory for a wire format that moves objects with Java serialization.
+ */
+public class TestWireFormatFactory implements WireFormatFactory {
+
+    /**
+     * Wire format that writes and reads whole objects through
+     * {@link ObjectOutputStream} / {@link ObjectInputStream}. Only the
+     * stream-based operations are implemented; the {@link ByteSequence}
+     * variants return {@code null}.
+     */
+    public class TestWireFormat implements WireFormat {
+
+        /**
+         * Serializes {@code value} onto {@code out} and flushes.
+         *
+         * @param value the object to write
+         * @param out   destination; must also be an {@link OutputStream}
+         * @throws IOException if {@code out} is not stream-backed or writing fails
+         */
+        public void marshal(Object value, DataOutput out) throws IOException {
+            // Report an unsupported target through the declared exception type
+            // rather than letting the cast fail with a ClassCastException.
+            if (!(out instanceof OutputStream)) {
+                throw new IOException("TestWireFormat requires an OutputStream-backed DataOutput, got: "
+                        + (out == null ? "null" : out.getClass().getName()));
+            }
+            ObjectOutputStream oos = new ObjectOutputStream((OutputStream) out);
+            oos.writeObject(value);
+            oos.reset();
+            oos.flush();
+        }
+
+        /**
+         * Reads one serialized object from {@code in}.
+         *
+         * @param in source; must also be an {@link InputStream}
+         * @return the deserialized object
+         * @throws IOException if {@code in} is not stream-backed, reading fails,
+         *                     or the object's class cannot be found
+         */
+        public Object unmarshal(DataInput in) throws IOException {
+            if (!(in instanceof InputStream)) {
+                throw new IOException("TestWireFormat requires an InputStream-backed DataInput, got: "
+                        + (in == null ? "null" : in.getClass().getName()));
+            }
+            // NOTE(review): native Java deserialization -- must never be fed
+            // untrusted data; tolerable only as a test-only wire format.
+            ObjectInputStream ois = new ObjectInputStream((InputStream) in);
+            try {
+                return ois.readObject();
+            } catch (ClassNotFoundException e) {
+                throw IOExceptionSupport.create(e);
+            }
+        }
+
+        /** @return always {@code 0} */
+        public int getVersion() {
+            return 0;
+        }
+
+        /** Ignored: this format has a single version. */
+        public void setVersion(int version) {
+        }
+
+        /** @return always {@code false} */
+        public boolean inReceive() {
+            return false;
+        }
+
+        /** Not implemented. @return always {@code null} */
+        public ByteSequence marshal(Object value) throws IOException {
+            return null;
+        }
+
+        /** Not implemented. @return always {@code null} */
+        public Object unmarshal(ByteSequence data) throws IOException {
+            return null;
+        }
+    }
+
+    /** @return a new {@link TestWireFormat} */
+    public WireFormat createWireFormat() {
+        return new TestWireFormat();
+    }
+
+}
Added:
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test?rev=743578&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test
(added)
+++
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test
Thu Feb 12 00:13:28 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.TestWireFormatFactory
Added: activemq/sandbox/activemq-flow/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/log4j.properties?rev=743578&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/resources/log4j.properties (added)
+++ activemq/sandbox/activemq-flow/src/test/resources/log4j.properties Thu Feb
12 00:13:28 2009
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests.
+#
+log4j.rootLogger=INFO, out, stdout
+
+log4j.logger.org.apache.activemq.spring=WARN
+
+# CONSOLE appender (attached to the root logger above)
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1}
- %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} -
%m%n
+log4j.appender.out.file=target/activemq-test.log
+log4j.appender.out.append=true
Propchange: activemq/sandbox/activemq-flow/src/test/resources/log4j.properties
------------------------------------------------------------------------------
svn:executable = *