Author: davsclaus
Date: Fri Apr  2 16:28:21 2010
New Revision: 930316

URL: http://svn.apache.org/viewvc?rev=930316&view=rev
Log:
CAMEL-2568: Introducing RecoverableAggregationRepository to support redelivery 
of failed aggregated exchanges to support not losing messages out of the box 
in Camel. Work in progress.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
   (with props)
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
      - copied, changed from r930237, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
      - copied, changed from r930237, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
    
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
    
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java
    camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Fri Apr  2 16:28:21 2010
@@ -18,10 +18,13 @@ package org.apache.camel.processor.aggre
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.CamelContext;
@@ -36,6 +39,7 @@ import org.apache.camel.impl.ServiceSupp
 import org.apache.camel.processor.Traceable;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.util.DefaultTimeoutMap;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.LRUCache;
@@ -71,10 +75,12 @@ public class AggregateProcessor extends 
     private final AggregationStrategy aggregationStrategy;
     private final Expression correlationExpression;
     private final ExecutorService executorService;
+    private ScheduledExecutorService recoverService;
     private TimeoutMap<Object, Exchange> timeoutMap;
     private ExceptionHandler exceptionHandler;
     private AggregationRepository<Object> aggregationRepository = new 
MemoryAggregationRepository();
     private Map<Object, Object> closedCorrelationKeys;
+    private final Set<String> inProgressCompleteExchanges = new 
HashSet<String>();
 
     // options
     private boolean ignoreBadCorrelationKeys;
@@ -157,7 +163,7 @@ public class AggregateProcessor extends 
      * This method <b>must</b> be run synchronized as we cannot aggregate the 
same correlation key
      * in parallel.
      *
-     * @param key the correlation key
+     * @param key      the correlation key
      * @param exchange the exchange
      * @return the aggregated exchange
      */
@@ -312,6 +318,9 @@ public class AggregateProcessor extends 
             LOG.debug("Aggregation complete for correlation key " + key + " 
sending aggregated exchange: " + exchange);
         }
 
+        // add this as in progress
+        inProgressCompleteExchanges.add(exchange.getExchangeId());
+
         // send this exchange
         executorService.submit(new Runnable() {
             public void run() {
@@ -322,13 +331,20 @@ public class AggregateProcessor extends 
                 } catch (Throwable t) {
                     // must catch throwable so we will handle all exceptions 
as the executor service will by default ignore them
                     exchange.setException(new CamelExchangeException("Error 
processing aggregated exchange", exchange, t));
-                } finally {
-                    aggregationRepository.confirm(exchange.getContext(), 
exchange.getExchangeId());
                 }
 
-                // if there was an exception then let the exception handler 
handle it
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
aggregated exchange", exchange, exchange.getException());
+                try {
+                    // was it good or bad?
+                    if (exchange.getException() == null) {
+                        // only confirm if we processed without a problem
+                        aggregationRepository.confirm(exchange.getContext(), 
exchange.getExchangeId());
+                    } else {
+                        // if there was an exception then let the exception 
handler handle it
+                        getExceptionHandler().handleException("Error 
processing aggregated exchange", exchange, exchange.getException());
+                    }
+                } finally {
+                    // must remember to remove when we are done
+                    
inProgressCompleteExchanges.remove(exchange.getExchangeId());
                 }
             }
         });
@@ -434,7 +450,7 @@ public class AggregateProcessor extends 
     }
 
     /**
-     * Background tasks that looks for aggregated exchanges which is triggered 
by completion timeouts.
+     * Background task that looks for aggregated exchanges which are triggered 
by completion timeouts.
      */
     private final class AggregationTimeoutMap extends 
DefaultTimeoutMap<Object, Exchange> {
 
@@ -453,6 +469,54 @@ public class AggregateProcessor extends 
         }
     }
 
