Author: jstrachan
Date: Wed Apr 18 10:45:05 2007
New Revision: 530102

URL: http://svn.apache.org/viewvc?view=rev&rev=530102
Log:
minor refactor to make the builder support ExpressionFactory; also added 
support for IdempotentConsumer along with a test case

Added:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
   (with props)
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
   (with props)
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
   (with props)
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
   (with props)
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
   (with props)
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
   (with props)
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
   (with props)
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
   (with props)
Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
 Wed Apr 18 10:45:05 2007
@@ -16,12 +16,9 @@
  */
 package org.apache.camel.builder;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.CompositeProcessor;
@@ -29,6 +26,12 @@
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.RecipientList;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 /**
  * @version $Revision$
@@ -55,7 +58,7 @@
      * Sends the exchange to the given endpoint URI
      */
     @Fluent
-    public ProcessorFactory<E> to(@FluentArg("uri") String uri) {
+    public ProcessorFactory<E> to(@FluentArg("uri")String uri) {
         return to(endpoint(uri));
     }
 
@@ -63,7 +66,7 @@
      * Sends the exchange to the given endpoint
      */
     @Fluent
-    public ProcessorFactory<E> to(@FluentArg("endpoint") Endpoint<E> endpoint) 
{
+    public ProcessorFactory<E> to(@FluentArg("endpoint")Endpoint<E> endpoint) {
         ToBuilder<E> answer = new ToBuilder<E>(this, endpoint);
         addProcessBuilder(answer);
         return answer;
@@ -74,8 +77,8 @@
      */
     @Fluent
     public ProcessorFactory<E> to(
-               @FluentArg(value="uri", attribute=false, element=true) 
-               String... uris) {
+            @FluentArg(value = "uri", attribute = false, element = true)
+            String... uris) {
         return to(endpoints(uris));
     }
 
@@ -84,8 +87,8 @@
      */
     @Fluent
     public ProcessorFactory<E> to(
-               @FluentArg(value="endpoint", attribute=false, element=true) 
-               Endpoint<E>... endpoints) {
+            @FluentArg(value = "endpoint", attribute = false, element = true)
+            Endpoint<E>... endpoints) {
         return to(endpoints(endpoints));
     }
 
@@ -93,7 +96,7 @@
     * Sends the exchange to a list of endpoint using the {@link 
MulticastProcessor} pattern
      */
     @Fluent
-    public ProcessorFactory<E> to(@FluentArg("endpoints") 
Collection<Endpoint<E>> endpoints) {
+    public ProcessorFactory<E> 
to(@FluentArg("endpoints")Collection<Endpoint<E>> endpoints) {
         return addProcessBuilder(new MulticastBuilder<E>(this, endpoints));
     }
 
@@ -102,7 +105,7 @@
      * and for request/response the output of one endpoint will be the input 
of the next endpoint
      */
     @Fluent
-    public ProcessorFactory<E> pipeline(@FluentArg("uris") String... uris) {
+    public ProcessorFactory<E> pipeline(@FluentArg("uris")String... uris) {
         return pipeline(endpoints(uris));
     }
 
@@ -111,7 +114,7 @@
      * and for request/response the output of one endpoint will be the input 
of the next endpoint
      */
     @Fluent
-    public ProcessorFactory<E> pipeline(@FluentArg("endpoints") Endpoint<E>... 
endpoints) {
+    public ProcessorFactory<E> pipeline(@FluentArg("endpoints")Endpoint<E>... 
endpoints) {
         return pipeline(endpoints(endpoints));
     }
 
@@ -120,15 +123,32 @@
      * and for request/response the output of one endpoint will be the input 
of the next endpoint
      */
     @Fluent
-    public ProcessorFactory<E> pipeline(@FluentArg("endpoints") 
Collection<Endpoint<E>> endpoints) {
+    public ProcessorFactory<E> 
pipeline(@FluentArg("endpoints")Collection<Endpoint<E>> endpoints) {
         return addProcessBuilder(new PipelineBuilder<E>(this, endpoints));
     }
 
     /**
+     * Creates an {@link IdempotentConsumer} to avoid duplicate 
messages
+     */
+    @Fluent
+    public IdempotentConsumerBuilder<E> idempotentConsumer(
+            @FluentArg("messageIdExpression")Expression<E> messageIdExpression,
+            @FluentArg("MessageIdRepository")MessageIdRepository 
messageIdRepository) {
+        return (IdempotentConsumerBuilder<E>) addProcessBuilder(new 
IdempotentConsumerBuilder<E>(this, messageIdExpression, messageIdRepository));
+    }
+
+    /**
+     * Creates an {@link IdempotentConsumer} to avoid duplicate 
messages
+     */
+    public IdempotentConsumerBuilder<E> 
idempotentConsumer(ExpressionFactory<E> messageIdExpressionFactory, 
MessageIdRepository messageIdRepository) {
+        return 
idempotentConsumer(messageIdExpressionFactory.createExpression(), 
messageIdRepository);
+    }
+
+    /**
      * Adds the custom processor to this destination
      */
     @Fluent
-    public ConstantProcessorBuilder<E> process(@FluentArg("ref") Processor<E> 
processor) {
+    public ConstantProcessorBuilder<E> process(@FluentArg("ref")Processor<E> 
processor) {
         ConstantProcessorBuilder<E> answer = new 
ConstantProcessorBuilder<E>(processor);
         addProcessBuilder(answer);
         return answer;
@@ -142,8 +162,8 @@
      */
     @Fluent
     public FilterBuilder<E> filter(
-               @FluentArg(value="predicate",element=true) 
-               Predicate<E> predicate) {
+            @FluentArg(value = "predicate", element = true)
+            Predicate<E> predicate) {
         FilterBuilder<E> answer = new FilterBuilder<E>(this, predicate);
         addProcessBuilder(answer);
         return answer;
@@ -154,7 +174,7 @@
      *
      * @return the builder for a choice expression
      */
-    @Fluent(nestedActions=true)
+    @Fluent(nestedActions = true)
     public ChoiceBuilder<E> choice() {
         ChoiceBuilder<E> answer = new ChoiceBuilder<E>(this);
         addProcessBuilder(answer);
@@ -168,8 +188,8 @@
      */
     @Fluent
     public RecipientListBuilder<E> recipientList(
-               @FluentArg(value="recipients",element=true) 
-               ValueBuilder<E> receipients) {
+            @FluentArg(value = "recipients", element = true)
+            ValueBuilder<E> receipients) {
         RecipientListBuilder<E> answer = new RecipientListBuilder<E>(this, 
receipients);
         addProcessBuilder(answer);
         return answer;
@@ -183,7 +203,7 @@
      * @return the builder
      */
     @Fluent
-    public SplitterBuilder<E> splitter(@FluentArg(value="recipients", 
element=true) ValueBuilder<E> receipients) {
+    public SplitterBuilder<E> splitter(@FluentArg(value = "recipients", 
element = true)ValueBuilder<E> receipients) {
         SplitterBuilder<E> answer = new SplitterBuilder<E>(this, receipients);
         addProcessBuilder(answer);
         return answer;
@@ -196,7 +216,7 @@
      * @return the current builder with the error handler configured
      */
     @Fluent
-    public FromBuilder<E> errorHandler(@FluentArg("handler") 
ErrorHandlerBuilder errorHandlerBuilder) {
+    public FromBuilder<E> 
errorHandler(@FluentArg("handler")ErrorHandlerBuilder errorHandlerBuilder) {
         setErrorHandlerBuilder(errorHandlerBuilder);
         return this;
     }
@@ -208,12 +228,12 @@
      * @return the current builder
      */
     @Fluent
-    public FromBuilder<E> inheritErrorHandler(@FluentArg("condition") boolean 
condition) {
+    public FromBuilder<E> inheritErrorHandler(@FluentArg("condition")boolean 
condition) {
         setInheritErrorHandler(condition);
         return this;
     }
-    
-    @Fluent(nestedActions=true)
+
+    @Fluent(nestedActions = true)
     public InterceptorBuilder<E> intercept() {
         InterceptorBuilder<E> answer = new InterceptorBuilder<E>(this);
         addProcessBuilder(answer);
@@ -221,14 +241,13 @@
     }
 
     @Fluent
-    public InterceptorBuilder<E> intercept(@FluentArg("interceptor") 
InterceptorProcessor<E> interceptor) {
+    public InterceptorBuilder<E> 
intercept(@FluentArg("interceptor")InterceptorProcessor<E> interceptor) {
         InterceptorBuilder<E> answer = new InterceptorBuilder<E>(this);
         answer.add(interceptor);
         addProcessBuilder(answer);
         return answer;
     }
 
-
     // Properties
     //-------------------------------------------------------------------------
     public RouteBuilder<E> getBuilder() {
@@ -274,11 +293,31 @@
      */
     protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory) 
throws Exception {
         Processor<E> processor = processFactory.createProcessor();
+        processor = wrapProcessor(processor);
+        return wrapInErrorHandler(processor);
+    }
+
+    /**
+     * A strategy method to allow newly created processors to be wrapped in an 
error handler. This feature
+     * could be disabled for child builders such as {@link 
IdempotentConsumerBuilder} which will rely on the
+     * {@link FromBuilder} to perform the error handling to avoid 
doubly-wrapped processors with 2 nested error handlers
+     */
+    protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws 
Exception {
         return getErrorHandlerBuilder().createErrorHandler(processor);
     }
 
+    /**
+     * A strategy method which allows derived classes to wrap the child 
processor in some kind of interceptor such as
+     * a filter for the {@link IdempotentConsumerBuilder}.
+     *
+     * @param processor the processor which can be wrapped
+     * @return the original processor or a new wrapped interceptor
+     */
+    protected Processor<E> wrapProcessor(Processor<E> processor) {
+        return processor;
+    }
+
     public List<Processor<E>> getProcessors() {
         return processors;
     }
-
 }

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java?view=auto&rev=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
 Wed Apr 18 10:45:05 2007
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Expression;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
+
+/**
+ * A builder of an {@link IdempotentConsumer}
+ *
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumerBuilder<E extends Exchange> extends 
FromBuilder<E> implements ProcessorFactory<E> {
+    private final Expression<E> messageIdExpression;
+    private final MessageIdRepository messageIdRegistry;
+
+    public IdempotentConsumerBuilder(FromBuilder<E> fromBuilder, Expression<E> 
messageIdExpression, MessageIdRepository messageIdRegistry) {
+        super(fromBuilder);
+        this.messageIdRegistry = messageIdRegistry;
+        this.messageIdExpression = messageIdExpression;
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public MessageIdRepository getMessageIdRegistry() {
+        return messageIdRegistry;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+    @Override
+    protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws 
Exception {
+        // lets do no wrapping in error handlers as the parent FromBuilder 
will do that
+        return processor;
+    }
+
+    @Override
+    protected Processor<E> wrapProcessor(Processor<E> processor) {
+        return new IdempotentConsumer<E>(messageIdExpression, 
messageIdRegistry, processor);
+    }
+}

Propchange: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
 Wed Apr 18 10:45:05 2007
@@ -28,15 +28,15 @@
  * @version $Revision$
  */
 public class RecipientListBuilder<E extends Exchange> extends 
BuilderSupport<E> implements ProcessorFactory<E> {
-    private final ValueBuilder<E> valueBuilder;
+    private final ExpressionFactory<E> expressionFactory;
 
-    public RecipientListBuilder(FromBuilder<E> parent, ValueBuilder<E> 
valueBuilder) {
+    public RecipientListBuilder(FromBuilder<E> parent, ExpressionFactory<E> 
expressionFactory) {
         super(parent);
-        this.valueBuilder = valueBuilder;
+        this.expressionFactory = expressionFactory;
     }
 
     public Processor<E> createProcessor() {
-        Expression<E> expression = valueBuilder.getExpression();
+        Expression<E> expression = expressionFactory.createExpression();
         return new RecipientList<E>(expression);
     }
 }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
 Wed Apr 18 10:45:05 2007
@@ -29,17 +29,17 @@
  * @version $Revision$
  */
 public class SplitterBuilder<E extends Exchange> extends FromBuilder<E> {
-    private final ValueBuilder<E> valueBuilder;
+    private final ExpressionFactory<E> expressionFactory;
 
-    public SplitterBuilder(FromBuilder<E> parent, ValueBuilder<E> 
valueBuilder) {
+    public SplitterBuilder(FromBuilder<E> parent, ExpressionFactory<E> 
expressionFactory) {
         super(parent);
-        this.valueBuilder = valueBuilder;
+        this.expressionFactory = expressionFactory;
     }
 
     public Processor<E> createProcessor() throws Exception {
         // lets create a single processor for all child predicates
         Processor<E> destination = super.createProcessor();
-        Expression<E> expression = valueBuilder.getExpression();
+        Expression<E> expression = expressionFactory.createExpression();
         return new Splitter<E>(destination, expression);
     }
 }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
 Wed Apr 18 10:45:05 2007
@@ -25,7 +25,7 @@
  *
  * @version $Revision: $
  */
-public class ValueBuilder<E extends Exchange> {
+public class ValueBuilder<E extends Exchange> implements ExpressionFactory<E> {
     private Expression<E> expression;
 
     public ValueBuilder(Expression<E> expression) {
@@ -36,6 +36,11 @@
         return expression;
     }
 
+
+    public Expression<E> createExpression() {
+        return expression;
+    }
+
     @Fluent
     public Predicate<E> isNotEqualTo(@FluentArg("value") Object value) {
         Expression<E> right = ExpressionBuilder.constantExpression(value);
@@ -124,4 +129,5 @@
     public ValueBuilder<E> convertToString() {
         return convertTo(String.class);
     }
+
 }

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?view=auto&rev=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
 Wed Apr 18 10:45:05 2007
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ExpressionHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of the
+ * <a 
href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent 
Consumer</a> pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumer<E extends Exchange> implements Processor<E> {
+    private static final transient Log log = 
LogFactory.getLog(IdempotentConsumer.class);
+    private Expression<E> messageIdExpression;
+    private Processor<E> nextProcessor;
+    private MessageIdRepository messageIdRepository;
+
+    public IdempotentConsumer(Expression<E> messageIdExpression, 
MessageIdRepository messageIdRepository, Processor<E> nextProcessor) {
+        this.messageIdExpression = messageIdExpression;
+        this.messageIdRepository = messageIdRepository;
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public String toString() {
+        return "IdempotentConsumer[expression=" + messageIdExpression + ", 
repository=" + messageIdRepository + ", processor=" + nextProcessor + "]";
+    }
+
+    public void process(E exchange) {
+        String messageId = 
ExpressionHelper.evaluateAsString(messageIdExpression, exchange);
+        if (messageId == null) {
+            throw new NoMessageIdException(exchange, messageIdExpression);
+        }
+        if (!messageIdRepository.contains(messageId)) {
+            nextProcessor.process(exchange);
+        }
+        else {
+            onDuplicateMessage(exchange, messageId);
+        }
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public Expression<E> getMessageIdExpression() {
+        return messageIdExpression;
+    }
+
+    public MessageIdRepository getMessageIdRepository() {
+        return messageIdRepository;
+    }
+
+    public Processor<E> getNextProcessor() {
+        return nextProcessor;
+    }
+
+    /**
+     * A strategy method to allow derived classes to overload the behaviour of 
processing a duplicate message
+     *
+     * @param exchange the exchange
+     * @param messageId the message ID of this exchange
+     */
+    protected void onDuplicateMessage(E exchange, String messageId) {
+        if (log.isDebugEnabled()) {
+            log.debug("Ignoring duplicate message with id: " + messageId + " 
for exchange: " + exchange);
+        }
+    }
+}

Propchange: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java?view=auto&rev=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
 Wed Apr 18 10:45:05 2007
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * A simple memory implementation of {@link MessageIdRepository}; 
though warning this could use up lots of RAM!
+ *
+ * @version $Revision: 1.1 $
+ */
+public class MemoryMessageIdRepository implements MessageIdRepository {
+    private Set set;
+
+    /**
+     * Creates a new MemoryMessageIdRepository with a memory based repository
+     */
+    public static MessageIdRepository memoryMessageIdRepository() {
+        return memoryMessageIdRepository(new HashSet());
+    }
+
+    /**
+     * Creates a new MemoryMessageIdRepository using the given {@link 
Set} to use to store the
+     * processed Message ID objects
+     */
+    public static MessageIdRepository memoryMessageIdRepository(Set set) {
+        return new MemoryMessageIdRepository(set);
+    }
+
+    public MemoryMessageIdRepository(Set set) {
+        this.set = set;
+    }
+
+    public boolean contains(String messageId) {
+        synchronized (set) {
+            if (set.contains(messageId)) {
+                return true;
+            }
+            else {
+                set.add(messageId);
+                return false;
+            }
+        }
+    }
+}

Propchange: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java?view=auto&rev=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
 Wed Apr 18 10:45:05 2007
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+/**
+ * Access to a repository of Message IDs to implement the
+ * <a 
href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent 
Consumer</a> pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface MessageIdRepository {
+
+    /**
+     * Returns true if this messageId has been processed before
+     * otherwise this messageId is added to the repository and false is 
returned.
+     *
+     * @param messageId the String ID of the message
+     * @return true if the message has been processed successfully before 
otherwise false
+     */
+    boolean contains(String messageId);
+}

Propchange: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java?view=auto&rev=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
 Wed Apr 18 10:45:05 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import org.apache.camel.Expression;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * An exception thrown if no message ID could be found on a message which is 
to be used with the
+ * <a 
href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent 
Consumer</a> pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class NoMessageIdException extends RuntimeCamelException {
+    private final Exchange exchange;
+    private final Expression expression;
+
+    public NoMessageIdException(Exchange exchange, Expression expression) {
+        super("No message ID could be found using expression: " + expression + 
" on message exchange: " + exchange);
+        this.exchange = exchange;
+        this.expression = expression;
+    }
+
+    /**
+     * The exchange which caused this failure
+     */
+    public Exchange getExchange() {
+        return exchange;
+    }
+
+    /**
+     * The expression which was used
+     */
+    public Expression getExpression() {
+        return expression;
+    }
+}

Propchange: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html?view=auto&rev=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
 Wed Apr 18 10:45:05 2007
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+An implementation of the <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a> pattern.
+
+</body>
+</html>

Propchange: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java?view=auto&rev=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
 Wed Apr 18 10:45:05 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+/**
+ * Static helper methods for evaluating expressions against an exchange.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ExpressionHelper {
+
+    /**
+     * Evaluates the expression on the exchange and returns the outcome as a String.
+     * This is {@link #evaluateAsType(Expression, Exchange, Class)} with <code>String.class</code>
+     * as the requested type.
+     *
+     * @param expression the expression to evaluate
+     * @param exchange the exchange the expression is evaluated on
+     * @return the result of the evaluation, converted to a String
+     */
+    public static <E extends Exchange> String evaluateAsString(Expression<E> expression, E exchange) {
+        final Class<String> stringType = String.class;
+        return evaluateAsType(expression, exchange, stringType);
+    }
+
+    /**
+     * Evaluates the expression on the exchange and converts the outcome to the requested type,
+     * using the type converter obtained from the exchange's context.
+     *
+     * @param expression the expression to evaluate
+     * @param exchange the exchange the expression is evaluated on; also supplies the type converter
+     * @param resultType the type the result must be converted to
+     * @return the result of the evaluation as the specified type
+     */
+    public static <T, E extends Exchange> T evaluateAsType(Expression<E> expression, E exchange, Class<T> resultType) {
+        // Evaluate first, then look up the converter, so the call order matches the contract callers rely on.
+        final Object evaluated = expression.evaluate(exchange);
+        return exchange.getContext().getTypeConverter().convertTo(resultType, evaluated);
+    }
+}

Propchange: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java?view=auto&rev=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
 Wed Apr 18 10:45:05 2007
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+/**
+ * Test class named for the idempotent consumer. It currently declares no fields or methods.
+ * NOTE(review): presumably a placeholder for test cases still to be written - confirm.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumerTest {
+}

Propchange: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
 Wed Apr 18 10:45:05 2007
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.builder;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -32,28 +29,33 @@
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.Splitter;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MemoryMessageIdRepository;
+import static 
org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * @version $Revision$
  */
 public class RouteBuilderTest extends TestSupport {
-       
-       protected Processor<Exchange> myProcessor = new MyProcessor();          
-       protected InterceptorProcessor<Exchange> interceptor1;
-       protected InterceptorProcessor<Exchange> interceptor2;
-    
-       protected RouteBuilder<Exchange> buildSimpleRoute() {
-               // START SNIPPET: e1
+    protected Processor<Exchange> myProcessor = new MyProcessor();
+    protected InterceptorProcessor<Exchange> interceptor1;
+    protected InterceptorProcessor<Exchange> interceptor2;
+
+    protected RouteBuilder<Exchange> buildSimpleRoute() {
+        // START SNIPPET: e1
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").to("queue:b");
             }
         };
         // END SNIPPET: e1
-               return builder;
-       }
+        return builder;
+    }
 
-       public void testSimpleRoute() throws Exception {
+    public void testSimpleRoute() throws Exception {
         RouteBuilder<Exchange> builder = buildSimpleRoute();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -69,17 +71,17 @@
     }
 
     protected RouteBuilder<Exchange> buildSimpleRouteWithHeaderPredicate() {
-               // START SNIPPET: e2
+        // START SNIPPET: e2
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 
from("queue:a").filter(header("foo").isEqualTo("bar")).to("queue:b");
             }
         };
         // END SNIPPET: e2
-               return builder;
-       }
+        return builder;
+    }
 
-       public void testSimpleRouteWithHeaderPredicate() throws Exception {
+    public void testSimpleRouteWithHeaderPredicate() throws Exception {
         RouteBuilder<Exchange> builder = buildSimpleRouteWithHeaderPredicate();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -92,14 +94,13 @@
             Processor processor = getProcessorWithoutErrorHandler(route);
 
             FilterProcessor filterProcessor = 
assertIsInstanceOf(FilterProcessor.class, processor);
-    SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, 
unwrapErrorHandler(filterProcessor.getProcessor()));
+            SendProcessor sendProcessor = 
assertIsInstanceOf(SendProcessor.class, 
unwrapErrorHandler(filterProcessor.getProcessor()));
             assertEquals("Endpoint URI", "queue:b", 
sendProcessor.getDestination().getEndpointUri());
         }
     }
 
-
     protected RouteBuilder<Exchange> buildSimpleRouteWithChoice() {
-               // START SNIPPET: e3
+        // START SNIPPET: e3
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").choice()
@@ -109,8 +110,8 @@
             }
         };
         // END SNIPPET: e3
-               return builder;
-       }
+        return builder;
+    }
 
     public void testSimpleRouteWithChoice() throws Exception {
         RouteBuilder<Exchange> builder = buildSimpleRouteWithChoice();
@@ -139,7 +140,7 @@
     }
 
     protected RouteBuilder<Exchange> buildCustomProcessor() {
-               // START SNIPPET: e4
+        // START SNIPPET: e4
         myProcessor = new Processor<Exchange>() {
             public void process(Exchange exchange) {
                 System.out.println("Called with exchange: " + exchange);
@@ -152,10 +153,10 @@
             }
         };
         // END SNIPPET: e4
-               return builder;
-       }
+        return builder;
+    }
 
-       public void testCustomProcessor() throws Exception {
+    public void testCustomProcessor() throws Exception {
         RouteBuilder<Exchange> builder = buildCustomProcessor();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -170,19 +171,18 @@
         }
     }
 
-
-       protected RouteBuilder<Exchange> buildCustomProcessorWithFilter() {
-               // START SNIPPET: e5
+    protected RouteBuilder<Exchange> buildCustomProcessorWithFilter() {
+        // START SNIPPET: e5
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 
from("queue:a").filter(header("foo").isEqualTo("bar")).process(myProcessor);
             }
         };
         // END SNIPPET: e5
-               return builder;
-       }
+        return builder;
+    }
 
-       public void testCustomProcessorWithFilter() throws Exception {
+    public void testCustomProcessorWithFilter() throws Exception {
         RouteBuilder<Exchange> builder = buildCustomProcessorWithFilter();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -199,17 +199,16 @@
         }
     }
 
-
-       protected RouteBuilder<Exchange> buildWireTap() {
-               // START SNIPPET: e6
+    protected RouteBuilder<Exchange> buildWireTap() {
+        // START SNIPPET: e6
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").to("queue:tap", "queue:b");
             }
         };
         // END SNIPPET: e6
-               return builder;
-       }
+        return builder;
+    }
 
     public void testWireTap() throws Exception {
         RouteBuilder<Exchange> builder = buildWireTap();
@@ -233,7 +232,7 @@
     }
 
     protected RouteBuilder<Exchange> buildRouteWithInterceptor() {
-               interceptor1 = new InterceptorProcessor<Exchange>() {
+        interceptor1 = new InterceptorProcessor<Exchange>() {
         };
 
         // START SNIPPET: e7        
@@ -242,18 +241,18 @@
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a")
-                    .intercept()
-                          .add(interceptor1)
-                          .add(interceptor2)
-                       .target().to("queue:d");
+                        .intercept()
+                        .add(interceptor1)
+                        .add(interceptor2)
+                        .target().to("queue:d");
             }
         };
         // END SNIPPET: e7
-               return builder;
-       }
+        return builder;
+    }
 
     public void testRouteWithInterceptor() throws Exception {
-       
+
         RouteBuilder<Exchange> builder = buildRouteWithInterceptor();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -274,8 +273,8 @@
         }
     }
 
-       public void testComplexExpressions() throws Exception {
-               // START SNIPPET: e7
+    public void testComplexExpressions() throws Exception {
+        // START SNIPPET: e7
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 
from("queue:a").filter(header("foo").isEqualTo(123)).to("queue:b");
@@ -341,6 +340,7 @@
             RecipientList<Exchange> p1 = 
assertIsInstanceOf(RecipientList.class, processor);
         }
     }
+
     protected RouteBuilder<Exchange> buildSplitter() {
         // START SNIPPET: splitter
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
@@ -366,6 +366,41 @@
             Processor processor = getProcessorWithoutErrorHandler(route);
 
             Splitter<Exchange> p1 = assertIsInstanceOf(Splitter.class, 
processor);
+        }
+    }
+
+    /**
+     * Creates a route builder whose configure() defines one route: from "queue:a", through
+     * idempotentConsumer(...) given the "myMessageId" header expression and the repository returned by
+     * memoryMessageIdRepository(), and on to "queue:b".
+     * NOTE(review): the START/END SNIPPET comments are presumably read by documentation tooling - keep them
+     * wrapped around the example code.
+     *
+     * @return the configured route builder
+     */
+    protected RouteBuilder<Exchange> buildIdempotentConsumer() {
+        // START SNIPPET: idempotent
+        RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
+            public void configure() {
+                from("queue:a").idempotentConsumer(header("myMessageId"), memoryMessageIdRepository()).to("queue:b");
+            }
+        };
+        // END SNIPPET: idempotent
+        return builder;
+    }
+
+    /**
+     * Checks the route produced by buildIdempotentConsumer(): exactly one route, starting at "queue:a",
+     * whose processor is an IdempotentConsumer with the "header(myMessageId)" expression and a
+     * MemoryMessageIdRepository, followed by a SendProcessor targeting "queue:b".
+     */
+    public void testIdempotentConsumer() throws Exception {
+        List<Route<Exchange>> createdRoutes = buildIdempotentConsumer().getRouteList();
+        System.out.println("Created routes: " + createdRoutes);
+
+        assertEquals("Number routes created", 1, createdRoutes.size());
+        for (Route<Exchange> createdRoute : createdRoutes) {
+            Endpoint<Exchange> fromEndpoint = createdRoute.getEndpoint();
+            assertEquals("From endpoint", "queue:a", fromEndpoint.getEndpointUri());
+
+            // The processor returned by getProcessorWithoutErrorHandler must be the idempotent consumer.
+            Processor routeProcessor = getProcessorWithoutErrorHandler(createdRoute);
+            IdempotentConsumer<Exchange> consumer = assertIsInstanceOf(IdempotentConsumer.class, routeProcessor);
+
+            String expressionText = consumer.getMessageIdExpression().toString();
+            assertEquals("messageIdExpression", "header(myMessageId)", expressionText);
+            assertIsInstanceOf(MemoryMessageIdRepository.class, consumer.getMessageIdRepository());
+
+            // The consumer's next processor sends the exchange on to queue:b.
+            SendProcessor sender = assertIsInstanceOf(SendProcessor.class, consumer.getNextProcessor());
+            assertEquals("Endpoint URI", "queue:b", sender.getDestination().getEndpointUri());
         }
     }
 


Reply via email to