Author: davsclaus Date: Fri Apr 2 16:28:21 2010 New Revision: 930316 URL: http://svn.apache.org/viewvc?rev=930316&view=rev Log: CAMEL-2568: Introducing RecoverableAggregationRepository to support redelivery of failed aggregated exchanges to support not losing messages out of the box in Camel. Work in progress.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java (with props) camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java - copied, changed from r930237, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java - copied, changed from r930237, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930316&r1=930315&r2=930316&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Apr 2 16:28:21 2010 @@ -18,10 +18,13 @@ package org.apache.camel.processor.aggre import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.CamelContext; @@ -36,6 +39,7 @@ import org.apache.camel.impl.ServiceSupp import org.apache.camel.processor.Traceable; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.RecoverableAggregationRepository; import org.apache.camel.util.DefaultTimeoutMap; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.LRUCache; @@ -71,10 +75,12 @@ public class AggregateProcessor extends private final AggregationStrategy aggregationStrategy; private final Expression correlationExpression; private final ExecutorService executorService; + private ScheduledExecutorService recoverService; private TimeoutMap<Object, Exchange> timeoutMap; private ExceptionHandler exceptionHandler; private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository(); private Map<Object, Object> closedCorrelationKeys; + private final Set<String> inProgressCompleteExchanges = new HashSet<String>(); // options private boolean ignoreBadCorrelationKeys; @@ -157,7 +163,7 @@ public class AggregateProcessor extends * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key * in parallel. 
* - * @param key the correlation key + * @param key the correlation key * @param exchange the exchange * @return the aggregated exchange */ @@ -312,6 +318,9 @@ public class AggregateProcessor extends LOG.debug("Aggregation complete for correlation key " + key + " sending aggregated exchange: " + exchange); } + // add this as in progress + inProgressCompleteExchanges.add(exchange.getExchangeId()); + // send this exchange executorService.submit(new Runnable() { public void run() { @@ -322,13 +331,20 @@ public class AggregateProcessor extends } catch (Throwable t) { // must catch throwable so we will handle all exceptions as the executor service will by default ignore them exchange.setException(new CamelExchangeException("Error processing aggregated exchange", exchange, t)); - } finally { - aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId()); } - // if there was an exception then let the exception handler handle it - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); + try { + // was it good or bad? + if (exchange.getException() == null) { + // only confirm if we processed without a problem + aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId()); + } else { + // if there was an exception then let the exception handler handle it + getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); + } + } finally { + // must remember to remove when we are done + inProgressCompleteExchanges.remove(exchange.getExchangeId()); } } }); @@ -434,7 +450,7 @@ public class AggregateProcessor extends } /** - * Background tasks that looks for aggregated exchanges which is triggered by completion timeouts. + * Background task that looks for aggregated exchanges which is triggered by completion timeouts. 
*/ private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, Exchange> { @@ -453,6 +469,54 @@ public class AggregateProcessor extends } } + /** + * Background task that looks for aggregated exchanges to recover. + */ + private final class RecoverTask implements Runnable { + private final RecoverableAggregationRepository<Object> recoverable; + + private RecoverTask(RecoverableAggregationRepository<Object> recoverable) { + this.recoverable = recoverable; + } + + public void run() { + AggregateProcessor.this.doRecover(recoverable); + } + + } + + private void doRecover(RecoverableAggregationRepository<Object> recoverable) { + LOG.trace("Starting recover check"); + + Set<String> exchangeIds = recoverable.scan(camelContext); + for (String exchangeId : exchangeIds) { + + // we may shutdown while doing recovery + if (!isRunAllowed()) { + LOG.info("We are shutting down so stop recovering"); + return; + } + + boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); + if (inProgress) { + LOG.debug("Aggregated exchange with id " + exchangeId + " is already in progress"); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering aggregated exchange with id " + exchangeId); + } + Exchange exchange = recoverable.recover(camelContext, exchangeId); + if (exchange != null) { + // get the correlation key + String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); + // resubmit the recovered exchange + onSubmitCompletion(key, exchange); + } + } + } + + LOG.trace("Recover check complete"); + } + @Override protected void doStart() throws Exception { if (getCompletionTimeout() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null @@ -475,6 +539,25 @@ public class AggregateProcessor extends ServiceHelper.startService(aggregationRepository); + // should we use recover checker + if (aggregationRepository instanceof RecoverableAggregationRepository) { + RecoverableAggregationRepository<Object> recoverable = 
(RecoverableAggregationRepository<Object>) aggregationRepository; + if (recoverable.isUseRecovery()) { + long interval = recoverable.getCheckIntervalInMillis(); + if (interval > 0) { + // create a background recover thread to check once ev + recoverService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); + Runnable recoverTask = new RecoverTask(recoverable); + LOG.info("Scheduling recover checker to run every " + interval + " millis."); + recoverService.scheduleAtFixedRate(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); + } else { + // its a one shot recover during startup + LOG.info("Running recover checker once at startup to recover existing aggregated exchanges"); + doRecover(recoverable); + } + } + } + // start timeout service if its in use if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) { ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1); @@ -487,6 +570,7 @@ public class AggregateProcessor extends @Override protected void doStop() throws Exception { ServiceHelper.stopService(timeoutMap); + ServiceHelper.stopService(recoverService); ServiceHelper.stopService(aggregationRepository); Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930316&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java Fri Apr 2 16:28:21 2010 @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; + +/** + * A specialized {@link org.apache.camel.spi.AggregationRepository} which also supports + * recovery. This usually requires a repository which is persisted. + * + * @version $Revision$ + */ +public interface RecoverableAggregationRepository<K> extends AggregationRepository<K> { + + /** + * Scans the repository for exchanges to be recovered + * + * @param camelContext the current CamelContext + * @return the exchange ids for to be recovered + */ + Set<String> scan(CamelContext camelContext); + + /** + * Recovers the exchange with the given exchange id + * + * @param camelContext the current CamelContext + * @param exchangeId exchange id + * @return the recovered exchange or <tt>null</tt> if not found + */ + Exchange recover(CamelContext camelContext, String exchangeId); + + /** + * Sets the interval between scans + * + * @param interval the interval + * @param timeUnit the time unit + */ + void setCheckInterval(long interval, TimeUnit timeUnit); + + /** + * Gets the interval between scans in millis.
+ * + * @return the interval in millis + */ + long getCheckIntervalInMillis(); + + /** + * Whether or not recovery is enabled or not + * + * @return <tt>true</tt> to use recovery, <tt>false</tt> otherwise. + */ + boolean isUseRecovery(); +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930316&r1=930315&r2=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Fri Apr 2 16:28:21 2010 @@ -18,11 +18,17 @@ package org.apache.camel.component.hawtd import java.io.File; import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.impl.ServiceSupport; import org.apache.camel.spi.AggregationRepository; +import org.apache.camel.spi.RecoverableAggregationRepository; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; import org.apache.commons.logging.Log; @@ -34,7 +40,7 @@ import 
org.fusesource.hawtdb.util.buffer /** * An instance of AggregationRepository which is backed by a HawtDB. */ -public class HawtDBAggregationRepository<K> extends ServiceSupport implements AggregationRepository<K> { +public class HawtDBAggregationRepository<K> extends ServiceSupport implements AggregationRepository<K>, RecoverableAggregationRepository<K> { private static final transient Log LOG = LogFactory.getLog(HawtDBAggregationRepository.class); private HawtDBFile hawtDBFile; @@ -44,6 +50,8 @@ public class HawtDBAggregationRepository private boolean sync; private boolean returnOldExchange; private HawtDBCamelMarshaller<K> marshaller = new HawtDBCamelMarshaller<K>(); + private long interval = 5000; + private boolean useRecovery = false; /** * Creates an aggregation repository @@ -208,6 +216,78 @@ public class HawtDBAggregationRepository } } + public Set<String> scan(CamelContext camelContext) { + final Set<String> answer = new LinkedHashSet<String>(); + hawtDBFile.execute(new Work<Buffer>() { + public Buffer execute(Transaction tx) { + // scan could potentially be running while we are shutting down so check for that + if (!isRunAllowed()) { + return null; + } + + Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted()); + + Iterator<Map.Entry<Buffer, Buffer>> it = indexCompleted.iterator(); + // scan could potentially be running while we are shutting down so check for that + while (it.hasNext() && isRunAllowed()) { + Map.Entry<Buffer, Buffer> entry = it.next(); + Buffer keyBuffer = entry.getKey(); + + String exchangeId; + try { + exchangeId = marshaller.unmarshallConfirmKey(keyBuffer); + } catch (IOException e) { + throw new RuntimeException("Error unmarshalling confirm key: " + keyBuffer, e); + } + if (exchangeId != null) { + answer.add(exchangeId); + } + } + return null; + } + + @Override + public String toString() { + return "Scan"; + } + }); + + if (LOG.isDebugEnabled()) { + LOG.debug("Scanned and found " + 
answer.size() + " exchanges to recover."); + } + return answer; + + } + + public Exchange recover(CamelContext camelContext, final String exchangeId) { + Exchange answer = null; + try { + final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId); + Buffer rc = hawtDBFile.execute(new Work<Buffer>() { + public Buffer execute(Transaction tx) { + Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted()); + return indexCompleted.get(confirmKeyBuffer); + } + + @Override + public String toString() { + return "Recovering exchangeId [" + exchangeId + "]"; + } + }); + if (rc != null) { + answer = marshaller.unmarshallExchange(camelContext, rc); + } + } catch (IOException e) { + throw new RuntimeException("Error recovering exchangeId " + exchangeId + " from repository " + repositoryName, e); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering exchangeId [" + exchangeId + "] -> " + answer); + } + return answer; + } + + public HawtDBFile getHawtDBFile() { return hawtDBFile; } @@ -260,6 +340,22 @@ public class HawtDBAggregationRepository this.returnOldExchange = returnOldExchange; } + public void setCheckInterval(long interval, TimeUnit timeUnit) { + this.interval = timeUnit.toMillis(interval); + } + + public long getCheckIntervalInMillis() { + return interval; + } + + public boolean isUseRecovery() { + return useRecovery; + } + + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + @Override protected void doStart() throws Exception { // either we have a HawtDB configured or we use a provided fileName Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java?rev=930316&r1=930315&r2=930316&view=diff 
============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java Fri Apr 2 16:28:21 2010 @@ -52,6 +52,12 @@ public final class HawtDBCamelMarshaller return baos.toBuffer(); } + public String unmarshallConfirmKey(Buffer buffer) throws IOException { + DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer); + String key = confirmKeyMarshaller.readPayload(bais); + return key; + } + public Buffer marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException { DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); // use DefaultExchangeHolder to marshal to a serialized object Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java?rev=930316&r1=930315&r2=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java Fri Apr 2 16:28:21 2010 @@ -25,7 +25,7 @@ import org.fusesource.hawtdb.api.Transac interface Work<T> { /** - * Executs the work within the bounds of the given transaction + * Executes the work within the bounds of the given transaction * * @param transaction the transaction * @return result of the work, can be <tt>null</tt> if no result to return. 
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java?rev=930316&r1=930315&r2=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java Fri Apr 2 16:28:21 2010 @@ -70,6 +70,7 @@ public class HawtDBAggregateConcurrentDi private void doSendMessages(int files, int poolSize) throws Exception { MockEndpoint mock = getMockEndpoint("mock:aggregated"); mock.expectedMessageCount(2); + mock.setResultWaitTime(20 * 1000L); ExecutorService executor = Executors.newFixedThreadPool(poolSize); for (int i = 0; i < files; i++) { Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java?rev=930316&r1=930315&r2=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java Fri Apr 2 16:28:21 2010 @@ -69,6 +69,7 @@ public class HawtDBAggregateConcurrentSa private void doSendMessages(int files, int poolSize) throws Exception { MockEndpoint mock = 
getMockEndpoint("mock:aggregated"); + mock.setResultWaitTime(20 * 1000L); mock.expectedMessageCount(1); // match number of expected numbers mock.message(0).body(String.class).regex("[0-9]{" + files + "}"); Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java?rev=930316&r1=930315&r2=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java Fri Apr 2 16:28:21 2010 @@ -44,7 +44,7 @@ public class HawtDBAggregateLoadConcurre public void testLoadTestHawtDBAggregate() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMinimumMessageCount(10); - mock.setResultWaitTime(20 * 1000); + mock.setResultWaitTime(30 * 1000); ExecutorService executor = Executors.newFixedThreadPool(10); Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java?rev=930316&r1=930315&r2=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java Fri Apr 2 16:28:21 2010 @@ -39,7 +39,7 @@ public class HawtDBAggregateLoadTest ext public void 
testLoadTestHawtDBAggregate() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMinimumMessageCount(1); - mock.setResultWaitTime(20 * 1000); + mock.setResultWaitTime(30 * 1000); System.out.println("Staring to send " + SIZE + " messages."); Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java (from r930237, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java&r1=930237&r2=930316&rev=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java Fri Apr 2 16:28:21 2010 @@ -25,7 +25,7 @@ import org.fusesource.hawtdb.api.Transac import org.fusesource.hawtdb.util.buffer.Buffer; import org.junit.Test; -public class HawtDBAggregateNotLostTest extends CamelTestSupport { +public class HawtDBAggregateNotLostRemovedWhenConfirmedTest extends CamelTestSupport { private HawtDBAggregationRepository<String> repo; @@ -37,9 +37,8 @@ public class HawtDBAggregateNotLostTest } @Test - public void testHawtDBAggregateNotLost() throws Exception { - getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE"); - getMockEndpoint("mock:result").expectedMessageCount(0); + public void 
testHawtDBAggregateNotLostRemovedWhenConfirmed() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE"); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); @@ -49,9 +48,9 @@ public class HawtDBAggregateNotLostTest assertMockEndpointsSatisfied(); - String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId(); + String exchangeId = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId(); - // the exchange should be in the completed repo where we should be able to find it + // the exchange should NOT be in the completed repo as it was confirmed final HawtDBFile hawtDBFile = repo.getHawtDBFile(); final HawtDBCamelMarshaller<Object> marshaller = new HawtDBCamelMarshaller<Object>(); final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId); @@ -62,17 +61,8 @@ public class HawtDBAggregateNotLostTest } }); - // assert the exchange was not lost and we got all the information still - assertNotNull(bf); - Exchange completed = marshaller.unmarshallExchange(context, bf); - assertNotNull(completed); - // should retain the exchange id - assertEquals(exchangeId, completed.getExchangeId()); - assertEquals("ABCDE", completed.getIn().getBody()); - assertEquals(123, completed.getIn().getHeader("id")); - assertEquals("size", completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY)); - assertEquals(5, completed.getProperty(Exchange.AGGREGATED_SIZE)); - assertEquals(123, completed.getProperty(Exchange.AGGREGATED_CORRELATION_KEY)); + // assert the exchange was deleted + assertNull(bf); } @Override @@ -84,9 +74,6 @@ public class HawtDBAggregateNotLostTest .aggregate(header("id"), new MyAggregationStrategy()) .completionSize(5).aggregationRepository(repo) .log("aggregated exchange id ${exchangeId} with ${body}") - .to("mock:aggregated") - // throw an exception to fail, which we then will loose this message - 
.throwException(new IllegalArgumentException("Damn")) .to("mock:result") .end(); } Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (from r930237, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java&r1=930237&r2=930316&rev=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Fri Apr 2 16:28:21 2010 @@ -16,30 +16,37 @@ */ package org.apache.camel.component.hawtdb; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.test.junit4.CamelTestSupport; -import org.fusesource.hawtdb.api.Index; -import org.fusesource.hawtdb.api.Transaction; -import org.fusesource.hawtdb.util.buffer.Buffer; import org.junit.Test; -public class HawtDBAggregateNotLostTest extends CamelTestSupport { +public class HawtDBAggregateRecoverTest extends CamelTestSupport { private HawtDBAggregationRepository<String> repo; + private static AtomicInteger counter = new AtomicInteger(0); @Override public void setUp() throws Exception { 
deleteDirectory("target/data"); repo = new HawtDBAggregationRepository<String>("repo1", "target/data/hawtdb.dat"); + // enable recovery + repo.setUseRecovery(true); + // check faster + repo.setCheckInterval(1, TimeUnit.SECONDS); super.setUp(); } @Test - public void testHawtDBAggregateNotLost() throws Exception { - getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE"); - getMockEndpoint("mock:result").expectedMessageCount(0); + public void testHawtDBAggregateRecover() throws Exception { + // should fail the first 2 times and then recover + getMockEndpoint("mock:aggregated").expectedMessageCount(3); + getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE"); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); @@ -48,31 +55,6 @@ public class HawtDBAggregateNotLostTest template.sendBodyAndHeader("direct:start", "E", "id", 123); assertMockEndpointsSatisfied(); - - String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId(); - - // the exchange should be in the completed repo where we should be able to find it - final HawtDBFile hawtDBFile = repo.getHawtDBFile(); - final HawtDBCamelMarshaller<Object> marshaller = new HawtDBCamelMarshaller<Object>(); - final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId); - Buffer bf = hawtDBFile.execute(new Work<Buffer>() { - public Buffer execute(Transaction tx) { - Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, "repo1-completed"); - return index.get(confirmKeyBuffer); - } - }); - - // assert the exchange was not lost and we got all the information still - assertNotNull(bf); - Exchange completed = marshaller.unmarshallExchange(context, bf); - assertNotNull(completed); - // should retain the exchange id - assertEquals(exchangeId, completed.getExchangeId()); - assertEquals("ABCDE", completed.getIn().getBody()); - assertEquals(123, completed.getIn().getHeader("id")); - 
assertEquals("size", completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY)); - assertEquals(5, completed.getProperty(Exchange.AGGREGATED_SIZE)); - assertEquals(123, completed.getProperty(Exchange.AGGREGATED_CORRELATION_KEY)); } @Override @@ -85,8 +67,16 @@ public class HawtDBAggregateNotLostTest .completionSize(5).aggregationRepository(repo) .log("aggregated exchange id ${exchangeId} with ${body}") .to("mock:aggregated") - // throw an exception to fail, which we then will loose this message - .throwException(new IllegalArgumentException("Damn")) + .delay(2000) + // simulate errors the first two times + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + int count = counter.incrementAndGet(); + if (count <= 2) { + throw new IllegalArgumentException("Damn"); + } + } + }) .to("mock:result") .end(); } Modified: camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties?rev=930316&r1=930315&r2=930316&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties (original) +++ camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties Fri Apr 2 16:28:21 2010 @@ -23,6 +23,9 @@ log4j.rootLogger=INFO, file # uncomment the following to enable camel debugging #log4j.logger.org.apache.camel=DEBUG #log4j.logger.org.apache.camel.component.hawtdb=DEBUG +log4j.logger.org.apache.camel.impl.converter=WARN +log4j.logger.org.apache.camel.management=WARN +log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender