Author: ningjiang
Date: Wed Jul 2 02:48:15 2008
New Revision: 673335
URL: http://svn.apache.org/viewvc?rev=673335&view=rev
Log:
CAMEL-642 patch applied with huge thanks to William
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
activemq/camel/trunk/components/camel-jdbc/pom.xml
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=673335&r1=673334&r2=673335&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
Wed Jul 2 02:48:15 2008
@@ -293,6 +293,9 @@
// Route Management Methods
// -----------------------------------------------------------------------
public List<Route> getRoutes() {
+ if (routes == null) {
+ routes = new ArrayList<Route>();
+ }
return routes;
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=673335&r1=673334&r2=673335&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
Wed Jul 2 02:48:15 2008
@@ -55,6 +55,7 @@
private CamelContext camelContext;
private List<InterceptStrategy> interceptStrategies = new
ArrayList<InterceptStrategy>();
private ErrorHandlerWrappingStrategy errorHandlerWrappingStrategy;
+ private boolean routeAdded;
public DefaultRouteContext(RouteType route, FromType from,
Collection<Route> routes) {
this.route = route;
@@ -194,4 +195,13 @@
errorHandlerWrappingStrategy = strategy;
}
+
+ public boolean isRouteAdded() {
+ return routeAdded;
+ }
+
+ public void setIsRouteAdded(boolean b) {
+ routeAdded = b;
+
+ }
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=673335&r1=673334&r2=673335&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
Wed Jul 2 02:48:15 2008
@@ -90,7 +90,11 @@
@Override
public void addRoutes(RouteContext routeContext, Collection<Route> routes)
throws Exception {
final Aggregator aggregator = createAggregator(routeContext);
-
+ doAddRoute(routeContext, routes, aggregator);
+ }
+
+ private void doAddRoute(RouteContext routeContext, Collection<Route>
routes, final Aggregator aggregator)
+ throws Exception {
Route route = new Route<Exchange>(aggregator.getEndpoint(),
aggregator) {
@Override
public String toString() {
@@ -100,10 +104,14 @@
routes.add(route);
}
-
+
@Override
public Processor createProcessor(RouteContext routeContext) throws
Exception {
- return createAggregator(routeContext);
+ final Aggregator aggregator = createAggregator(routeContext);
+
+ doAddRoute(routeContext, routeContext.getCamelContext().getRoutes(),
aggregator);
+ routeContext.setIsRouteAdded(true);
+ return aggregator;
}
protected Aggregator createAggregator(RouteContext routeContext) throws
Exception {
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=673335&r1=673334&r2=673335&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Wed Jul 2 02:48:15 2008
@@ -95,7 +95,9 @@
public void addRoutes(RouteContext routeContext, Collection<Route> routes)
throws Exception {
Processor processor = makeProcessor(routeContext);
- routeContext.addEventDrivenProcessor(processor);
+ if (!routeContext.isRouteAdded()) {
+ routeContext.addEventDrivenProcessor(processor);
+ }
}
/**
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java?rev=673335&r1=673334&r2=673335&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
Wed Jul 2 02:48:15 2008
@@ -103,4 +103,20 @@
*/
void setErrorHandlerWrappingStrategy(ErrorHandlerWrappingStrategy
strategy);
+ /**
+ * If this flag is true, [EMAIL PROTECTED]
ProcessorType#addRoutes(RouteContext, java.util.Collection)
+ * will not add processor to addEventDrivenProcessor to the RouteContext
and it
+ * will prevent from adding an EventDrivenRoute.
+ *
+ */
+ void setIsRouteAdded(boolean value);
+
+ /**
+ * @see [EMAIL PROTECTED] #setIsRouteAdded(boolean)}
+ *
+ */
+ boolean isRouteAdded();
+
+
+
}
Modified: activemq/camel/trunk/components/camel-jdbc/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jdbc/pom.xml?rev=673335&r1=673334&r2=673335&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jdbc/pom.xml (original)
+++ activemq/camel/trunk/components/camel-jdbc/pom.xml Wed Jul 2 02:48:15 2008
@@ -6,9 +6,9 @@
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,6 +41,7 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<type>test-jar</type>
+ <scope>test</scope>
</dependency>
<!-- test dependencies -->
<dependency>
Modified:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java?rev=673335&r1=673334&r2=673335&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
Wed Jul 2 02:48:15 2008
@@ -36,7 +36,7 @@
private static final transient Log LOG =
LogFactory.getLog(AggregratedJmsRouteTest.class);
private String timeOutEndpointUri = "jms:queue:test.a";
- private String multicastEndpointUri = "jms:queue:mutilcast";
+ private String multicastEndpointUri = "jms:queue:multicast";
/*
* negative receive wait timeout for jms is blocking so timeout during
processing does not hang
@@ -54,7 +54,7 @@
}
- public void xtestJmsMulticastAndAggregration() throws Exception {
+ public void testJmsMulticastAndAggregration() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:reply",
MockEndpoint.class);
resultEndpoint.expectedMessageCount(2);
@@ -104,12 +104,13 @@
from("jms:queue:point3").process(new
MyProcessor()).to("jms:queue:reply");
from("jms:queue:reply").aggregator(header("cheese"), new
AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange
newExchange) {
+ Exchange copy = newExchange.copy();
LOG.info("try to aggregating the message ");
Integer old = (Integer)
oldExchange.getProperty("aggregated");
if (old == null) {
old = 1;
}
- Exchange result = newExchange;
+ Exchange result = copy;
result.setProperty("aggregated", old + 1);
return result;
}