Author: davsclaus
Date: Sat Jan 8 09:51:59 2011
New Revision: 1056678
URL: http://svn.apache.org/viewvc?rev=1056678&view=rev
Log:
CAMEL-3497: Reduced memory consumption for splitter.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Sat Jan 8 09:51:59 2011
@@ -285,6 +285,8 @@ public class MulticastProcessor extends
total.incrementAndGet();
}
+ // TODO: in streaming mode we need to aggregate on-the-fly
+
// its to hard to do parallel async routing so we let the caller
thread be synchronously
// and have it pickup the replies and do the aggregation
boolean timedOut = false;
@@ -664,7 +666,11 @@ public class MulticastProcessor extends
int index = 0;
for (Processor processor : processors) {
- result.add(createProcessorExchangePair(index++, processor,
exchange));
+ // copy exchange, and do not share the unit of work
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
false);
+ // and add the pair
+ RouteContext routeContext = exchange.getUnitOfWork() != null ?
exchange.getUnitOfWork().getRouteContext() : null;
+ result.add(createProcessorExchangePair(index++, processor, copy,
routeContext));
}
return result;
@@ -676,34 +682,33 @@ public class MulticastProcessor extends
* You <b>must</b> use this method to create the instances of {...@link
ProcessorExchangePair} as they
* need to be specially prepared before use.
*
- * @param processor the processor
- * @param exchange the exchange
+ * @param index the index
+ * @param processor the processor
+ * @param exchange the exchange
+ * @param routeContext the route context
* @return prepared for use
*/
- protected ProcessorExchangePair createProcessorExchangePair(int index,
Processor processor, Exchange exchange) {
+ protected ProcessorExchangePair createProcessorExchangePair(int index,
Processor processor,
+ Exchange
exchange, RouteContext routeContext) {
Processor prepared = processor;
- // copy exchange, and do not share the unit of work
- Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
-
// set property which endpoint we send to
- setToEndpoint(copy, prepared);
+ setToEndpoint(exchange, prepared);
// rework error handling to support fine grained error handling
- prepared = createErrorHandler(exchange, prepared);
+ prepared = createErrorHandler(routeContext, prepared);
- return new DefaultProcessorExchangePair(index, processor, prepared,
copy);
+ return new DefaultProcessorExchangePair(index, processor, prepared,
exchange);
}
- protected Processor createErrorHandler(Exchange exchange, Processor
processor) {
- Processor answer = processor;
+ protected Processor createErrorHandler(RouteContext routeContext,
Processor processor) {
+ Processor answer;
- if (exchange.getUnitOfWork() != null &&
exchange.getUnitOfWork().getRouteContext() != null) {
+ if (routeContext != null) {
// wrap the producer in error handler so we have fine grained
error handling on
// the output side instead of the input side
// this is needed to support redelivery on that output alone and
not doing redelivery
// for the entire multicast block again which will start from
scratch again
- RouteContext routeContext =
exchange.getUnitOfWork().getRouteContext();
// create key for cache
final PreparedErrorHandler key = new
PreparedErrorHandler(routeContext, processor);
@@ -725,14 +730,18 @@ public class MulticastProcessor extends
// instead of using ProcessorDefinition.wrapInErrorHandler)
try {
processor = builder.createErrorHandler(routeContext,
processor);
- // and wrap in unit of work processor so the copy exchange
also can run under UoW
- answer = new UnitOfWorkProcessor(processor);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
+ // and wrap in unit of work processor so the copy exchange also
can run under UoW
+ answer = new UnitOfWorkProcessor(processor);
+
// add to cache
errorHandlers.putIfAbsent(key, answer);
+ } else {
+ // and wrap in unit of work processor so the copy exchange also
can run under UoW
+ answer = new UnitOfWorkProcessor(processor);
}
return answer;
@@ -781,8 +790,8 @@ public class MulticastProcessor extends
/**
* Sets the given {...@link
org.apache.camel.processor.aggregate.AggregationStrategy} on the {...@link
Exchange}.
*
- * @param exchange the exchange
- * @param aggregationStrategy the strategy
+ * @param exchange the exchange
+ * @param aggregationStrategy the strategy
*/
protected void setAggregationStrategyOnExchange(Exchange exchange,
AggregationStrategy aggregationStrategy) {
Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY,
Map.class);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
Sat Jan 8 09:51:59 2011
@@ -28,6 +28,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
@@ -189,8 +190,10 @@ public class RecipientListProcessor exte
setToEndpoint(copy, prepared);
// rework error handling to support fine grained error handling
- prepared = createErrorHandler(exchange, prepared);
+ RouteContext routeContext = exchange.getUnitOfWork() != null ?
exchange.getUnitOfWork().getRouteContext() : null;
+ prepared = createErrorHandler(routeContext, prepared);
+ // and create the pair
return new RecipientProcessorExchangePair(index, producerCache,
endpoint, producer, prepared, copy);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
Sat Jan 8 09:51:59 2011
@@ -34,7 +34,7 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
-import org.apache.camel.util.CollectionHelper;
+import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
@@ -112,6 +112,7 @@ public class Splitter extends MulticastP
// create a copy which we use as master to copy during splitting
// this avoids any side effect reflected upon the incoming exchange
private final Exchange copy = ExchangeHelper.createCopy(exchange,
true);
+ private final RouteContext routeContext = exchange.getUnitOfWork()
!= null ? exchange.getUnitOfWork().getRouteContext() : null;
public Iterator iterator() {
return new Iterator() {
@@ -140,15 +141,16 @@ public class Splitter extends MulticastP
public Object next() {
Object part = iterator.next();
- // create a copy as the new exchange to be routed in
the splitter from the copy
- Exchange newExchange = ExchangeHelper.createCopy(copy,
true);
+ // create a correlated copy as the new exchange to be
routed in the splitter from the copy
+ // and do not share the unit of work
+ Exchange newExchange =
ExchangeHelper.createCorrelatedCopy(copy, false);
if (part instanceof Message) {
- newExchange.setIn((Message)part);
+ newExchange.setIn((Message) part);
} else {
Message in = newExchange.getIn();
in.setBody(part);
}
- return createProcessorExchangePair(index++,
getProcessors().iterator().next(), newExchange);
+ return createProcessorExchangePair(index++,
getProcessors().iterator().next(), newExchange, routeContext);
}
public void remove() {
@@ -180,7 +182,7 @@ public class Splitter extends MulticastP
exchange.setProperty(Exchange.SPLIT_INDEX, index);
if (allPairs instanceof Collection) {
- exchange.setProperty(Exchange.SPLIT_SIZE,
((Collection<?>)allPairs).size());
+ exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>)
allPairs).size());
}
if (it.hasNext()) {
exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java
Sat Jan 8 09:51:59 2011
@@ -294,25 +294,27 @@ public class DefaultTraceFormatter imple
String to = "";
String route = "";
if (showNode || showRouteId) {
- TracedRouteNodes traced =
exchange.getUnitOfWork().getTracedRouteNodes();
+ if (exchange.getUnitOfWork() != null) {
+ TracedRouteNodes traced =
exchange.getUnitOfWork().getTracedRouteNodes();
- RouteNode traceFrom = traced.getSecondLastNode();
- if (traceFrom != null) {
- from = getNodeMessage(traceFrom, exchange);
- } else if (exchange.getFromEndpoint() != null) {
- from = "from(" + exchange.getFromEndpoint().getEndpointUri() +
")";
- }
-
- RouteNode traceTo = traced.getLastNode();
- if (traceTo != null) {
- to = getNodeMessage(traceTo, exchange);
- // if its an abstract dummy holder then we have to get the 2nd
last so we can get the real node that has
- // information which route it belongs to
- if (traceTo.isAbstract() && traceTo.getProcessorDefinition()
== null) {
- traceTo = traced.getSecondLastNode();
+ RouteNode traceFrom = traced.getSecondLastNode();
+ if (traceFrom != null) {
+ from = getNodeMessage(traceFrom, exchange);
+ } else if (exchange.getFromEndpoint() != null) {
+ from = "from(" +
exchange.getFromEndpoint().getEndpointUri() + ")";
}
+
+ RouteNode traceTo = traced.getLastNode();
if (traceTo != null) {
- route = extractRoute(traceTo.getProcessorDefinition());
+ to = getNodeMessage(traceTo, exchange);
+ // if its an abstract dummy holder then we have to get the
2nd last so we can get the real node that has
+ // information which route it belongs to
+ if (traceTo.isAbstract() &&
traceTo.getProcessorDefinition() == null) {
+ traceTo = traced.getSecondLastNode();
+ }
+ if (traceTo != null) {
+ route = extractRoute(traceTo.getProcessorDefinition());
+ }
}
}
}