Author: jstrachan
Date: Wed Apr 18 10:45:05 2007
New Revision: 530102
URL: http://svn.apache.org/viewvc?view=rev&rev=530102
Log:
minor refactor to make the builder support ExpressionFactory; also added
support for IdempotentConsumer along with a test case
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
(with props)
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
(with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
Wed Apr 18 10:45:05 2007
@@ -16,12 +16,9 @@
*/
package org.apache.camel.builder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.processor.CompositeProcessor;
@@ -29,6 +26,12 @@
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.RecipientList;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
/**
* @version $Revision$
@@ -55,7 +58,7 @@
* Sends the exchange to the given endpoint URI
*/
@Fluent
- public ProcessorFactory<E> to(@FluentArg("uri") String uri) {
+ public ProcessorFactory<E> to(@FluentArg("uri")String uri) {
return to(endpoint(uri));
}
@@ -63,7 +66,7 @@
* Sends the exchange to the given endpoint
*/
@Fluent
- public ProcessorFactory<E> to(@FluentArg("endpoint") Endpoint<E> endpoint)
{
+ public ProcessorFactory<E> to(@FluentArg("endpoint")Endpoint<E> endpoint) {
ToBuilder<E> answer = new ToBuilder<E>(this, endpoint);
addProcessBuilder(answer);
return answer;
@@ -74,8 +77,8 @@
*/
@Fluent
public ProcessorFactory<E> to(
- @FluentArg(value="uri", attribute=false, element=true)
- String... uris) {
+ @FluentArg(value = "uri", attribute = false, element = true)
+ String... uris) {
return to(endpoints(uris));
}
@@ -84,8 +87,8 @@
*/
@Fluent
public ProcessorFactory<E> to(
- @FluentArg(value="endpoint", attribute=false, element=true)
- Endpoint<E>... endpoints) {
+ @FluentArg(value = "endpoint", attribute = false, element = true)
+ Endpoint<E>... endpoints) {
return to(endpoints(endpoints));
}
@@ -93,7 +96,7 @@
* Sends the exchange to a list of endpoint using the [EMAIL PROTECTED]
MulticastProcessor} pattern
*/
@Fluent
- public ProcessorFactory<E> to(@FluentArg("endpoints")
Collection<Endpoint<E>> endpoints) {
+ public ProcessorFactory<E>
to(@FluentArg("endpoints")Collection<Endpoint<E>> endpoints) {
return addProcessBuilder(new MulticastBuilder<E>(this, endpoints));
}
@@ -102,7 +105,7 @@
* and for request/response the output of one endpoint will be the input
of the next endpoint
*/
@Fluent
- public ProcessorFactory<E> pipeline(@FluentArg("uris") String... uris) {
+ public ProcessorFactory<E> pipeline(@FluentArg("uris")String... uris) {
return pipeline(endpoints(uris));
}
@@ -111,7 +114,7 @@
* and for request/response the output of one endpoint will be the input
of the next endpoint
*/
@Fluent
- public ProcessorFactory<E> pipeline(@FluentArg("endpoints") Endpoint<E>...
endpoints) {
+ public ProcessorFactory<E> pipeline(@FluentArg("endpoints")Endpoint<E>...
endpoints) {
return pipeline(endpoints(endpoints));
}
@@ -120,15 +123,32 @@
* and for request/response the output of one endpoint will be the input
of the next endpoint
*/
@Fluent
- public ProcessorFactory<E> pipeline(@FluentArg("endpoints")
Collection<Endpoint<E>> endpoints) {
+ public ProcessorFactory<E>
pipeline(@FluentArg("endpoints")Collection<Endpoint<E>> endpoints) {
return addProcessBuilder(new PipelineBuilder<E>(this, endpoints));
}
/**
+ * Creates an [EMAIL PROTECTED] IdempotentConsumer} to avoid duplicate
messages
+ */
+ @Fluent
+ public IdempotentConsumerBuilder<E> idempotentConsumer(
+ @FluentArg("messageIdExpression")Expression<E> messageIdExpression,
+ @FluentArg("MessageIdRepository")MessageIdRepository
messageIdRepository) {
+ return (IdempotentConsumerBuilder<E>) addProcessBuilder(new
IdempotentConsumerBuilder<E>(this, messageIdExpression, messageIdRepository));
+ }
+
+ /**
+ * Creates an [EMAIL PROTECTED] IdempotentConsumer} to avoid duplicate
messages
+ */
+ public IdempotentConsumerBuilder<E>
idempotentConsumer(ExpressionFactory<E> messageIdExpressionFactory,
MessageIdRepository messageIdRepository) {
+ return
idempotentConsumer(messageIdExpressionFactory.createExpression(),
messageIdRepository);
+ }
+
+ /**
* Adds the custom processor to this destination
*/
@Fluent
- public ConstantProcessorBuilder<E> process(@FluentArg("ref") Processor<E>
processor) {
+ public ConstantProcessorBuilder<E> process(@FluentArg("ref")Processor<E>
processor) {
ConstantProcessorBuilder<E> answer = new
ConstantProcessorBuilder<E>(processor);
addProcessBuilder(answer);
return answer;
@@ -142,8 +162,8 @@
*/
@Fluent
public FilterBuilder<E> filter(
- @FluentArg(value="predicate",element=true)
- Predicate<E> predicate) {
+ @FluentArg(value = "predicate", element = true)
+ Predicate<E> predicate) {
FilterBuilder<E> answer = new FilterBuilder<E>(this, predicate);
addProcessBuilder(answer);
return answer;
@@ -154,7 +174,7 @@
*
* @return the builder for a choice expression
*/
- @Fluent(nestedActions=true)
+ @Fluent(nestedActions = true)
public ChoiceBuilder<E> choice() {
ChoiceBuilder<E> answer = new ChoiceBuilder<E>(this);
addProcessBuilder(answer);
@@ -168,8 +188,8 @@
*/
@Fluent
public RecipientListBuilder<E> recipientList(
- @FluentArg(value="recipients",element=true)
- ValueBuilder<E> receipients) {
+ @FluentArg(value = "recipients", element = true)
+ ValueBuilder<E> receipients) {
RecipientListBuilder<E> answer = new RecipientListBuilder<E>(this,
receipients);
addProcessBuilder(answer);
return answer;
@@ -183,7 +203,7 @@
* @return the builder
*/
@Fluent
- public SplitterBuilder<E> splitter(@FluentArg(value="recipients",
element=true) ValueBuilder<E> receipients) {
+ public SplitterBuilder<E> splitter(@FluentArg(value = "recipients",
element = true)ValueBuilder<E> receipients) {
SplitterBuilder<E> answer = new SplitterBuilder<E>(this, receipients);
addProcessBuilder(answer);
return answer;
@@ -196,7 +216,7 @@
* @return the current builder with the error handler configured
*/
@Fluent
- public FromBuilder<E> errorHandler(@FluentArg("handler")
ErrorHandlerBuilder errorHandlerBuilder) {
+ public FromBuilder<E>
errorHandler(@FluentArg("handler")ErrorHandlerBuilder errorHandlerBuilder) {
setErrorHandlerBuilder(errorHandlerBuilder);
return this;
}
@@ -208,12 +228,12 @@
* @return the current builder
*/
@Fluent
- public FromBuilder<E> inheritErrorHandler(@FluentArg("condition") boolean
condition) {
+ public FromBuilder<E> inheritErrorHandler(@FluentArg("condition")boolean
condition) {
setInheritErrorHandler(condition);
return this;
}
-
- @Fluent(nestedActions=true)
+
+ @Fluent(nestedActions = true)
public InterceptorBuilder<E> intercept() {
InterceptorBuilder<E> answer = new InterceptorBuilder<E>(this);
addProcessBuilder(answer);
@@ -221,14 +241,13 @@
}
@Fluent
- public InterceptorBuilder<E> intercept(@FluentArg("interceptor")
InterceptorProcessor<E> interceptor) {
+ public InterceptorBuilder<E>
intercept(@FluentArg("interceptor")InterceptorProcessor<E> interceptor) {
InterceptorBuilder<E> answer = new InterceptorBuilder<E>(this);
answer.add(interceptor);
addProcessBuilder(answer);
return answer;
}
-
// Properties
//-------------------------------------------------------------------------
public RouteBuilder<E> getBuilder() {
@@ -274,11 +293,31 @@
*/
protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory)
throws Exception {
Processor<E> processor = processFactory.createProcessor();
+ processor = wrapProcessor(processor);
+ return wrapInErrorHandler(processor);
+ }
+
+ /**
+ * A strategy method to allow newly created processors to be wrapped in an
error handler. This feature
+ * could be disabled for child builders such as [EMAIL PROTECTED]
IdempotentConsumerBuilder} which will rely on the
+ * [EMAIL PROTECTED] FromBuilder} to perform the error handling to avoid
doubly-wrapped processors with 2 nested error handlers
+ */
+ protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws
Exception {
return getErrorHandlerBuilder().createErrorHandler(processor);
}
+ /**
+ * A strategy method which allows derived classes to wrap the child
processor in some kind of interceptor such as
+ * a filter for the [EMAIL PROTECTED] IdempotentConsumerBuilder}.
+ *
+ * @param processor the processor which can be wrapped
+ * @return the original processor or a new wrapped interceptor
+ */
+ protected Processor<E> wrapProcessor(Processor<E> processor) {
+ return processor;
+ }
+
public List<Processor<E>> getProcessors() {
return processors;
}
-
}
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java?view=auto&rev=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Expression;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
+
+/**
+ * A builder of an [EMAIL PROTECTED] IdempotentConsumer}
+ *
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumerBuilder<E extends Exchange> extends
FromBuilder<E> implements ProcessorFactory<E> {
+ private final Expression<E> messageIdExpression;
+ private final MessageIdRepository messageIdRegistry;
+
+ public IdempotentConsumerBuilder(FromBuilder<E> fromBuilder, Expression<E>
messageIdExpression, MessageIdRepository messageIdRegistry) {
+ super(fromBuilder);
+ this.messageIdRegistry = messageIdRegistry;
+ this.messageIdExpression = messageIdExpression;
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+ public MessageIdRepository getMessageIdRegistry() {
+ return messageIdRegistry;
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ @Override
+ protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws
Exception {
+ // lets do no wrapping in error handlers as the parent FromBuilder
will do that
+ return processor;
+ }
+
+ @Override
+ protected Processor<E> wrapProcessor(Processor<E> processor) {
+ return new IdempotentConsumer<E>(messageIdExpression,
messageIdRegistry, processor);
+ }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
Wed Apr 18 10:45:05 2007
@@ -28,15 +28,15 @@
* @version $Revision$
*/
public class RecipientListBuilder<E extends Exchange> extends
BuilderSupport<E> implements ProcessorFactory<E> {
- private final ValueBuilder<E> valueBuilder;
+ private final ExpressionFactory<E> expressionFactory;
- public RecipientListBuilder(FromBuilder<E> parent, ValueBuilder<E>
valueBuilder) {
+ public RecipientListBuilder(FromBuilder<E> parent, ExpressionFactory<E>
expressionFactory) {
super(parent);
- this.valueBuilder = valueBuilder;
+ this.expressionFactory = expressionFactory;
}
public Processor<E> createProcessor() {
- Expression<E> expression = valueBuilder.getExpression();
+ Expression<E> expression = expressionFactory.createExpression();
return new RecipientList<E>(expression);
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
Wed Apr 18 10:45:05 2007
@@ -29,17 +29,17 @@
* @version $Revision$
*/
public class SplitterBuilder<E extends Exchange> extends FromBuilder<E> {
- private final ValueBuilder<E> valueBuilder;
+ private final ExpressionFactory<E> expressionFactory;
- public SplitterBuilder(FromBuilder<E> parent, ValueBuilder<E>
valueBuilder) {
+ public SplitterBuilder(FromBuilder<E> parent, ExpressionFactory<E>
expressionFactory) {
super(parent);
- this.valueBuilder = valueBuilder;
+ this.expressionFactory = expressionFactory;
}
public Processor<E> createProcessor() throws Exception {
// lets create a single processor for all child predicates
Processor<E> destination = super.createProcessor();
- Expression<E> expression = valueBuilder.getExpression();
+ Expression<E> expression = expressionFactory.createExpression();
return new Splitter<E>(destination, expression);
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
Wed Apr 18 10:45:05 2007
@@ -25,7 +25,7 @@
*
* @version $Revision: $
*/
-public class ValueBuilder<E extends Exchange> {
+public class ValueBuilder<E extends Exchange> implements ExpressionFactory<E> {
private Expression<E> expression;
public ValueBuilder(Expression<E> expression) {
@@ -36,6 +36,11 @@
return expression;
}
+
+ public Expression<E> createExpression() {
+ return expression;
+ }
+
@Fluent
public Predicate<E> isNotEqualTo(@FluentArg("value") Object value) {
Expression<E> right = ExpressionBuilder.constantExpression(value);
@@ -124,4 +129,5 @@
public ValueBuilder<E> convertToString() {
return convertTo(String.class);
}
+
}
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?view=auto&rev=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ExpressionHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of the
+ * <a
href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent
Consumer</a> pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumer<E extends Exchange> implements Processor<E> {
+ private static final transient Log log =
LogFactory.getLog(IdempotentConsumer.class);
+ private Expression<E> messageIdExpression;
+ private Processor<E> nextProcessor;
+ private MessageIdRepository messageIdRepository;
+
+ public IdempotentConsumer(Expression<E> messageIdExpression,
MessageIdRepository messageIdRepository, Processor<E> nextProcessor) {
+ this.messageIdExpression = messageIdExpression;
+ this.messageIdRepository = messageIdRepository;
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public String toString() {
+ return "IdempotentConsumer[expression=" + messageIdExpression + ",
repository=" + messageIdRepository + ", processor=" + nextProcessor + "]";
+ }
+
+ public void process(E exchange) {
+ String messageId =
ExpressionHelper.evaluateAsString(messageIdExpression, exchange);
+ if (messageId == null) {
+ throw new NoMessageIdException(exchange, messageIdExpression);
+ }
+ if (!messageIdRepository.contains(messageId)) {
+ nextProcessor.process(exchange);
+ }
+ else {
+ onDuplicateMessage(exchange, messageId);
+ }
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+ public Expression<E> getMessageIdExpression() {
+ return messageIdExpression;
+ }
+
+ public MessageIdRepository getMessageIdRepository() {
+ return messageIdRepository;
+ }
+
+ public Processor<E> getNextProcessor() {
+ return nextProcessor;
+ }
+
+ /**
+ * A strategy method to allow derived classes to overload the behaviour of
processing a duplicate message
+ *
+ * @param exchange the exchange
+ * @param messageId the message ID of this exchange
+ */
+ protected void onDuplicateMessage(E exchange, String messageId) {
+ if (log.isDebugEnabled()) {
+ log.debug("Ignoring duplicate message with id: " + messageId + "
for exchange: " + exchange);
+ }
+ }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java?view=auto&rev=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * A simple memory implementation of [EMAIL PROTECTED] MessageIdRepository};
though warning this could use up lots of RAM!
+ *
+ * @version $Revision: 1.1 $
+ */
+public class MemoryMessageIdRepository implements MessageIdRepository {
+ private Set set;
+
+ /**
+ * Creates a new MemoryMessageIdRepository with a memory based respository
+ */
+ public static MessageIdRepository memoryMessageIdRepository() {
+ return memoryMessageIdRepository(new HashSet());
+ }
+
+ /**
+ * Creates a new MemoryMessageIdRepository using the given [EMAIL
PROTECTED] Set} to use to store the
+ * processed Message ID objects
+ */
+ public static MessageIdRepository memoryMessageIdRepository(Set set) {
+ return new MemoryMessageIdRepository(set);
+ }
+
+ public MemoryMessageIdRepository(Set set) {
+ this.set = set;
+ }
+
+ public boolean contains(String messageId) {
+ synchronized (set) {
+ if (set.contains(messageId)) {
+ return true;
+ }
+ else {
+ set.add(messageId);
+ return false;
+ }
+ }
+ }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java?view=auto&rev=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+/**
+ * Access to a repository of Message IDs to implement the
+ * <a
href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent
Consumer</a> pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface MessageIdRepository {
+
+ /**
+ * Returns true if this messageId has been processed before
+ * otherwise this messageId is added to the repository and false is
returned.
+ *
+ * @param messageId the String ID of the message
+ * @return true if the message has been processed succesfully before
otherwise false
+ */
+ boolean contains(String messageId);
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java?view=auto&rev=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import org.apache.camel.Expression;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * An exception thrown if no message ID could be found on a message which is
to be used with the
+ * <a
href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent
Consumer</a> pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class NoMessageIdException extends RuntimeCamelException {
+ private final Exchange exchange;
+ private final Expression expression;
+
+ public NoMessageIdException(Exchange exchange, Expression expression) {
+ super("No message ID could be found using expression: " + expression +
" on message exchange: " + exchange);
+ this.exchange = exchange;
+ this.expression = expression;
+ }
+
+ /**
+ * The exchange which caused this failure
+ */
+ public Exchange getExchange() {
+ return exchange;
+ }
+
+ /**
+ * The expression which was used
+ */
+ public Expression getExpression() {
+ return expression;
+ }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html?view=auto&rev=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+An implementation of the <a
href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent
Consumer</a> pattern.
+
+</body>
+</html>
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java?view=auto&rev=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+/**
+ * A collection of helper methods for working with expressions.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ExpressionHelper {
+
+ /**
+ * Evaluates the given expression on the exchange as a String value
+ *
+ * @param expression the expression to evaluate
+ * @param exchange the exchange to use to evaluate the expression
+ * @return the result of the evaluation as a string.
+ */
+ public static <E extends Exchange> String evaluateAsString(Expression<E>
expression, E exchange) {
+ return evaluateAsType(expression, exchange, String.class);
+ }
+
+ /**
+ * Evaluates the given expression on the exchange, converting the result
to the given type
+ *
+ * @param expression the expression to evaluate
+ * @param exchange the exchange to use to evaluate the expression
+ * @param resultType the type of the result that is required
+ * @return the result of the evaluation as the specified type.
+ */
+ public static <T, E extends Exchange> T evaluateAsType(Expression<E>
expression, E exchange, Class<T> resultType) {
+ Object value = expression.evaluate(exchange);
+ return exchange.getContext().getTypeConverter().convertTo(resultType,
value);
+ }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java?view=auto&rev=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumerTest {
+}
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
Wed Apr 18 10:45:05 2007
@@ -16,9 +16,6 @@
*/
package org.apache.camel.builder;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -32,28 +29,33 @@
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.Splitter;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MemoryMessageIdRepository;
+import static
org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
+
+import java.util.ArrayList;
+import java.util.List;
/**
* @version $Revision$
*/
public class RouteBuilderTest extends TestSupport {
-
- protected Processor<Exchange> myProcessor = new MyProcessor();
- protected InterceptorProcessor<Exchange> interceptor1;
- protected InterceptorProcessor<Exchange> interceptor2;
-
- protected RouteBuilder<Exchange> buildSimpleRoute() {
- // START SNIPPET: e1
+ protected Processor<Exchange> myProcessor = new MyProcessor();
+ protected InterceptorProcessor<Exchange> interceptor1;
+ protected InterceptorProcessor<Exchange> interceptor2;
+
+ protected RouteBuilder<Exchange> buildSimpleRoute() {
+ // START SNIPPET: e1
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a").to("queue:b");
}
};
// END SNIPPET: e1
- return builder;
- }
+ return builder;
+ }
- public void testSimpleRoute() throws Exception {
+ public void testSimpleRoute() throws Exception {
RouteBuilder<Exchange> builder = buildSimpleRoute();
List<Route<Exchange>> routes = builder.getRouteList();
@@ -69,17 +71,17 @@
}
protected RouteBuilder<Exchange> buildSimpleRouteWithHeaderPredicate() {
- // START SNIPPET: e2
+ // START SNIPPET: e2
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a").filter(header("foo").isEqualTo("bar")).to("queue:b");
}
};
// END SNIPPET: e2
- return builder;
- }
+ return builder;
+ }
- public void testSimpleRouteWithHeaderPredicate() throws Exception {
+ public void testSimpleRouteWithHeaderPredicate() throws Exception {
RouteBuilder<Exchange> builder = buildSimpleRouteWithHeaderPredicate();
List<Route<Exchange>> routes = builder.getRouteList();
@@ -92,14 +94,13 @@
Processor processor = getProcessorWithoutErrorHandler(route);
FilterProcessor filterProcessor =
assertIsInstanceOf(FilterProcessor.class, processor);
- SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class,
unwrapErrorHandler(filterProcessor.getProcessor()));
+ SendProcessor sendProcessor =
assertIsInstanceOf(SendProcessor.class,
unwrapErrorHandler(filterProcessor.getProcessor()));
assertEquals("Endpoint URI", "queue:b",
sendProcessor.getDestination().getEndpointUri());
}
}
-
protected RouteBuilder<Exchange> buildSimpleRouteWithChoice() {
- // START SNIPPET: e3
+ // START SNIPPET: e3
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a").choice()
@@ -109,8 +110,8 @@
}
};
// END SNIPPET: e3
- return builder;
- }
+ return builder;
+ }
public void testSimpleRouteWithChoice() throws Exception {
RouteBuilder<Exchange> builder = buildSimpleRouteWithChoice();
@@ -139,7 +140,7 @@
}
protected RouteBuilder<Exchange> buildCustomProcessor() {
- // START SNIPPET: e4
+ // START SNIPPET: e4
myProcessor = new Processor<Exchange>() {
public void process(Exchange exchange) {
System.out.println("Called with exchange: " + exchange);
@@ -152,10 +153,10 @@
}
};
// END SNIPPET: e4
- return builder;
- }
+ return builder;
+ }
- public void testCustomProcessor() throws Exception {
+ public void testCustomProcessor() throws Exception {
RouteBuilder<Exchange> builder = buildCustomProcessor();
List<Route<Exchange>> routes = builder.getRouteList();
@@ -170,19 +171,18 @@
}
}
-
- protected RouteBuilder<Exchange> buildCustomProcessorWithFilter() {
- // START SNIPPET: e5
+ protected RouteBuilder<Exchange> buildCustomProcessorWithFilter() {
+ // START SNIPPET: e5
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a").filter(header("foo").isEqualTo("bar")).process(myProcessor);
}
};
// END SNIPPET: e5
- return builder;
- }
+ return builder;
+ }
- public void testCustomProcessorWithFilter() throws Exception {
+ public void testCustomProcessorWithFilter() throws Exception {
RouteBuilder<Exchange> builder = buildCustomProcessorWithFilter();
List<Route<Exchange>> routes = builder.getRouteList();
@@ -199,17 +199,16 @@
}
}
-
- protected RouteBuilder<Exchange> buildWireTap() {
- // START SNIPPET: e6
+ protected RouteBuilder<Exchange> buildWireTap() {
+ // START SNIPPET: e6
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a").to("queue:tap", "queue:b");
}
};
// END SNIPPET: e6
- return builder;
- }
+ return builder;
+ }
public void testWireTap() throws Exception {
RouteBuilder<Exchange> builder = buildWireTap();
@@ -233,7 +232,7 @@
}
protected RouteBuilder<Exchange> buildRouteWithInterceptor() {
- interceptor1 = new InterceptorProcessor<Exchange>() {
+ interceptor1 = new InterceptorProcessor<Exchange>() {
};
// START SNIPPET: e7
@@ -242,18 +241,18 @@
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a")
- .intercept()
- .add(interceptor1)
- .add(interceptor2)
- .target().to("queue:d");
+ .intercept()
+ .add(interceptor1)
+ .add(interceptor2)
+ .target().to("queue:d");
}
};
// END SNIPPET: e7
- return builder;
- }
+ return builder;
+ }
public void testRouteWithInterceptor() throws Exception {
-
+
RouteBuilder<Exchange> builder = buildRouteWithInterceptor();
List<Route<Exchange>> routes = builder.getRouteList();
@@ -274,8 +273,8 @@
}
}
- public void testComplexExpressions() throws Exception {
- // START SNIPPET: e7
+ public void testComplexExpressions() throws Exception {
+ // START SNIPPET: e7
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a").filter(header("foo").isEqualTo(123)).to("queue:b");
@@ -341,6 +340,7 @@
RecipientList<Exchange> p1 =
assertIsInstanceOf(RecipientList.class, processor);
}
}
+
protected RouteBuilder<Exchange> buildSplitter() {
// START SNIPPET: splitter
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
@@ -366,6 +366,41 @@
Processor processor = getProcessorWithoutErrorHandler(route);
Splitter<Exchange> p1 = assertIsInstanceOf(Splitter.class,
processor);
+ }
+ }
+
+ protected RouteBuilder<Exchange> buildIdempotentConsumer() {
+ // START SNIPPET: idempotent
+ RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
+ public void configure() {
+ from("queue:a").idempotentConsumer(header("myMessageId"),
memoryMessageIdRepository()).to("queue:b");
+ }
+ };
+ // END SNIPPET: idempotent
+ return builder;
+ }
+
+ public void testIdempotentConsumer() throws Exception {
+
+ RouteBuilder<Exchange> builder = buildIdempotentConsumer();
+
+ List<Route<Exchange>> routes = builder.getRouteList();
+ System.out.println("Created routes: " + routes);
+
+ assertEquals("Number routes created", 1, routes.size());
+ for (Route<Exchange> route : routes) {
+ Endpoint<Exchange> key = route.getEndpoint();
+ assertEquals("From endpoint", "queue:a", key.getEndpointUri());
+ Processor processor = getProcessorWithoutErrorHandler(route);
+
+ IdempotentConsumer<Exchange> idempotentConsumer =
assertIsInstanceOf(IdempotentConsumer.class, processor);
+
+ assertEquals("messageIdExpression", "header(myMessageId)",
idempotentConsumer.getMessageIdExpression().toString());
+
+ assertIsInstanceOf(MemoryMessageIdRepository.class,
idempotentConsumer.getMessageIdRepository());
+
+ SendProcessor sendProcessor =
assertIsInstanceOf(SendProcessor.class, idempotentConsumer.getNextProcessor());
+ assertEquals("Endpoint URI", "queue:b",
sendProcessor.getDestination().getEndpointUri());
}
}