Author: davsclaus
Date: Tue Oct 7 11:14:19 2008
New Revision: 702571
URL: http://svn.apache.org/viewvc?rev=702571&view=rev
Log:
CAMEL-951: Added unit test for custom aggregation collection. Fixed problem
with logging the collection as it could cause a side effect of calling the
.iterate() method on aggregation collection twice.
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
(contents, props changed)
- copied, changed from r702557,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
(contents, props changed)
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
(contents, props changed)
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
(contents, props changed)
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
(contents, props changed)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Tue Oct 7 11:14:19 2008
@@ -156,10 +156,9 @@
collection.add(exchange);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finished batch size: " + batchSize + " timeout: " +
batchTimeout + " so sending set: "
- + collection);
- }
+ // we should NOT log the collection directly as it will invoke a
toString() on collection
+ // and it will call collection.iterator() where end-users might do
stuff that would break
+ // calling the iterator a 2nd time as below
// lets send the batch
Iterator<Exchange> iter = collection.iterator();
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
Tue Oct 7 11:14:19 2008
@@ -1,154 +1,154 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-
-/**
- * Unit test for the batch size options on aggregator.
- */
-public class AggregatorBatchOptionsTest extends ContextTestSupport {
-
- public boolean isUseRouteBuilder() {
- return false;
- }
-
- public void testAggregateOutBatchSize() throws Exception {
- context.addRoutes(new RouteBuilder() {
- public void configure() throws Exception {
- // START SNIPPET: e1
- // our route is aggregating from the direct queue and sending
the response to the mock
- from("direct:start")
- // aggregated by header id
- // as we have not configured more on the aggregator it
will default to aggregate the
- // latest exchange only
- .aggregator().header("id")
- // wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
- // batch size in is the limit of number of exchanges
recieved, so when we have received 100
- // exchanges then whatever we have in the collection will
be sent
- .batchSize(100)
- // limit the out batch size to 3 so when we have
aggregated 3 exchanges
- // and we reach this limit then the exchanges is send
- .outBatchSize(3)
- .to("mock:result");
- // END SNIPPET: e1
- }
- });
- startCamelContext();
-
- // START SNIPPET: e2
- MockEndpoint result = getMockEndpoint("mock:result");
-
- // we expect 3 messages grouped by the latest message only
- result.expectedMinimumMessageCount(3);
- result.expectedBodiesReceived("Message 1c", "Message 2b", "Message
3a");
-
- // then we sent all the message at once
- template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
- // when we send message 4 then we will reach the collection batch size
limit and the
- // exchanges above is the ones we have aggregated in the first batch
- template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
- template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
-
- assertMockEndpointsSatisfied();
- // END SNIPPET: e2
- }
-
- public void testAggregateBatchSize() throws Exception {
- context.addRoutes(new RouteBuilder() {
- public void configure() throws Exception {
- // START SNIPPET: e3
- // our route is aggregating from the direct queue and sending
the response to the mock
- from("direct:start")
- // aggregated by header id
- // as we have not configured more on the aggregator it
will default to aggregate the
- // latest exchange only
- .aggregator().header("id")
- // wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
- // batch size in is the limit of number of exchanges
recieved, so when we have received 100
- // exchanges then whatever we have in the collection will
be sent
- .batchSize(5)
- .to("mock:result");
- // END SNIPPET: e3
- }
- });
- startCamelContext();
-
- // START SNIPPET: e4
- MockEndpoint result = getMockEndpoint("mock:result");
-
- // we expect 3 messages grouped by the latest message only
- result.expectedMinimumMessageCount(2);
- result.expectedBodiesReceived("Message 1c", "Message 2b");
-
- // then we sent all the message at once
- template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
- // when we sent the next message we have reached the in batch size
limit and the current
- // aggregated exchanges will be sent
- template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
- template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
-
- assertMockEndpointsSatisfied();
- // END SNIPPET: e4
- }
-
- public void testAggregateBatchTimeout() throws Exception {
- context.addRoutes(new RouteBuilder() {
- public void configure() throws Exception {
- // START SNIPPET: e5
- // our route is aggregating from the direct queue and sending
the response to the mock
- from("direct:start")
- // aggregated by header id
- // as we have not configured more on the aggregator it
will default to aggregate the
- // latest exchange only
- .aggregator().header("id")
- // wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
- .to("mock:result");
- // END SNIPPET: e5
- }
- });
- startCamelContext();
-
- // START SNIPPET: e6
- MockEndpoint result = getMockEndpoint("mock:result");
-
- // we expect 3 messages grouped by the latest message only
- result.expectedMinimumMessageCount(3);
- result.expectedBodiesReceived("Message 1c", "Message 2b", "Message
3a");
-
- // then we sent all the message at once
- template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
- Thread.sleep(600L);
- // these messages are not aggregated as the timeout should have
accoured
- template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
- template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
-
- assertMockEndpointsSatisfied();
- // END SNIPPET: e6
- }
-
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for the batch size options on aggregator.
+ */
+public class AggregatorBatchOptionsTest extends ContextTestSupport {
+
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testAggregateOutBatchSize() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id
+ // as we have not configured more on the aggregator it
will default to aggregate the
+ // latest exchange only
+ .aggregator().header("id")
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ // batch size in is the limit of number of exchanges
recieved, so when we have received 100
+ // exchanges then whatever we have in the collection will
be sent
+ .batchSize(100)
+ // limit the out batch size to 3 so when we have
aggregated 3 exchanges
+ // and we reach this limit then the exchanges is send
+ .outBatchSize(3)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ });
+ startCamelContext();
+
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 3 messages grouped by the latest message only
+ result.expectedMinimumMessageCount(3);
+ result.expectedBodiesReceived("Message 1c", "Message 2b", "Message
3a");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ // when we send message 4 then we will reach the collection batch size
limit and the
+ // exchanges above is the ones we have aggregated in the first batch
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ public void testAggregateBatchSize() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e3
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id
+ // as we have not configured more on the aggregator it
will default to aggregate the
+ // latest exchange only
+ .aggregator().header("id")
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ // batch size in is the limit of number of exchanges
recieved, so when we have received 100
+ // exchanges then whatever we have in the collection will
be sent
+ .batchSize(5)
+ .to("mock:result");
+ // END SNIPPET: e3
+ }
+ });
+ startCamelContext();
+
+ // START SNIPPET: e4
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 3 messages grouped by the latest message only
+ result.expectedMinimumMessageCount(2);
+ result.expectedBodiesReceived("Message 1c", "Message 2b");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ // when we sent the next message we have reached the in batch size
limit and the current
+ // aggregated exchanges will be sent
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e4
+ }
+
+ public void testAggregateBatchTimeout() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e5
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id
+ // as we have not configured more on the aggregator it
will default to aggregate the
+ // latest exchange only
+ .aggregator().header("id")
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ .to("mock:result");
+ // END SNIPPET: e5
+ }
+ });
+ startCamelContext();
+
+ // START SNIPPET: e6
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 3 messages grouped by the latest message only
+ result.expectedMinimumMessageCount(3);
+ result.expectedBodiesReceived("Message 1c", "Message 2b", "Message
3a");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ Thread.sleep(600L);
+ // these messages are not aggregated as the timeout should have
accoured
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e6
+ }
+
}
\ No newline at end of file
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
Tue Oct 7 11:14:19 2008
@@ -187,7 +187,7 @@
from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
brothersAggregator);
- agg.setBatchTimeout(5000L);
+ agg.setBatchTimeout(1000L);
agg.removeHeader(SURNAME_HEADER)
.removeHeader(TYPE_HEADER)
.to("mock:result");
Copied:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
(from r702557,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java&r1=702557&r2=702571&rev=702571&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
Tue Oct 7 11:14:19 2008
@@ -1,61 +1,106 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-
-/**
- * Unit test for using our own aggregation strategy.
- */
-public class CustomAggregationStrategyTest extends ContextTestSupport {
-
- public void testCustomAggregationStrategy() throws Exception {
- // START SNIPPET: e2
- MockEndpoint result = getMockEndpoint("mock:result");
-
- // we expect 4 messages as they have different header id
- result.expectedMessageCount(2);
- result.expectedBodiesReceived("200", "150");
-
- // then we sent all the message at once
- template.sendBodyAndHeader("direct:start", "100", "id", "1");
- template.sendBodyAndHeader("direct:start", "150", "id", "2");
- template.sendBodyAndHeader("direct:start", "130", "id", "2");
- template.sendBodyAndHeader("direct:start", "200", "id", "1");
- template.sendBodyAndHeader("direct:start", "190", "id", "1");
-
- assertMockEndpointsSatisfied();
- // END SNIPPET: e2
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- public void configure() throws Exception {
- // START SNIPPET: e1
- // our route is aggregating from the direct queue and sending
the response to the mock
- from("direct:start")
- // aggregated by header id and use our own strategy how to
aggregate
- .aggregator(new MyAggregationStrategy()).header("id")
- // wait for 2 seconds to aggregate
- .batchTimeout(2000L)
- .to("mock:result");
- // END SNIPPET: e1
- }
- };
- }
-
- // START SNIPPET: e3
- private static class MyAggregationStrategy implements AggregationStrategy {
-
- public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- int oldPrice = oldExchange.getIn().getBody(Integer.class);
- int newPrice = newExchange.getIn().getBody(Integer.class);
- // return the "winner" that has the highest price
- return newPrice > oldPrice ? newExchange : oldExchange;
- }
- }
- // END SNIPPET: e3
+package org.apache.camel.processor.aggregator;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.AbstractCollection;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
+
+/**
+ * Unit test for using our own aggregation collection.
+ */
+public class CustomAggregationCollectionTest extends ContextTestSupport {
+
+ public void testCustomAggregationCollection() throws Exception {
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 5 messages since our custom aggregation collection just
gets it all
+ // but returns them in reverse order
+ result.expectedMessageCount(5);
+ result.expectedBodiesReceived("190", "200", "130", "150", "100");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "100", "id", "1");
+ template.sendBodyAndHeader("direct:start", "150", "id", "2");
+ template.sendBodyAndHeader("direct:start", "130", "id", "2");
+ template.sendBodyAndHeader("direct:start", "200", "id", "1");
+ template.sendBodyAndHeader("direct:start", "190", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // use our own collection for aggregation
+ .aggregator(new MyReverseAggregationCollection())
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+
+ // START SNIPPET: e3
+ private static class MyReverseAggregationCollection extends
AbstractCollection<Exchange> implements AggregationCollection {
+
+ private List<Exchange> collection = new ArrayList<Exchange>();
+ private Expression<Exchange> correlation;
+ private AggregationStrategy strategy;
+
+ public Expression<Exchange> getCorrelationExpression() {
+ return correlation;
+ }
+
+ public void setCorrelationExpression(Expression<Exchange>
correlationExpression) {
+ this.correlation = correlationExpression;
+ }
+
+ public AggregationStrategy getAggregationStrategy() {
+ return strategy;
+ }
+
+ public void setAggregationStrategy(AggregationStrategy
aggregationStrategy) {
+ this.strategy = aggregationStrategy;
+ }
+
+ public boolean add(Exchange exchange) {
+ return collection.add(exchange);
+ }
+
+ public Iterator<Exchange> iterator() {
+ // demonstrate the we can do something with this collection, so we
reverse it
+ Collections.reverse(collection);
+
+ return collection.iterator();
+ }
+
+ public int size() {
+ return collection.size();
+ }
+
+ public void clear() {
+ collection.clear();
+ }
+
+ public void onAggregation(Object correlationKey, Exchange newExchange)
{
+ add(newExchange);
+ }
+ }
+ // END SNIPPET: e3
}
\ No newline at end of file
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
Tue Oct 7 11:14:19 2008
@@ -1,61 +1,61 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-
-/**
- * Unit test for using our own aggregation strategy.
- */
-public class CustomAggregationStrategyTest extends ContextTestSupport {
-
- public void testCustomAggregationStrategy() throws Exception {
- // START SNIPPET: e2
- MockEndpoint result = getMockEndpoint("mock:result");
-
- // we expect 4 messages as they have different header id
- result.expectedMessageCount(2);
- result.expectedBodiesReceived("200", "150");
-
- // then we sent all the message at once
- template.sendBodyAndHeader("direct:start", "100", "id", "1");
- template.sendBodyAndHeader("direct:start", "150", "id", "2");
- template.sendBodyAndHeader("direct:start", "130", "id", "2");
- template.sendBodyAndHeader("direct:start", "200", "id", "1");
- template.sendBodyAndHeader("direct:start", "190", "id", "1");
-
- assertMockEndpointsSatisfied();
- // END SNIPPET: e2
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- public void configure() throws Exception {
- // START SNIPPET: e1
- // our route is aggregating from the direct queue and sending
the response to the mock
- from("direct:start")
- // aggregated by header id and use our own strategy how to
aggregate
- .aggregator(new MyAggregationStrategy()).header("id")
- // wait for 2 seconds to aggregate
- .batchTimeout(2000L)
- .to("mock:result");
- // END SNIPPET: e1
- }
- };
- }
-
- // START SNIPPET: e3
- private static class MyAggregationStrategy implements AggregationStrategy {
-
- public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- int oldPrice = oldExchange.getIn().getBody(Integer.class);
- int newPrice = newExchange.getIn().getBody(Integer.class);
- // return the "winner" that has the highest price
- return newPrice > oldPrice ? newExchange : oldExchange;
- }
- }
- // END SNIPPET: e3
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test for using our own aggregation strategy.
+ */
+public class CustomAggregationStrategyTest extends ContextTestSupport {
+
+ public void testCustomAggregationStrategy() throws Exception {
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect to find the two winners with the highest bid
+ result.expectedMessageCount(2);
+ result.expectedBodiesReceived("200", "150");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "100", "id", "1");
+ template.sendBodyAndHeader("direct:start", "150", "id", "2");
+ template.sendBodyAndHeader("direct:start", "130", "id", "2");
+ template.sendBodyAndHeader("direct:start", "200", "id", "1");
+ template.sendBodyAndHeader("direct:start", "190", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id and use our own strategy how to
aggregate
+ .aggregator(new MyAggregationStrategy()).header("id")
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+
+ // START SNIPPET: e3
+ private static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ int oldPrice = oldExchange.getIn().getBody(Integer.class);
+ int newPrice = newExchange.getIn().getBody(Integer.class);
+ // return the "winner" that has the highest price
+ return newPrice > oldPrice ? newExchange : oldExchange;
+ }
+ }
+ // END SNIPPET: e3
}
\ No newline at end of file
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
Tue Oct 7 11:14:19 2008
@@ -1,54 +1,54 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-
-/**
- * Unit test for DefaultAggregatorCollection.
- */
-public class DefaultAggregatorCollectionTest extends ContextTestSupport {
-
- public void testDefaultAggregateCollection() throws Exception {
- // START SNIPPET: e2
- MockEndpoint result = getMockEndpoint("mock:result");
-
- // we expect 4 messages grouped by the latest message only
- result.expectedMessageCount(4);
- result.expectedBodiesReceived("Message 1d", "Message 2b", "Message
3c", "Message 4");
-
- // then we sent all the message at once
- template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
-
- assertMockEndpointsSatisfied();
- // END SNIPPET: e2
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- public void configure() throws Exception {
- // START SNIPPET: e1
- // our route is aggregating from the direct queue and sending
the response to the mock
- from("direct:start")
- // aggregated by header id
- // as we have not configured more on the aggregator it
will default to aggregate the
- // latest exchange only
- .aggregator().header("id")
- // wait for 2 seconds to aggregate
- .batchTimeout(2000L)
- .to("mock:result");
- // END SNIPPET: e1
- }
- };
- }
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for DefaultAggregatorCollection.
+ */
+public class DefaultAggregatorCollectionTest extends ContextTestSupport {
+
+ public void testDefaultAggregateCollection() throws Exception {
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 4 messages grouped by the latest message only
+ result.expectedMessageCount(4);
+ result.expectedBodiesReceived("Message 1d", "Message 2b", "Message
3c", "Message 4");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id
+ // as we have not configured more on the aggregator it
will default to aggregate the
+ // latest exchange only
+ .aggregator().header("id")
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
}
\ No newline at end of file
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
Tue Oct 7 11:14:19 2008
@@ -1,65 +1,65 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
-import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
-import org.apache.camel.processor.aggregate.AggregationCollection;
-
-/**
- * Unit test for PredicateAggregatorCollection.
- */
-public class PredicateAggregatorCollectionTest extends ContextTestSupport {
-
- public void testPredicateAggregateCollection() throws Exception {
- // START SNIPPET: e2
- MockEndpoint result = getMockEndpoint("mock:result");
-
- // we only expect two messages as they have reached the completed
predicate
- // that we want 3 messages that has the same header id
- result.expectedMessageCount(2);
- result.expectedBodiesReceived("Message 1c", "Message 3c");
-
- // then we sent all the message at once
- template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
- template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
- template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
- template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
-
- assertMockEndpointsSatisfied();
- // END SNIPPET: e2
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- public void configure() throws Exception {
- // START SNIPPET: e1
- // create the aggregation collection we will use.
- // - we will correlate the recieved message based on the id
header
- // - as we will just keep the latest message we use the latest
strategy
- // - and finally we stop aggregate if we recieve 2 or more
messages
- AggregationCollection ag = new
PredicateAggregationCollection(header("id"),
- new UseLatestAggregationStrategy(),
- header(Exchange.AGGREGATED_COUNT).isEqualTo(3));
-
- // our route is aggregating from the direct queue and sending
the response to the mock
- from("direct:start")
- // we use the collection based aggregator we already have
configued
- .aggregator(ag)
- // wait for 2 seconds to aggregate
- .batchTimeout(2000L)
- .to("mock:result");
- // END SNIPPET: e1
- }
- };
- }
-}
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
+
+/**
+ * Unit test for PredicateAggregatorCollection.
+ */
+public class PredicateAggregatorCollectionTest extends ContextTestSupport {
+
+ public void testPredicateAggregateCollection() throws Exception {
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we only expect two messages as they have reached the completed
predicate
+ // that we want 3 messages that has the same header id
+ result.expectedMessageCount(2);
+ result.expectedBodiesReceived("Message 1c", "Message 3c");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // create the aggregation collection we will use.
+ // - we will correlate the recieved message based on the id
header
+ // - as we will just keep the latest message we use the latest
strategy
+ // - and finally we stop aggregate if we recieve 2 or more
messages
+ AggregationCollection ag = new
PredicateAggregationCollection(header("id"),
+ new UseLatestAggregationStrategy(),
+ header(Exchange.AGGREGATED_COUNT).isEqualTo(3));
+
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // we use the collection based aggregator we already have
configued
+ .aggregator(ag)
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date