Author: ningjiang
Date: Fri Jun 27 05:48:40 2008
New Revision: 672260
URL: http://svn.apache.org/viewvc?rev=672260&view=rev
Log:
Added a test case to show the error of loan broker queue version
Modified:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
Modified:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java?rev=672260&r1=672259&r2=672260&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
Fri Jun 27 05:48:40 2008
@@ -23,6 +23,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.BatchProcessor;
@@ -34,7 +35,8 @@
public class AggregratedJmsRouteTest extends ContextTestSupport {
private static final transient Log LOG =
LogFactory.getLog(AggregratedJmsRouteTest.class);
- private String startEndpointUri = "jms:queue:test.a";
+ private String timeOutEndpointUri = "jms:queue:test.a";
+ private String multicastEndpointUri = "jms:queue:mutilcast";
/*
* negative recieve wait timeout for jms is blocking so timeout during
processing does not hang
@@ -45,14 +47,29 @@
resultEndpoint.expectedMessageCount(1);
for (int i = 1; i <= 2; i++) {
String body = "message:" + i;
- sendExchange(body);
+ sendExchange(timeOutEndpointUri, body);
}
resultEndpoint.assertIsSatisfied();
}
- protected void sendExchange(final Object expectedBody) {
- template.sendBodyAndHeader(startEndpointUri, expectedBody, "cheese",
123);
+
+ public void xtestJmsMulticastAndAggregration() throws Exception {
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:reply",
MockEndpoint.class);
+
+ resultEndpoint.expectedMessageCount(2);
+ for (int i = 1; i <= 6; i++) {
+ String body = "message:" + i;
+ sendExchange("jms:queue:point1", body);
+ }
+
+ resultEndpoint.assertIsSatisfied();
+ }
+
+
+
+ protected void sendExchange(String uri, final Object expectedBody) {
+ template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
}
@@ -68,7 +85,7 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from(startEndpointUri).to("jms:queue:test.b");
+ from(timeOutEndpointUri).to("jms:queue:test.b");
from("jms:queue:test.b").aggregator(header("cheese"), new
AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange
newExchange) {
try {
@@ -80,7 +97,32 @@
return newExchange;
}
}).to("mock:result");
+
+ from(multicastEndpointUri).to("jms:queue:point1",
"jms:queue:point2", "jms:queue:point3");
+ from("jms:queue:point1").process(new
MyProcessor()).to("jms:queue:reply");
+ from("jms:queue:point2").process(new
MyProcessor()).to("jms:queue:reply");
+ from("jms:queue:point3").process(new
MyProcessor()).to("jms:queue:reply");
+ from("jms:queue:reply").aggregator(header("cheese"), new
AggregationStrategy() {
+ public Exchange aggregate(Exchange oldExchange, Exchange
newExchange) {
+ LOG.info("try to aggregating the message ");
+ Integer old = (Integer)
oldExchange.getProperty("aggregated");
+ if (old == null) {
+ old = 1;
+ }
+ Exchange result = newExchange;
+ result.setProperty("aggregated", old + 1);
+ return result;
+ }
+ }).completedPredicate(header("aggregated").isEqualTo(3))
+ .to("mock:reply");
}
};
}
+ private class MyProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ LOG.info("get the exchange here " + exchange);
+ }
+
+ }
}