+    /**
+     * Background task that looks for aggregated exchanges to recover.
+     */
+    private final class RecoverTask implements Runnable {
+        private final RecoverableAggregationRepository<Object> recoverable;
+
+        private RecoverTask(RecoverableAggregationRepository<Object> 
recoverable) {
+            this.recoverable = recoverable;
+        }
+
+        public void run() {
+            AggregateProcessor.this.doRecover(recoverable);
+        }
+
+    }
+
+    private void doRecover(RecoverableAggregationRepository<Object> 
recoverable) {
+        LOG.trace("Starting recover check");
+
+        Set<String> exchangeIds = recoverable.scan(camelContext);
+        for (String exchangeId : exchangeIds) {
+
+            // we may shutdown while doing recovery
+            if (!isRunAllowed()) {
+                LOG.info("We are shutting down so stop recovering");
+                return;
+            }
+
+            boolean inProgress = 
inProgressCompleteExchanges.contains(exchangeId);
+            if (inProgress) {
+                LOG.debug("Aggregated exchange with id " + exchangeId + " is 
already in progress");
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Recovering aggregated exchange with id " + 
exchangeId);
+                }
+                Exchange exchange = recoverable.recover(camelContext, 
exchangeId);
+                if (exchange != null) {
+                    // get the correlation key
+                    String key = 
exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
+                    // resubmit the recovered exchange
+                    onSubmitCompletion(key, exchange);
+                }
+            }
+        }
+
+        LOG.trace("Recover check complete");
+    }
+
     @Override
     protected void doStart() throws Exception {
         if (getCompletionTimeout() <= 0 && getCompletionSize() <= 0 && 
getCompletionPredicate() == null
@@ -475,6 +539,25 @@ public class AggregateProcessor extends 
 
         ServiceHelper.startService(aggregationRepository);
 
+        // should we use recover checker
+        if (aggregationRepository instanceof RecoverableAggregationRepository) 
{
+            RecoverableAggregationRepository<Object> recoverable = 
(RecoverableAggregationRepository<Object>) aggregationRepository;
+            if (recoverable.isUseRecovery()) {
+                long interval = recoverable.getCheckIntervalInMillis();
+                if (interval > 0) {
+                    // create a background recover thread to check once every interval
+                    recoverService = 
camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, 
"AggregateRecoverChecker", 1);
+                    Runnable recoverTask = new RecoverTask(recoverable);
+                    LOG.info("Scheduling recover checker to run every " + 
interval + " millis.");
+                    recoverService.scheduleAtFixedRate(recoverTask, 1000L, 
interval, TimeUnit.MILLISECONDS);
+                } else {
+                    // it's a one-shot recovery during startup
+                    LOG.info("Running recover checker once at startup to 
recover existing aggregated exchanges");
+                    doRecover(recoverable);
+                }
+            }
+        }
+
         // start timeout service if its in use
         if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != 
null) {
             ScheduledExecutorService scheduler = 
camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, 
"AggregateTimeoutChecker", 1);
@@ -487,6 +570,7 @@ public class AggregateProcessor extends 
     @Override
     protected void doStop() throws Exception {
         ServiceHelper.stopService(timeoutMap);
+        ServiceHelper.stopService(recoverService);
 
         ServiceHelper.stopService(aggregationRepository);
 

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930316&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
 Fri Apr  2 16:28:21 2010
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+
+/**
+ * A specialized {@link org.apache.camel.spi.AggregationRepository} which 
also supports
+ * recovery. This usually requires a repository which is persisted.
+ *
+ * @version $Revision$
+ */
+public interface RecoverableAggregationRepository<K> extends 
AggregationRepository<K> {
+
+    /**
+     * Scans the repository for exchanges to be recovered
+     * 
+     * @param camelContext   the current CamelContext
+     * @return the ids of the exchanges to be recovered
+     */
+    Set<String> scan(CamelContext camelContext);
+
+    /**
+     * Recovers the exchange with the given exchange id
+     *
+     * @param camelContext   the current CamelContext
+     * @param exchangeId     exchange id
+     * @return the recovered exchange or <tt>null</tt> if not found
+     */
+    Exchange recover(CamelContext camelContext, String exchangeId);
+
+    /**
+     * Sets the interval between scans
+     *
+     * @param interval  the interval
+     * @param timeUnit  the time unit
+     */
+    void setCheckInterval(long interval, TimeUnit timeUnit);
+
+    /**
+     * Gets the interval between scans in millis.
+     *
+     * @return the interval in millis
+     */
+    long getCheckIntervalInMillis();
+
+    /**
+     * Whether or not recovery is enabled
+     *
+     * @return <tt>true</tt> to use recovery, <tt>false</tt> otherwise.
+     */
+    boolean isUseRecovery();
+}

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
 Fri Apr  2 16:28:21 2010
@@ -18,11 +18,17 @@ package org.apache.camel.component.hawtd
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.AggregationRepository;
+import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
@@ -34,7 +40,7 @@ import org.fusesource.hawtdb.util.buffer
 /**
  * An instance of AggregationRepository which is backed by a HawtDB.
  */
-public class HawtDBAggregationRepository<K> extends ServiceSupport implements 
AggregationRepository<K> {
+public class HawtDBAggregationRepository<K> extends ServiceSupport implements 
AggregationRepository<K>, RecoverableAggregationRepository<K> {
 
     private static final transient Log LOG = 
LogFactory.getLog(HawtDBAggregationRepository.class);
     private HawtDBFile hawtDBFile;
@@ -44,6 +50,8 @@ public class HawtDBAggregationRepository
     private boolean sync;
     private boolean returnOldExchange;
     private HawtDBCamelMarshaller<K> marshaller = new 
HawtDBCamelMarshaller<K>();
+    private long interval = 5000;
+    private boolean useRecovery = false;
 
     /**
      * Creates an aggregation repository
@@ -208,6 +216,78 @@ public class HawtDBAggregationRepository
         }
     }
 
+    public Set<String> scan(CamelContext camelContext) {
+        final Set<String> answer = new LinkedHashSet<String>();
+        hawtDBFile.execute(new Work<Buffer>() {
+            public Buffer execute(Transaction tx) {
+                // scan could potentially be running while we are shutting 
down so check for that
+                if (!isRunAllowed()) {
+                    return null;
+                }
+
+                Index<Buffer, Buffer> indexCompleted = 
hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted());
+
+                Iterator<Map.Entry<Buffer, Buffer>> it = 
indexCompleted.iterator();
+                // scan could potentially be running while we are shutting 
down so check for that
+                while (it.hasNext() && isRunAllowed()) {
+                    Map.Entry<Buffer, Buffer> entry = it.next();
+                    Buffer keyBuffer = entry.getKey();
+
+                    String exchangeId;
+                    try {
+                        exchangeId = 
marshaller.unmarshallConfirmKey(keyBuffer);
+                    } catch (IOException e) {
+                        throw new RuntimeException("Error unmarshalling 
confirm key: " + keyBuffer, e);
+                    }
+                    if (exchangeId != null) {
+                        answer.add(exchangeId);
+                    }
+                }
+                return null;
+            }
+
+            @Override
+            public String toString() {
+                return "Scan";
+            }
+        });
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scanned and found " + answer.size() + " exchanges to 
recover.");
+        }
+        return answer;
+
+    }
+
+    public Exchange recover(CamelContext camelContext, final String 
exchangeId) {
+        Exchange answer = null;
+        try {
+            final Buffer confirmKeyBuffer = 
marshaller.marshallConfirmKey(exchangeId);
+            Buffer rc = hawtDBFile.execute(new Work<Buffer>() {
+                public Buffer execute(Transaction tx) {
+                    Index<Buffer, Buffer> indexCompleted = 
hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted());
+                    return indexCompleted.get(confirmKeyBuffer);
+                }
+
+                @Override
+                public String toString() {
+                    return "Recovering exchangeId [" + exchangeId + "]";
+                }
+            });
+            if (rc != null) {
+                answer = marshaller.unmarshallExchange(camelContext, rc);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error recovering exchangeId " + 
exchangeId + " from repository " + repositoryName, e);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovering exchangeId  [" + exchangeId + "] -> " + 
answer);
+        }
+        return answer;
+    }
+
+
     public HawtDBFile getHawtDBFile() {
         return hawtDBFile;
     }
@@ -260,6 +340,22 @@ public class HawtDBAggregationRepository
         this.returnOldExchange = returnOldExchange;
     }
 
+    public void setCheckInterval(long interval, TimeUnit timeUnit) {
+        this.interval = timeUnit.toMillis(interval);
+    }
+
+    public long getCheckIntervalInMillis() {
+        return interval;
+    }
+
+    public boolean isUseRecovery() {
+        return useRecovery;
+    }
+
+    public void setUseRecovery(boolean useRecovery) {
+        this.useRecovery = useRecovery;
+    }
+
     @Override
     protected void doStart() throws Exception {
         // either we have a HawtDB configured or we use a provided fileName

Modified: 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
 Fri Apr  2 16:28:21 2010
@@ -52,6 +52,12 @@ public final class HawtDBCamelMarshaller
         return baos.toBuffer();
     }
 
+    public String unmarshallConfirmKey(Buffer buffer) throws IOException {
+        DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
+        String key = confirmKeyMarshaller.readPayload(bais);
+        return key;
+    }
+
     public Buffer marshallExchange(CamelContext camelContext, Exchange 
exchange) throws IOException {
         DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
         // use DefaultExchangeHolder to marshal to a serialized object

Modified: 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java
 Fri Apr  2 16:28:21 2010
@@ -25,7 +25,7 @@ import org.fusesource.hawtdb.api.Transac
 interface Work<T> {
 
     /**
-     * Executs the work within the bounds of the given transaction
+     * Executes the work within the bounds of the given transaction
      *
      * @param transaction the transaction
      * @return result of the work, can be <tt>null</tt> if no result to return.

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java
 Fri Apr  2 16:28:21 2010
@@ -70,6 +70,7 @@ public class HawtDBAggregateConcurrentDi
     private void doSendMessages(int files, int poolSize) throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
         mock.expectedMessageCount(2);
+        mock.setResultWaitTime(20 * 1000L);
 
         ExecutorService executor = Executors.newFixedThreadPool(poolSize);
         for (int i = 0; i < files; i++) {

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java
 Fri Apr  2 16:28:21 2010
@@ -69,6 +69,7 @@ public class HawtDBAggregateConcurrentSa
 
     private void doSendMessages(int files, int poolSize) throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.setResultWaitTime(20 * 1000L);
         mock.expectedMessageCount(1);
         // match number of expected numbers
         mock.message(0).body(String.class).regex("[0-9]{" + files + "}");

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java
 Fri Apr  2 16:28:21 2010
@@ -44,7 +44,7 @@ public class HawtDBAggregateLoadConcurre
     public void testLoadTestHawtDBAggregate() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMinimumMessageCount(10);
-        mock.setResultWaitTime(20 * 1000);
+        mock.setResultWaitTime(30 * 1000);
 
         ExecutorService executor = Executors.newFixedThreadPool(10);
 

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java
 Fri Apr  2 16:28:21 2010
@@ -39,7 +39,7 @@ public class HawtDBAggregateLoadTest ext
     public void testLoadTestHawtDBAggregate() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMinimumMessageCount(1);
-        mock.setResultWaitTime(20 * 1000);
+        mock.setResultWaitTime(30 * 1000);
 
         System.out.println("Staring to send " + SIZE + " messages.");
 

Copied: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
 (from r930237, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java&r1=930237&r2=930316&rev=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
 Fri Apr  2 16:28:21 2010
@@ -25,7 +25,7 @@ import org.fusesource.hawtdb.api.Transac
 import org.fusesource.hawtdb.util.buffer.Buffer;
 import org.junit.Test;
 
-public class HawtDBAggregateNotLostTest extends CamelTestSupport {
+public class HawtDBAggregateNotLostRemovedWhenConfirmedTest extends 
CamelTestSupport {
 
     private HawtDBAggregationRepository<String> repo;
 
@@ -37,9 +37,8 @@ public class HawtDBAggregateNotLostTest 
     }
 
     @Test
-    public void testHawtDBAggregateNotLost() throws Exception {
-        getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE");
-        getMockEndpoint("mock:result").expectedMessageCount(0);
+    public void testHawtDBAggregateNotLostRemovedWhenConfirmed() throws 
Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
@@ -49,9 +48,9 @@ public class HawtDBAggregateNotLostTest 
 
         assertMockEndpointsSatisfied();
 
-        String exchangeId = 
getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
+        String exchangeId = 
getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId();
 
-        // the exchange should be in the completed repo where we should be 
able to find it
+        // the exchange should NOT be in the completed repo as it was confirmed
         final HawtDBFile hawtDBFile = repo.getHawtDBFile();
         final HawtDBCamelMarshaller<Object> marshaller = new 
HawtDBCamelMarshaller<Object>();
         final Buffer confirmKeyBuffer = 
marshaller.marshallConfirmKey(exchangeId);
@@ -62,17 +61,8 @@ public class HawtDBAggregateNotLostTest 
             }
         });
 
-        // assert the exchange was not lost and we got all the information 
still
-        assertNotNull(bf);
-        Exchange completed = marshaller.unmarshallExchange(context, bf);
-        assertNotNull(completed);
-        // should retain the exchange id
-        assertEquals(exchangeId, completed.getExchangeId());
-        assertEquals("ABCDE", completed.getIn().getBody());
-        assertEquals(123, completed.getIn().getHeader("id"));
-        assertEquals("size", 
completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY));
-        assertEquals(5, completed.getProperty(Exchange.AGGREGATED_SIZE));
-        assertEquals(123, 
completed.getProperty(Exchange.AGGREGATED_CORRELATION_KEY));
+        // assert the exchange was deleted
+        assertNull(bf);
     }
 
     @Override
@@ -84,9 +74,6 @@ public class HawtDBAggregateNotLostTest 
                     .aggregate(header("id"), new MyAggregationStrategy())
                         .completionSize(5).aggregationRepository(repo)
                         .log("aggregated exchange id ${exchangeId} with 
${body}")
-                        .to("mock:aggregated")
-                        // throw an exception to fail, which we then will 
loose this message
-                        .throwException(new IllegalArgumentException("Damn"))
                         .to("mock:result")
                     .end();
             }

Copied: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
 (from r930237, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java&r1=930237&r2=930316&rev=930316&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
 Fri Apr  2 16:28:21 2010
@@ -16,30 +16,37 @@
  */
 package org.apache.camel.component.hawtdb;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.fusesource.hawtdb.api.Index;
-import org.fusesource.hawtdb.api.Transaction;
-import org.fusesource.hawtdb.util.buffer.Buffer;
 import org.junit.Test;
 
-public class HawtDBAggregateNotLostTest extends CamelTestSupport {
+public class HawtDBAggregateRecoverTest extends CamelTestSupport {
 
     private HawtDBAggregationRepository<String> repo;
+    private static AtomicInteger counter = new AtomicInteger(0);
 
     @Override
     public void setUp() throws Exception {
         deleteDirectory("target/data");
         repo = new HawtDBAggregationRepository<String>("repo1", 
"target/data/hawtdb.dat");
+        // enable recovery
+        repo.setUseRecovery(true);
+        // check faster
+        repo.setCheckInterval(1, TimeUnit.SECONDS);
         super.setUp();
     }
 
     @Test
-    public void testHawtDBAggregateNotLost() throws Exception {
-        getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE");
-        getMockEndpoint("mock:result").expectedMessageCount(0);
+    public void testHawtDBAggregateRecover() throws Exception {
+        // should fail the first 2 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
@@ -48,31 +55,6 @@ public class HawtDBAggregateNotLostTest 
         template.sendBodyAndHeader("direct:start", "E", "id", 123);
 
         assertMockEndpointsSatisfied();
-
-        String exchangeId = 
getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
-
-        // the exchange should be in the completed repo where we should be 
able to find it
-        final HawtDBFile hawtDBFile = repo.getHawtDBFile();
-        final HawtDBCamelMarshaller<Object> marshaller = new 
HawtDBCamelMarshaller<Object>();
-        final Buffer confirmKeyBuffer = 
marshaller.marshallConfirmKey(exchangeId);
-        Buffer bf = hawtDBFile.execute(new Work<Buffer>() {
-            public Buffer execute(Transaction tx) {
-                Index<Buffer, Buffer> index = 
hawtDBFile.getRepositoryIndex(tx, "repo1-completed");
-                return index.get(confirmKeyBuffer);
-            }
-        });
-
-        // assert the exchange was not lost and we got all the information 
still
-        assertNotNull(bf);
-        Exchange completed = marshaller.unmarshallExchange(context, bf);
-        assertNotNull(completed);
-        // should retain the exchange id
-        assertEquals(exchangeId, completed.getExchangeId());
-        assertEquals("ABCDE", completed.getIn().getBody());
-        assertEquals(123, completed.getIn().getHeader("id"));
-        assertEquals("size", 
completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY));
-        assertEquals(5, completed.getProperty(Exchange.AGGREGATED_SIZE));
-        assertEquals(123, 
completed.getProperty(Exchange.AGGREGATED_CORRELATION_KEY));
     }
 
     @Override
@@ -85,8 +67,16 @@ public class HawtDBAggregateNotLostTest 
                         .completionSize(5).aggregationRepository(repo)
                         .log("aggregated exchange id ${exchangeId} with 
${body}")
                         .to("mock:aggregated")
-                        // throw an exception to fail, which we then will 
lose this message
-                        .throwException(new IllegalArgumentException("Damn"))
+                        .delay(2000)
+                        // simulate errors the first two times
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                int count = counter.incrementAndGet();
+                                if (count <= 2) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        })
                         .to("mock:result")
                     .end();
             }

Modified: 
camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties?rev=930316&r1=930315&r2=930316&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties 
(original)
+++ camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties Fri 
Apr  2 16:28:21 2010
@@ -23,6 +23,9 @@ log4j.rootLogger=INFO, file
 # uncomment the following to enable camel debugging
 #log4j.logger.org.apache.camel=DEBUG
 #log4j.logger.org.apache.camel.component.hawtdb=DEBUG
+log4j.logger.org.apache.camel.impl.converter=WARN
+log4j.logger.org.apache.camel.management=WARN
+log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender


Reply via email to