Author: davsclaus Date: Thu Apr 8 09:20:19 2010 New Revision: 931845 URL: http://svn.apache.org/viewvc?rev=931845&view=rev Log: CAMEL-2568: Polished aggregator and fixed a timeout in some rare cases could send the last and not aggregated exchange.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java - copied, changed from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java (contents, props changed) - copied, changed from r931784, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java Removed: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=931845&r1=931844&r2=931845&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Apr 8 09:20:19 2010 @@ -81,7 +81,8 @@ public class AggregateProcessor extends private final Expression correlationExpression; private final ExecutorService executorService; private ScheduledExecutorService recoverService; - private TimeoutMap<Object, Exchange> timeoutMap; + // store correlation key -> exchange id in timeout map + private TimeoutMap<Object, String> timeoutMap; private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass()); private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository(); private Map<Object, Object> closedCorrelationKeys; @@ -203,7 +204,7 @@ public class AggregateProcessor extends } // check if we are complete - boolean complete = false; + String complete = null; if (isEagerCheckCompletion()) { // put the current aggregated size on the exchange so its avail during completion check newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); @@ -224,14 +225,13 @@ public class AggregateProcessor extends } // only need to update aggregation repository if we are not complete - if (!complete) { + if (complete == null) { if (LOG.isTraceEnabled()) { LOG.trace("In progress aggregated exchange: " + answer + " with correlation key:" + key); } aggregationRepository.add(exchange.getContext(), key, answer); - } - - if (complete) { + } else { + answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); onCompletion(key, answer, false); } @@ -242,12 +242,18 @@ public class AggregateProcessor extends return answer; } - protected boolean isCompleted(Object key, Exchange exchange) { + /** + * Tests whether the given exchange is complete or not + * + * @param key the correlation key + * @param exchange the incoming exchange + * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion + */ + protected String isCompleted(Object key, Exchange exchange) { if (getCompletionPredicate() != null) { boolean answer = getCompletionPredicate().matches(exchange); if (answer) { - exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "predicate"); - return true; + return "predicate"; } } @@ -256,16 +262,14 @@ public class AggregateProcessor extends if (value != null && value > 0) { int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); if (size >= value) { - exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size"); - return true; + return "size"; } } } if (getCompletionSize() > 0) { int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); if (size >= getCompletionSize()) { - exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size"); - return true; + return "size"; } } @@ -279,7 +283,7 @@ public class AggregateProcessor extends LOG.trace("Updating correlation key " + key + " to timeout after " + value + " ms. as exchange received: " + exchange); } - timeoutMap.put(key, exchange, value); + timeoutMap.put(key, exchange.getExchangeId(), value); timeoutSet = true; } } @@ -289,7 +293,7 @@ public class AggregateProcessor extends LOG.trace("Updating correlation key " + key + " to timeout after " + getCompletionTimeout() + " ms. as exchange received: " + exchange); } - timeoutMap.put(key, exchange, getCompletionTimeout()); + timeoutMap.put(key, exchange.getExchangeId(), getCompletionTimeout()); } if (isCompletionFromBatchConsumer()) { @@ -298,12 +302,12 @@ public class AggregateProcessor extends if (size > 0 && batchConsumerCounter.intValue() >= size) { // batch consumer is complete then reset the counter batchConsumerCounter.set(0); - exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "consumer"); - return true; + return "consumer"; } } - return false; + // not complete + return null; } protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) { @@ -498,23 +502,27 @@ public class AggregateProcessor extends /** * Background task that looks for aggregated exchanges which is triggered by completion timeouts. */ - private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, Exchange> { + private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, String> { private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { super(executor, requestMapPollTimeMillis); } @Override - public void onEviction(Object key, Exchange exchange) { + public void onEviction(Object key, String exchangeId) { if (log.isDebugEnabled()) { log.debug("Completion timeout triggered for correlation key: " + key); } - exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); + // get the aggregated exchange + Exchange answer = aggregationRepository.get(camelContext, key); + + // indicate it was completed by timeout + answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); try { lock.lock(); - onCompletion(key, exchange, true); + onCompletion(key, answer, true); } finally { lock.unlock(); } @@ -580,12 +588,7 @@ public class AggregateProcessor extends exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); // resubmit the recovered exchange - try { - lock.lock(); - onSubmitCompletion(key, exchange); - } finally { - lock.unlock(); - } + onSubmitCompletion(key, exchange); } } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java?rev=931845&r1=931844&r2=931845&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java Thu Apr 8 09:20:19 2010 @@ -23,12 +23,14 @@ import org.apache.camel.processor.aggreg public class BodyInAggregatingStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { - if (oldExchange != null) { - String oldBody = oldExchange.getIn().getBody(String.class); - String newBody = newExchange.getIn().getBody(String.class); - newExchange.getIn().setBody(oldBody + "+" + newBody); + if (oldExchange == null) { + return newExchange; } - return newExchange; + + String oldBody = oldExchange.getIn().getBody(String.class); + String newBody = newExchange.getIn().getBody(String.class); + oldExchange.getIn().setBody(oldBody + "+" + newBody); + return oldExchange; } /** Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java?rev=931845&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java Thu Apr 8 09:20:19 2010 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Unit test for aggregate grouped exchanges completed by size + */ +public class AggregateGroupedExchangeSizePredicateTest extends ContextTestSupport { + + public void testGroupedSize() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + + // we expect 2 messages since we group by size (3 and 2) + result.expectedMessageCount(2); + + template.sendBodyAndHeader("direct:start", "100", "groupSize", 3); + template.sendBodyAndHeader("direct:start", "150", "groupSize", 3); + template.sendBodyAndHeader("direct:start", "130", "groupSize", 3); + template.sendBodyAndHeader("direct:start", "200", "groupSize", 2); + template.sendBodyAndHeader("direct:start", "190", "groupSize", 2); + + assertMockEndpointsSatisfied(); + + Exchange out = result.getExchanges().get(0); + List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + assertEquals(3, grouped.size()); + assertEquals("100", grouped.get(0).getIn().getBody(String.class)); + assertEquals("150", grouped.get(1).getIn().getBody(String.class)); + assertEquals("130", grouped.get(2).getIn().getBody(String.class)); + + out = result.getExchanges().get(1); + grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + assertEquals(2, grouped.size()); + + assertEquals("200", grouped.get(0).getIn().getBody(String.class)); + assertEquals("190", grouped.get(1).getIn().getBody(String.class)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:start") + // must use eagerCheckCompletion so we can check the groupSize header on the incoming exchange + .aggregate().constant(true).groupExchanges().eagerCheckCompletion().completionSize(header("groupSize")) + .to("mock:result") + .end(); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java (from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java&r1=931784&r2=931845&rev=931845&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java Thu Apr 8 09:20:19 2010 @@ -24,55 +24,50 @@ import org.apache.camel.builder.RouteBui import org.apache.camel.component.mock.MockEndpoint; /** - * Unit test for aggregate grouped exchanges. + * Unit test for aggregate grouped exchanges completed by size */ -public class AggregateGroupedExchangeTest extends ContextTestSupport { +public class AggregateGroupedExchangeSizeTest extends ContextTestSupport { - public void testGrouped() throws Exception { - // START SNIPPET: e2 + public void testGroupedSize() throws Exception { MockEndpoint result = getMockEndpoint("mock:result"); - // we expect 1 messages since we group all we get in using the same correlation key - result.expectedMessageCount(1); + // we expect 2 messages since we group by size (3 and 3) + result.expectedMessageCount(2); - // then we sent all the message at once template.sendBody("direct:start", "100"); template.sendBody("direct:start", "150"); template.sendBody("direct:start", "130"); template.sendBody("direct:start", "200"); template.sendBody("direct:start", "190"); + template.sendBody("direct:start", "120"); assertMockEndpointsSatisfied(); Exchange out = result.getExchanges().get(0); List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - - assertEquals(5, grouped.size()); - + assertEquals(3, grouped.size()); assertEquals("100", grouped.get(0).getIn().getBody(String.class)); assertEquals("150", grouped.get(1).getIn().getBody(String.class)); assertEquals("130", grouped.get(2).getIn().getBody(String.class)); - assertEquals("200", grouped.get(3).getIn().getBody(String.class)); - assertEquals("190", grouped.get(4).getIn().getBody(String.class)); - // END SNIPPET: e2 + + out = result.getExchanges().get(1); + grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + assertEquals(3, grouped.size()); + + assertEquals("200", grouped.get(0).getIn().getBody(String.class)); + assertEquals("190", grouped.get(1).getIn().getBody(String.class)); + assertEquals("120", grouped.get(2).getIn().getBody(String.class)); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - // START SNIPPET: e1 - // our route is aggregating from the direct queue and sending the response to the mock from("direct:start") - // aggregate all using same expression - .aggregate().constant(true) - // wait for 0.5 seconds to aggregate - .completionTimeout(500L) - // group the exchanges so we get one single exchange containing all the others - .groupExchanges() - .to("mock:result"); - // END SNIPPET: e1 + .aggregate().constant(true).groupExchanges().completionSize(3) + .to("mock:result") + .end(); } }; } -} +} \ No newline at end of file Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java (from r931784, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java&r1=931784&r2=931845&rev=931845&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java Thu Apr 8 09:20:19 2010 @@ -14,10 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.spring.processor; +package org.apache.camel.spring.processor.aggregator; import org.apache.camel.CamelContext; -import org.apache.camel.processor.AggregatorTest; +import org.apache.camel.processor.aggregator.AggregatorTest; import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; /** Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date