Author: davsclaus Date: Thu Jun 2 12:31:42 2011 New Revision: 1130521 URL: http://svn.apache.org/viewvc?rev=1130521&view=rev Log: CAMEL-4037: Aggregate EIP with only a completion timeout condition will restore the timeout map upon restart, based on exchanges from the aggregation repository.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java - copied, changed from r1130434, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java - copied, changed from r1130434, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java - copied, changed from r1130501, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1130521&r1=1130520&r2=1130521&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Jun 2 12:31:42 2011 @@ -74,6 +74,7 @@ public interface Exchange { String AUTHENTICATION_FAILURE_POLICY_ID = "CamelAuthenticationFailurePolicyId"; String ACCEPT_CONTENT_TYPE = "CamelAcceptContentType"; String AGGREGATED_SIZE = "CamelAggregatedSize"; + String AGGREGATED_TIMEOUT = "CamelAggregatedTimeout"; String 
AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy"; String AGGREGATED_CORRELATION_KEY = "CamelAggregatedCorrelationKey"; String AGGREGATION_STRATEGY = "CamelAggregationStrategy"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1130521&r1=1130520&r2=1130521&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Jun 2 12:31:42 2011 @@ -54,6 +54,8 @@ import org.apache.camel.util.ExchangeHel import org.apache.camel.util.LRUCache; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.StopWatch; +import org.apache.camel.util.TimeUtils; import org.apache.camel.util.TimeoutMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,8 +74,6 @@ import org.slf4j.LoggerFactory; * messages for the same stock are combined (or just the latest message is used * and older prices are discarded). Another idea is to combine line item messages * together into a single invoice message. 
- * - * @version */ public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable { @@ -198,7 +198,8 @@ public class AggregateProcessor extends * @param key the correlation key * @param exchange the exchange * @return the aggregated exchange - * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating + * @throws org.apache.camel.CamelExchangeException + * is thrown if error aggregating */ private Exchange doAggregation(String key, Exchange exchange) throws CamelExchangeException { LOG.trace("onAggregation +++ start +++ with correlation key: {}", key); @@ -275,8 +276,8 @@ public class AggregateProcessor extends /** * Tests whether the given exchange is complete or not * - * @param key the correlation key - * @param exchange the incoming exchange + * @param key the correlation key + * @param exchange the incoming exchange * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion */ protected String isCompleted(String key, Exchange exchange) { @@ -311,7 +312,7 @@ public class AggregateProcessor extends if (value != null && value > 0) { LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", new Object[]{key, value, exchange}); - timeoutMap.put(key, exchange.getExchangeId(), value); + addExchangeToTimeoutMap(key, exchange, value); timeoutSet = true; } } @@ -319,10 +320,10 @@ public class AggregateProcessor extends // timeout is used so use the timeout map to keep an eye on this LOG.trace("Updating correlation key {} to timeout after {} ms. 
as exchange received: {}", new Object[]{key, getCompletionTimeout(), exchange}); - timeoutMap.put(key, exchange.getExchangeId(), getCompletionTimeout()); + addExchangeToTimeoutMap(key, exchange, getCompletionTimeout()); } - if (isCompletionFromBatchConsumer()) { + if (isCompletionFromBatchConsumer()) { batchConsumerCorrelationKeys.add(key); batchConsumerCounter.incrementAndGet(); int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class); @@ -400,6 +401,50 @@ public class AggregateProcessor extends }); } + /** + * Restores the timeout map with timeout values from the aggregation repository. + * <p/> + * This is needed in case the aggregator has been stopped and started again (for example a server restart). + * Then the existing exchanges from the {@link AggregationRepository} must have its timeout conditions restored. + */ + protected void restoreTimeoutMapFromAggregationRepository() throws Exception { + StopWatch watch = new StopWatch(); + LOG.trace("Starting restoring CompletionTimeout for existing exchanges from the aggregation repository..."); + + // grab the timeout value for each partly aggregated exchange + Set<String> keys = aggregationRepository.getKeys(); + if (keys == null || keys.isEmpty()) { + return; + } + + for (String key : keys) { + Exchange exchange = aggregationRepository.get(camelContext, key); + // grab the timeout value + long timeout = exchange.hasProperties() ? 
exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0; + if (timeout > 0) { + LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout); + addExchangeToTimeoutMap(key, exchange, timeout); + } + } + + // log duration of this task so end user can see how long it takes to pre-check this upon starting + LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}", + timeoutMap.size(), TimeUtils.printDuration(watch.stop())); + } + + /** + * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts. + * + * @param key the correlation key + * @param exchange the exchange + * @param timeout the timeout value in millis + */ + private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) { + // store the timeout value on the exchange as well, in case we need it later + exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout); + timeoutMap.put(key, exchange.getExchangeId(), timeout); + } + public Predicate getCompletionPredicate() { return completionPredicate; } @@ -797,6 +842,9 @@ public class AggregateProcessor extends ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1); // check for timed out aggregated messages once every second timeoutMap = new AggregationTimeoutMap(scheduler, 1000L); + // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we + // need to re-establish the timeout map so timeout can trigger + restoreTimeoutMapFromAggregationRepository(); ServiceHelper.startService(timeoutMap); } } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java (from r1130434, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java) URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java&r1=1130434&r2=1130521&rev=1130521&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java Thu Jun 2 12:31:42 2011 @@ -18,13 +18,10 @@ package org.apache.camel.processor.aggre import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.camel.CamelExchangeException; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Expression; -import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultExchange; @@ -32,12 +29,13 @@ import org.apache.camel.processor.BodyIn import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.aggregate.AggregateProcessor; import org.apache.camel.processor.aggregate.AggregationStrategy; -import org.apache.camel.spi.ExceptionHandler; /** + * To test CAMEL-4037 that a restart of aggregator can re-initialize the timeout map + * * @version */ -public class AggregateProcessorTest extends ContextTestSupport { +public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestSupport { private ExecutorService executorService; @@ -52,60 +50,18 @@ public class AggregateProcessorTest exte executorService = 
Executors.newSingleThreadExecutor(); } - public void testAggregateProcessorCompletionPredicate() throws Exception { + public void testAggregateProcessorTimeoutRestart() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+B+END"); - mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate"); - - Processor done = new SendProcessor(context.getEndpoint("mock:result")); - Expression corr = header("id"); - AggregationStrategy as = new BodyInAggregatingStrategy(); - Predicate complete = body().contains("END"); - - AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionPredicate(complete); - ap.setEagerCheckCompletion(false); - ap.start(); - - Exchange e1 = new DefaultExchange(context); - e1.getIn().setBody("A"); - e1.getIn().setHeader("id", 123); - - Exchange e2 = new DefaultExchange(context); - e2.getIn().setBody("B"); - e2.getIn().setHeader("id", 123); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("END"); - e3.getIn().setHeader("id", 123); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("D"); - e4.getIn().setHeader("id", 123); - - ap.process(e1); - ap.process(e2); - ap.process(e3); - ap.process(e4); - - assertMockEndpointsSatisfied(); - - ap.stop(); - } - - public void testAggregateProcessorCompletionPredicateEager() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+B+END"); - mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate"); + mock.expectedBodiesReceived("A+B"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); Expression corr = header("id"); AggregationStrategy as = new BodyInAggregatingStrategy(); - Predicate complete = body().isEqualTo("END"); AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, 
executorService); - ap.setCompletionPredicate(complete); - ap.setEagerCheckCompletion(true); + // start with a high timeout so no completes before we stop + ap.setCompletionTimeout(2000); ap.start(); Exchange e1 = new DefaultExchange(context); @@ -116,83 +72,29 @@ public class AggregateProcessorTest exte e2.getIn().setBody("B"); e2.getIn().setHeader("id", 123); - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("END"); - e3.getIn().setHeader("id", 123); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("D"); - e4.getIn().setHeader("id", 123); - ap.process(e1); ap.process(e2); - ap.process(e3); - ap.process(e4); - - assertMockEndpointsSatisfied(); + // shutdown before the 2 sec timeout occurs + // however we use stop instead of shutdown as shutdown will clear the in memory aggregation repository, ap.stop(); - } - - public void testAggregateProcessorCompletionAggregatedSize() throws Exception { - doTestAggregateProcessorCompletionAggregatedSize(false); - } - - public void testAggregateProcessorCompletionAggregatedSizeEager() throws Exception { - doTestAggregateProcessorCompletionAggregatedSize(true); - } - - private void doTestAggregateProcessorCompletionAggregatedSize(boolean eager) throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+B+C"); - mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "size"); - Processor done = new SendProcessor(context.getEndpoint("mock:result")); - Expression corr = header("id"); - AggregationStrategy as = new BodyInAggregatingStrategy(); + // should be no completed + assertEquals(0, mock.getReceivedCounter()); - AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionSize(3); - ap.setEagerCheckCompletion(eager); + // start aggregator again ap.start(); - Exchange e1 = new DefaultExchange(context); - e1.getIn().setBody("A"); - e1.getIn().setHeader("id", 123); - - Exchange e2 = new 
DefaultExchange(context); - e2.getIn().setBody("B"); - e2.getIn().setHeader("id", 123); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("C"); - e3.getIn().setHeader("id", 123); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("D"); - e4.getIn().setHeader("id", 123); - - ap.process(e1); - ap.process(e2); - ap.process(e3); - ap.process(e4); - + // the aggregator should restore the timeout condition and trigger timeout assertMockEndpointsSatisfied(); + assertEquals(1, mock.getReceivedCounter()); - ap.stop(); - } - - public void testAggregateProcessorCompletionTimeout() throws Exception { - doTestAggregateProcessorCompletionTimeout(false); - } - - public void testAggregateProcessorCompletionTimeoutEager() throws Exception { - doTestAggregateProcessorCompletionTimeout(true); + ap.shutdown(); } - private void doTestAggregateProcessorCompletionTimeout(boolean eager) throws Exception { + public void testAggregateProcessorTimeoutExpressionRestart() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+B+C"); + mock.expectedBodiesReceived("A+B"); mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); @@ -200,363 +102,93 @@ public class AggregateProcessorTest exte AggregationStrategy as = new BodyInAggregatingStrategy(); AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionTimeout(3000); - ap.setEagerCheckCompletion(eager); - ap.start(); - - Exchange e1 = new DefaultExchange(context); - e1.getIn().setBody("A"); - e1.getIn().setHeader("id", 123); - - Exchange e2 = new DefaultExchange(context); - e2.getIn().setBody("B"); - e2.getIn().setHeader("id", 123); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("C"); - e3.getIn().setHeader("id", 123); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("D"); - 
e4.getIn().setHeader("id", 123); - - ap.process(e1); - - Thread.sleep(250); - ap.process(e2); - - Thread.sleep(500); - ap.process(e3); - - Thread.sleep(5000); - ap.process(e4); - - assertMockEndpointsSatisfied(); - - ap.stop(); - } - - public void testAggregateCompletionInterval() throws Exception { - // camel context must be started - context.start(); - - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+B+C", "D"); - mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval"); - - Processor done = new SendProcessor(context.getEndpoint("mock:result")); - Expression corr = header("id"); - AggregationStrategy as = new BodyInAggregatingStrategy(); - - AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionInterval(3000); + // start with a high timeout so no completes before we stop + ap.setCompletionTimeoutExpression(header("myTimeout")); ap.start(); Exchange e1 = new DefaultExchange(context); e1.getIn().setBody("A"); e1.getIn().setHeader("id", 123); + e1.getIn().setHeader("myTimeout", 2000); Exchange e2 = new DefaultExchange(context); e2.getIn().setBody("B"); e2.getIn().setHeader("id", 123); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("C"); - e3.getIn().setHeader("id", 123); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("D"); - e4.getIn().setHeader("id", 123); - - ap.process(e1); - ap.process(e2); - ap.process(e3); - - Thread.sleep(5000); - ap.process(e4); - - assertMockEndpointsSatisfied(); - - ap.stop(); - } - - public void testAggregateIgnoreInvalidCorrelationKey() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+C+END"); - - Processor done = new SendProcessor(context.getEndpoint("mock:result")); - Expression corr = header("id"); - AggregationStrategy as = new BodyInAggregatingStrategy(); - Predicate complete = body().contains("END"); - - AggregateProcessor 
ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionPredicate(complete); - ap.setIgnoreInvalidCorrelationKeys(true); - - ap.start(); - - Exchange e1 = new DefaultExchange(context); - e1.getIn().setBody("A"); - e1.getIn().setHeader("id", 123); - - Exchange e2 = new DefaultExchange(context); - e2.getIn().setBody("B"); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("C"); - e3.getIn().setHeader("id", 123); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("END"); - e4.getIn().setHeader("id", 123); + e2.getIn().setHeader("myTimeout", 2000); ap.process(e1); ap.process(e2); - ap.process(e3); - ap.process(e4); - - assertMockEndpointsSatisfied(); - - ap.stop(); - } - - public void testAggregateBadCorrelationKey() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+C+END"); - - Processor done = new SendProcessor(context.getEndpoint("mock:result")); - Expression corr = header("id"); - AggregationStrategy as = new BodyInAggregatingStrategy(); - Predicate complete = body().contains("END"); - - AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionPredicate(complete); - - ap.start(); - - Exchange e1 = new DefaultExchange(context); - e1.getIn().setBody("A"); - e1.getIn().setHeader("id", 123); - - Exchange e2 = new DefaultExchange(context); - e2.getIn().setBody("B"); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("C"); - e3.getIn().setHeader("id", 123); - - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("END"); - e4.getIn().setHeader("id", 123); - - ap.process(e1); - - try { - ap.process(e2); - fail("Should have thrown an exception"); - } catch (CamelExchangeException e) { - assertEquals("Invalid correlation key. 
Exchange[Message: B]", e.getMessage()); - } - - ap.process(e3); - ap.process(e4); - - assertMockEndpointsSatisfied(); + // shutdown before the 2 sec timeout occurs + // however we use stop instead of shutdown as shutdown will clear the in memory aggregation repository, ap.stop(); - } - public void testAggregateCloseCorrelationKeyOnCompletion() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+B+END"); - - Processor done = new SendProcessor(context.getEndpoint("mock:result")); - Expression corr = header("id"); - AggregationStrategy as = new BodyInAggregatingStrategy(); - Predicate complete = body().contains("END"); - - AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionPredicate(complete); - ap.setCloseCorrelationKeyOnCompletion(1000); + // should be no completed + assertEquals(0, mock.getReceivedCounter()); + // start aggregator again ap.start(); - Exchange e1 = new DefaultExchange(context); - e1.getIn().setBody("A"); - e1.getIn().setHeader("id", 123); - - Exchange e2 = new DefaultExchange(context); - e2.getIn().setBody("B"); - e2.getIn().setHeader("id", 123); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("END"); - e3.getIn().setHeader("id", 123); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("C"); - e4.getIn().setHeader("id", 123); - - ap.process(e1); - ap.process(e2); - ap.process(e3); - - try { - ap.process(e4); - fail("Should have thrown an exception"); - } catch (CamelExchangeException e) { - assertEquals("The correlation key [123] has been closed. 
Exchange[Message: C]", e.getMessage()); - } - + // the aggregator should restore the timeout condition and trigger timeout assertMockEndpointsSatisfied(); + assertEquals(1, mock.getReceivedCounter()); - ap.stop(); + ap.shutdown(); } - public void testAggregateUseBatchSizeFromConsumer() throws Exception { + public void testAggregateProcessorTwoTimeoutExpressionRestart() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+B", "C+D+E"); - mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "consumer"); + mock.expectedBodiesReceived("C+D", "A+B"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); Expression corr = header("id"); AggregationStrategy as = new BodyInAggregatingStrategy(); AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionSize(100); - ap.setCompletionFromBatchConsumer(true); - + // start with a high timeout so no completes before we stop + ap.setCompletionTimeoutExpression(header("myTimeout")); ap.start(); Exchange e1 = new DefaultExchange(context); e1.getIn().setBody("A"); e1.getIn().setHeader("id", 123); - e1.setProperty(Exchange.BATCH_INDEX, 0); - e1.setProperty(Exchange.BATCH_SIZE, 2); - e1.setProperty(Exchange.BATCH_COMPLETE, false); + e1.getIn().setHeader("myTimeout", 3000); Exchange e2 = new DefaultExchange(context); e2.getIn().setBody("B"); e2.getIn().setHeader("id", 123); - e2.setProperty(Exchange.BATCH_INDEX, 1); - e2.setProperty(Exchange.BATCH_SIZE, 2); - e2.setProperty(Exchange.BATCH_COMPLETE, true); + e2.getIn().setHeader("myTimeout", 3000); Exchange e3 = new DefaultExchange(context); e3.getIn().setBody("C"); - e3.getIn().setHeader("id", 123); - e3.setProperty(Exchange.BATCH_INDEX, 0); - e3.setProperty(Exchange.BATCH_SIZE, 3); - e3.setProperty(Exchange.BATCH_COMPLETE, false); + e3.getIn().setHeader("id", 456); + 
e3.getIn().setHeader("myTimeout", 2000); Exchange e4 = new DefaultExchange(context); e4.getIn().setBody("D"); - e4.getIn().setHeader("id", 123); - e4.setProperty(Exchange.BATCH_INDEX, 1); - e4.setProperty(Exchange.BATCH_SIZE, 3); - e4.setProperty(Exchange.BATCH_COMPLETE, false); - - Exchange e5 = new DefaultExchange(context); - e5.getIn().setBody("E"); - e5.getIn().setHeader("id", 123); - e5.setProperty(Exchange.BATCH_INDEX, 2); - e5.setProperty(Exchange.BATCH_SIZE, 3); - e5.setProperty(Exchange.BATCH_COMPLETE, true); + e4.getIn().setHeader("id", 456); + e4.getIn().setHeader("myTimeout", 2000); ap.process(e1); ap.process(e2); ap.process(e3); ap.process(e4); - ap.process(e5); - - assertMockEndpointsSatisfied(); + // shutdown before the 2 sec timeout occurs + // however we use stop instead of shutdown as shutdown will clear the in memory aggregation repository, ap.stop(); - } - public void testAggregateLogFailedExchange() throws Exception { - doTestAggregateLogFailedExchange(null); - } - - public void testAggregateHandleFailedExchange() throws Exception { - final AtomicBoolean tested = new AtomicBoolean(); - - ExceptionHandler myHandler = new ExceptionHandler() { - public void handleException(Throwable exception) { - } - - public void handleException(String message, Throwable exception) { - } - - public void handleException(String message, Exchange exchange, Throwable exception) { - assertEquals("Error processing aggregated exchange", message); - assertEquals("B+Kaboom+END", exchange.getIn().getBody()); - assertEquals("Damn", exception.getMessage()); - tested.set(true); - } - }; - - doTestAggregateLogFailedExchange(myHandler); - assertEquals(true, tested.get()); - } - - private void doTestAggregateLogFailedExchange(ExceptionHandler handler) throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+END"); - - Processor done = new Processor() { - public void process(Exchange exchange) throws Exception { - if 
(exchange.getIn().getBody(String.class).contains("Kaboom")) { - throw new IllegalArgumentException("Damn"); - } - // else send it further along - SendProcessor send = new SendProcessor(context.getEndpoint("mock:result")); - send.start(); - send.process(exchange); - } - }; - - Expression corr = header("id"); - AggregationStrategy as = new BodyInAggregatingStrategy(); + // should be no completed + assertEquals(0, mock.getReceivedCounter()); - AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setEagerCheckCompletion(true); - ap.setCompletionPredicate(body().isEqualTo("END")); - if (handler != null) { - ap.setExceptionHandler(handler); - } + // start aggregator again ap.start(); - Exchange e1 = new DefaultExchange(context); - e1.getIn().setBody("A"); - e1.getIn().setHeader("id", 123); - - Exchange e2 = new DefaultExchange(context); - e2.getIn().setBody("B"); - e2.getIn().setHeader("id", 456); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("Kaboom"); - e3.getIn().setHeader("id", 456); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("END"); - e4.getIn().setHeader("id", 456); - - Exchange e5 = new DefaultExchange(context); - e5.getIn().setBody("END"); - e5.getIn().setHeader("id", 123); - - ap.process(e1); - ap.process(e2); - ap.process(e3); - ap.process(e4); - ap.process(e5); - + // the aggregator should restore the timeout condition and trigger timeout assertMockEndpointsSatisfied(); + assertEquals(2, mock.getReceivedCounter()); - ap.stop(); + ap.shutdown(); } - } Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java?rev=1130521&r1=1130520&r2=1130521&view=diff ============================================================================== --- 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java Thu Jun 2 12:31:42 2011 @@ -17,7 +17,6 @@ package org.apache.camel.component.hawtdb; import java.io.IOException; -import java.io.Serializable; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; @@ -55,8 +54,9 @@ public final class HawtDBCamelCodec { DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); // use DefaultExchangeHolder to marshal to a serialized object DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false); - // add the aggregated size property as the only property we want to retain + // add the aggregated size and timeout property as the only properties we want to retain DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class)); + DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_TIMEOUT, exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, Long.class)); // add the aggregated completed by property to retain DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class)); // add the aggregated correlation key property to retain Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java (from r1130434, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java) URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java&r1=1130434&r2=1130521&rev=1130521&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTimeoutCompletionRestartTest.java Thu Jun 2 12:31:42 2011 @@ -16,8 +16,6 @@ */ package org.apache.camel.component.hawtdb; -import java.util.concurrent.TimeUnit; - import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -25,7 +23,7 @@ import org.apache.camel.processor.aggreg import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class HawtDBAggregateTest extends CamelTestSupport { +public class HawtDBAggregateTimeoutCompletionRestartTest extends CamelTestSupport { @Override public void setUp() throws Exception { @@ -34,27 +32,32 @@ public class HawtDBAggregateTest extends } @Test - public void testHawtDBAggregate() throws Exception { + public void testHawtDBAggregateTimeoutCompletionRestart() throws Exception { MockEndpoint mock = getMockEndpoint("mock:aggregated"); - mock.expectedBodiesReceived("ABCDE"); + mock.expectedMessageCount(0); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); template.sendBodyAndHeader("direct:start", "C", "id", 123); - template.sendBodyAndHeader("direct:start", "D", "id", 123); - 
template.sendBodyAndHeader("direct:start", "E", "id", 123); - assertMockEndpointsSatisfied(30, TimeUnit.SECONDS); + // stop Camel + context.stop(); + assertEquals(0, mock.getReceivedCounter()); + + // start Camel again, and the timeout should trigger a completion + context.start(); + + mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("ABC"); - // from endpoint should be preserved - assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri()); + assertMockEndpointsSatisfied(); + assertEquals(1, mock.getReceivedCounter()); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override - // START SNIPPET: e1 public void configure() throws Exception { // create the hawtdb repo HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat"); @@ -63,10 +66,9 @@ public class HawtDBAggregateTest extends from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()) // use our created hawtdb repo as aggregation repository - .completionSize(5).aggregationRepository(repo) + .completionTimeout(3000).aggregationRepository(repo) .to("mock:aggregated"); } - // END SNIPPET: e1 }; } Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java?rev=1130521&r1=1130520&r2=1130521&view=diff ============================================================================== --- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java (original) +++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java Thu Jun 2 12:31:42 2011 @@ -36,8 +36,9 @@ public final class JdbcCamelCodec { public byte[] marshallExchange(CamelContext 
camelContext, Exchange exchange) throws IOException { // use DefaultExchangeHolder to marshal to a serialized object DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false); - // add the aggregated size property as the only property we want to retain + // add the aggregated size and timeout property as the only properties we want to retain DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class)); + DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_TIMEOUT, exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, Long.class)); // add the aggregated completed by property to retain DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class)); // add the aggregated correlation key property to retain Copied: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java (from r1130501, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java&r1=1130501&r2=1130521&rev=1130521&view=diff ============================================================================== --- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTest.java (original) +++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateTimeoutCompletionRestartTest.java Thu Jun 2 12:31:42 2011 @@ -16,29 
+16,33 @@ */ package org.apache.camel.processor.aggregate.jdbc; -import java.util.concurrent.TimeUnit; - import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; -public class JdbcAggregateTest extends AbstractJdbcAggregationTestSupport { +public class JdbcAggregateTimeoutCompletionRestartTest extends AbstractJdbcAggregationTestSupport { @Test - public void testJdbcAggregate() throws Exception { + public void testJdbcAggregateTimeoutCompletionRestart() throws Exception { MockEndpoint mock = getMockEndpoint("mock:aggregated"); - mock.expectedBodiesReceived("ABCDE"); + mock.expectedMessageCount(0); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); template.sendBodyAndHeader("direct:start", "C", "id", 123); - template.sendBodyAndHeader("direct:start", "D", "id", 123); - template.sendBodyAndHeader("direct:start", "E", "id", 123); - assertMockEndpointsSatisfied(30, TimeUnit.SECONDS); + // stop Camel + context.stop(); + assertEquals(0, mock.getReceivedCounter()); + + // start Camel again, and the timeout should trigger a completion + context.start(); + + mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("ABC"); - // from endpoint should be preserved - assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri()); + assertMockEndpointsSatisfied(); + assertEquals(1, mock.getReceivedCounter()); } @Override @@ -50,7 +54,7 @@ public class JdbcAggregateTest extends A from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()) // use our created jdbc repo as aggregation repository - .completionSize(5).aggregationRepository(repo) + .completionTimeout(3000).aggregationRepository(repo) .to("mock:aggregated"); } };