Author: davsclaus
Date: Mon Dec 1 07:00:10 2008
New Revision: 722088
URL: http://svn.apache.org/viewvc?rev=722088&view=rev
Log:
CAMEL-1099: Introduced IdempotentRepository in the spi package.
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
(contents, props changed)
- copied, changed from r722000,
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
(contents, props changed)
- copied, changed from r722000,
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
Removed:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
activemq/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/test/TestEndpointTest-context.xml
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
Mon Dec 1 07:00:10 2008
@@ -25,7 +25,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
-import org.apache.camel.processor.idempotent.MessageIdRepository;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.ObjectHelper;
/**
@@ -71,7 +71,7 @@
// lookup idempotent repository in registry if provided
String ref = getAndRemoveParameter(parameters,
"idempotentRepositoryRef", String.class);
if (ref != null) {
- MessageIdRepository repository = mandatoryLookup(ref,
MessageIdRepository.class);
+ IdempotentRepository repository = mandatoryLookup(ref,
IdempotentRepository.class);
result.setIdempotentRepository(repository);
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Mon Dec 1 07:00:10 2008
@@ -247,8 +247,7 @@
* @return true to include the file, false to skip it
*/
protected boolean validateFile(File file) {
- // NOTE: contains will add if we had a miss
- if (endpoint.isIdempotent() &&
endpoint.getIdempotentRepository().contains(file.getName())) {
+ if (endpoint.isIdempotent() &&
!endpoint.getIdempotentRepository().add(file.getName())) {
// skip as we have already processed it
return false;
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
Mon Dec 1 07:00:10 2008
@@ -33,8 +33,8 @@
import org.apache.camel.Producer;
import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.camel.language.simple.FileLanguage;
-import org.apache.camel.processor.idempotent.MemoryMessageIdRepository;
-import org.apache.camel.processor.idempotent.MessageIdRepository;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.FactoryFinder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.UuidGenerator;
@@ -71,7 +71,7 @@
private Expression expression;
private String tempPrefix;
private boolean idempotent;
- private MessageIdRepository idempotentRepository;
+ private IdempotentRepository idempotentRepository;
private FileFilter filter;
private Comparator<File> fileSorter;
private Comparator<FileExchange> exchangeSorter;
@@ -103,7 +103,7 @@
// if idempotent and no repository set then create a default one
if (isIdempotent() && idempotentRepository == null) {
LOG.info("Using default memory based idempotent repository with
cache max size: " + DEFAULT_IDEMPOTENT_CACHE_SIZE);
- idempotentRepository =
MemoryMessageIdRepository.memoryMessageIdRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
+ idempotentRepository =
MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
}
configureConsumer(result);
@@ -331,11 +331,11 @@
this.idempotent = idempotent;
}
- public MessageIdRepository getIdempotentRepository() {
+ public IdempotentRepository getIdempotentRepository() {
return idempotentRepository;
}
- public void setIdempotentRepository(MessageIdRepository
idempotentRepository) {
+ public void setIdempotentRepository(IdempotentRepository
idempotentRepository) {
this.idempotentRepository = idempotentRepository;
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerType.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerType.java
Mon Dec 1 07:00:10 2008
@@ -25,7 +25,7 @@
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.processor.idempotent.IdempotentConsumer;
-import org.apache.camel.processor.idempotent.MessageIdRepository;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.RouteContext;
/**
@@ -39,14 +39,14 @@
@XmlAttribute
private String messageIdRepositoryRef;
@XmlTransient
- private MessageIdRepository messageIdRepository;
+ private IdempotentRepository idempotentRepository;
public IdempotentConsumerType() {
}
- public IdempotentConsumerType(Expression messageIdExpression,
MessageIdRepository messageIdRepository) {
+ public IdempotentConsumerType(Expression messageIdExpression,
IdempotentRepository idempotentRepository) {
super(messageIdExpression);
- this.messageIdRepository = messageIdRepository;
+ this.idempotentRepository = idempotentRepository;
}
@Override
@@ -67,32 +67,32 @@
this.messageIdRepositoryRef = messageIdRepositoryRef;
}
- public MessageIdRepository getMessageIdRepository() {
- return messageIdRepository;
+ public IdempotentRepository getMessageIdRepository() {
+ return idempotentRepository;
}
- public void setMessageIdRepository(MessageIdRepository
messageIdRepository) {
- this.messageIdRepository = messageIdRepository;
+ public void setMessageIdRepository(IdempotentRepository
idempotentRepository) {
+ this.idempotentRepository = idempotentRepository;
}
@Override
public Processor createProcessor(RouteContext routeContext) throws
Exception {
Processor childProcessor = routeContext.createProcessor(this);
- MessageIdRepository messageIdRepository =
resolveMessageIdRepository(routeContext);
- return new
IdempotentConsumer(getExpression().createExpression(routeContext),
messageIdRepository,
+ IdempotentRepository idempotentRepository =
resolveMessageIdRepository(routeContext);
+ return new
IdempotentConsumer(getExpression().createExpression(routeContext),
idempotentRepository,
childProcessor);
}
/**
- * Strategy method to resolve the {@link org.apache.camel.processor.idempotent.MessageIdRepository} to use
+ * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use
*
* @param routeContext route context
* @return the repository
*/
- protected MessageIdRepository resolveMessageIdRepository(RouteContext
routeContext) {
- if (messageIdRepository == null) {
- messageIdRepository = routeContext.lookup(messageIdRepositoryRef,
MessageIdRepository.class);
+ protected IdempotentRepository resolveMessageIdRepository(RouteContext
routeContext) {
+ if (idempotentRepository == null) {
+ idempotentRepository = routeContext.lookup(messageIdRepositoryRef,
IdempotentRepository.class);
}
- return messageIdRepository;
+ return idempotentRepository;
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Mon Dec 1 07:00:10 2008
@@ -25,7 +25,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -54,9 +53,9 @@
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.aggregate.AggregationCollection;
import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.idempotent.MessageIdRepository;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
@@ -311,12 +310,12 @@
* to avoid duplicate messages
*
* @param messageIdExpression expression to test of duplicate messages
- * @param messageIdRepository the repository to use for duplicate chedck
+ * @param idempotentRepository the repository to use for duplicate chedck
* @return the builder
*/
public IdempotentConsumerType idempotentConsumer(Expression
messageIdExpression,
- MessageIdRepository messageIdRepository) {
- IdempotentConsumerType answer = new
IdempotentConsumerType(messageIdExpression, messageIdRepository);
+ IdempotentRepository idempotentRepository) {
+ IdempotentConsumerType answer = new
IdempotentConsumerType(messageIdExpression, idempotentRepository);
addOutput(answer);
return answer;
}
@@ -326,12 +325,12 @@
* Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
* to avoid duplicate messages
*
- * @param messageIdRepository the repository to use for duplicate chedck
+ * @param idempotentRepository the repository to use for duplicate chedck
* @return the builder used to create the expression
*/
- public ExpressionClause<IdempotentConsumerType>
idempotentConsumer(MessageIdRepository messageIdRepository) {
+ public ExpressionClause<IdempotentConsumerType>
idempotentConsumer(IdempotentRepository idempotentRepository) {
IdempotentConsumerType answer = new IdempotentConsumerType();
- answer.setMessageIdRepository(messageIdRepository);
+ answer.setMessageIdRepository(idempotentRepository);
addOutput(answer);
return ExpressionClause.createAndSetExpression(answer);
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
Mon Dec 1 07:00:10 2008
@@ -20,6 +20,7 @@
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.ExpressionHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
@@ -36,18 +37,18 @@
private static final transient Log LOG =
LogFactory.getLog(IdempotentConsumer.class);
private Expression messageIdExpression;
private Processor nextProcessor;
- private MessageIdRepository messageIdRepository;
+ private IdempotentRepository idempotentRepository;
public IdempotentConsumer(Expression messageIdExpression,
- MessageIdRepository messageIdRepository, Processor nextProcessor) {
+ IdempotentRepository idempotentRepository, Processor
nextProcessor) {
this.messageIdExpression = messageIdExpression;
- this.messageIdRepository = messageIdRepository;
+ this.idempotentRepository = idempotentRepository;
this.nextProcessor = nextProcessor;
}
@Override
public String toString() {
- return "IdempotentConsumer[expression=" + messageIdExpression + ",
repository=" + messageIdRepository
+ return "IdempotentConsumer[expression=" + messageIdExpression + ",
repository=" + idempotentRepository
+ ", processor=" + nextProcessor + "]";
}
@@ -56,7 +57,7 @@
if (messageId == null) {
throw new NoMessageIdException(exchange, messageIdExpression);
}
- if (!messageIdRepository.contains(messageId)) {
+ if (idempotentRepository.add(messageId)) {
nextProcessor.process(exchange);
} else {
onDuplicateMessage(exchange, messageId);
@@ -69,8 +70,8 @@
return messageIdExpression;
}
- public MessageIdRepository getMessageIdRepository() {
- return messageIdRepository;
+ public IdempotentRepository getIdempotentRepository() {
+ return idempotentRepository;
}
public Processor getNextProcessor() {
Copied:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
(from r722000,
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java&r1=722000&r2=722088&rev=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
Mon Dec 1 07:00:10 2008
@@ -16,61 +16,70 @@
*/
package org.apache.camel.processor.idempotent;
-import java.util.HashMap;
import java.util.Map;
+import org.apache.camel.Service;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.LRUCache;
/**
- * A memory based implementation of {@link MessageIdRepository}. Care should be
- * taken to use a suitable underlying {@link Map} to avoid this class being a
- * memory leak
- *
+ * A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
+ * <p/>
+ * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
+ * memory leak.
+ *
* @version $Revision$
*/
-public class MemoryMessageIdRepository implements MessageIdRepository {
- private final Map cache;
+public class MemoryIdempotentRepository implements
IdempotentRepository<String> {
- public MemoryMessageIdRepository(Map set) {
+ private final Map<String, Object> cache;
+
+ public MemoryIdempotentRepository(Map<String, Object> set) {
this.cache = set;
}
/**
- * Creates a new MemoryMessageIdRepository with a memory based repository.
- * <b>Warning</b> this method should only really be used for testing as it
- * will involve keeping all message IDs in RAM.
+ * Creates a new memory based repository using a {@link LRUCache}
+ * with a default of 1000 entries in the cache.
*/
- public static MessageIdRepository memoryMessageIdRepository() {
- return memoryMessageIdRepository(new HashMap());
+ public static IdempotentRepository memoryIdempotentRepository() {
+ return memoryIdempotentRepository(1000);
}
/**
- * Creates a new MemoryMessageIdRepository with a memory based repository.
- * <b>Warning</b> this method should only really be used for testing as it
- * will involve keeping all message IDs in RAM.
+ * Creates a new memory based repository using a {@link LRUCache}.
+ *
+ * @param cacheSize the cache size
*/
- public static MessageIdRepository memoryMessageIdRepository(int cacheSize)
{
- return memoryMessageIdRepository(new LRUCache(cacheSize));
+ public static IdempotentRepository memoryIdempotentRepository(int
cacheSize) {
+ return memoryIdempotentRepository(new LRUCache<String,
Object>(cacheSize));
}
/**
- * Creates a new MemoryMessageIdRepository using the given {@link Map} to
- * use to store the processed Message ID objects. Warning be careful of the
- * implementation of Map you use as if you are not careful it could be a
- * memory leak.
+ * Creates a new memory based repository using the given {@link Map} to
+ * use to store the processed message ids.
+ * <p/>
+ * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
+ * memory leak.
*/
- public static MessageIdRepository memoryMessageIdRepository(Map cache) {
- return new MemoryMessageIdRepository(cache);
+ public static IdempotentRepository memoryIdempotentRepository(Map<String,
Object> cache) {
+ return new MemoryIdempotentRepository(cache);
}
- public boolean contains(String messageId) {
+ public boolean add(String messageId) {
synchronized (cache) {
if (cache.containsKey(messageId)) {
- return true;
+ return false;
} else {
cache.put(messageId, messageId);
- return false;
+ return true;
}
}
}
+
+ public boolean contains(String key) {
+ synchronized (cache) {
+ return cache.containsKey(key);
+ }
+ }
}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
(from r722000,
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java&r1=722000&r2=722088&rev=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
Mon Dec 1 07:00:10 2008
@@ -14,22 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.processor.idempotent;
+package org.apache.camel.spi;
/**
* Access to a repository of Message IDs to implement the
* <a
href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent
Consumer</a> pattern.
+ * <p/>
+ * The <tt>add</tt> and <tt>contains</tt> methods is operating according to the {@link java.util.Set} contract.
*
* @version $Revision$
*/
-public interface MessageIdRepository {
+public interface IdempotentRepository<E> {
/**
- * Returns true if this messageId has been processed before
- * otherwise this messageId is added to the repository and false is
returned.
+ * Adds the key to the repository.
*
- * @param messageId the String ID of the message
- * @return true if the message has been processed succesfully before
otherwise false
+ * @param key the key of the message for duplicate test
+ * @return <tt>true</tt> if this repository did <b>not</b> already contain
the specified element
*/
- boolean contains(String messageId);
+ boolean add(E key);
+
+ /**
+ * Returns <tt>true</tt> if this repository contains the specified element.
+ *
+ * @param key the key of the message
+ * @return <tt>true</tt> if this repository contains the specified element
+ */
+ boolean contains(E key);
}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
Mon Dec 1 07:00:10 2008
@@ -38,9 +38,7 @@
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.Splitter;
import org.apache.camel.processor.idempotent.IdempotentConsumer;
-import org.apache.camel.processor.idempotent.MemoryMessageIdRepository;
-
-import static
org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
/**
* @version $Revision$
@@ -412,7 +410,7 @@
// START SNIPPET: idempotent
RouteBuilder builder = new RouteBuilder() {
public void configure() {
- from("seda:a").idempotentConsumer(header("myMessageId"),
memoryMessageIdRepository(200))
+ from("seda:a").idempotentConsumer(header("myMessageId"),
MemoryIdempotentRepository.memoryIdempotentRepository(200))
.to("seda:b");
}
};
@@ -441,7 +439,7 @@
assertEquals("messageIdExpression", "header(myMessageId)",
idempotentConsumer
.getMessageIdExpression().toString());
- assertIsInstanceOf(MemoryMessageIdRepository.class,
idempotentConsumer.getMessageIdRepository());
+ assertIsInstanceOf(MemoryIdempotentRepository.class,
idempotentConsumer.getIdempotentRepository());
SendProcessor sendProcessor =
assertIsInstanceOf(SendProcessor.class,
unwrapErrorHandler(idempotentConsumer
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
Mon Dec 1 07:00:10 2008
@@ -22,7 +22,7 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.processor.idempotent.MessageIdRepository;
+import org.apache.camel.spi.IdempotentRepository;
/**
* Unit test for the idempotentRepositoryRef option.
@@ -81,14 +81,18 @@
assertTrue("MyIdempotentRepository should have been invoked", invoked);
}
- public class MyIdempotentRepository implements MessageIdRepository {
+ public class MyIdempotentRepository implements
IdempotentRepository<String> {
- public boolean contains(String messageId) {
- // will return false 1st time, and true 2nd time
+ public boolean add(String messageId) {
+ // will return true 1st time, and false 2nd time
boolean result = invoked;
invoked = true;
assertEquals("report.txt", messageId);
- return result;
+ return !result;
+ }
+
+ public boolean contains(String key) {
+ return false;
}
}
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
Mon Dec 1 07:00:10 2008
@@ -24,7 +24,7 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import static
org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
/**
* @version $Revision$
@@ -69,7 +69,7 @@
return new RouteBuilder() {
public void configure() {
from("direct:start").idempotentConsumer(
- header("messageId"), memoryMessageIdRepository(200)
+ header("messageId"),
MemoryIdempotentRepository.memoryIdempotentRepository(200)
).to("mock:result");
}
};
Modified:
activemq/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
(original)
+++
activemq/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
Mon Dec 1 07:00:10 2008
@@ -17,12 +17,10 @@
package org.apache.camel.processor.idempotent.jpa;
import java.util.List;
-
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
-import org.apache.camel.processor.idempotent.MessageIdRepository;
-
+import org.apache.camel.spi.IdempotentRepository;
import org.springframework.orm.jpa.JpaTemplate;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@@ -33,7 +31,7 @@
/**
* @version $Revision$
*/
-public class JpaMessageIdRepository implements MessageIdRepository {
+public class JpaMessageIdRepository implements IdempotentRepository<String> {
protected static final String QUERY_STRING = "select x from " +
MessageProcessed.class.getName() + " x where x.processorName = ?1 and
x.messageId = ?2";
private JpaTemplate jpaTemplate;
private String processorName;
@@ -65,11 +63,10 @@
return transactionTemplate;
}
- public boolean contains(final String messageId) {
+ public boolean add(final String messageId) {
// Run this in single transaction.
Boolean rc = (Boolean)transactionTemplate.execute(new
TransactionCallback() {
public Object doInTransaction(TransactionStatus arg0) {
-
List list = jpaTemplate.find(QUERY_STRING, processorName,
messageId);
if (list.isEmpty()) {
MessageProcessed processed = new MessageProcessed();
@@ -77,6 +74,21 @@
processed.setMessageId(messageId);
jpaTemplate.persist(processed);
jpaTemplate.flush();
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+ });
+ return rc.booleanValue();
+ }
+
+ public boolean contains(final String messageId) {
+ // Run this in single transaction.
+ Boolean rc = (Boolean)transactionTemplate.execute(new
TransactionCallback() {
+ public Object doInTransaction(TransactionStatus arg0) {
+ List list = jpaTemplate.find(QUERY_STRING, processorName,
messageId);
+ if (list.isEmpty()) {
return Boolean.FALSE;
} else {
return Boolean.TRUE;
@@ -85,4 +97,5 @@
});
return rc.booleanValue();
}
+
}
Modified:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/component/test/TestEndpointTest.java?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
(original)
+++
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/component/test/TestEndpointTest.java
Mon Dec 1 07:00:10 2008
@@ -39,7 +39,7 @@
@Autowired
protected CamelContext camelContext;
- @EndpointInject(uri =
"test:file://src/test/data?noop=true&consumer.recursive=true&consumer.delay=2000")
+ @EndpointInject(uri =
"test:file://src/test/data?noop=true&idempotent=true&consumer.recursive=true&consumer.delay=2000")
protected TestEndpoint endpoint;
public void testMocksAreValid() throws Exception {
Modified:
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/test/TestEndpointTest-context.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/test/TestEndpointTest-context.xml?rev=722088&r1=722087&r2=722088&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/test/TestEndpointTest-context.xml
(original)
+++
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/test/TestEndpointTest-context.xml
Mon Dec 1 07:00:10 2008
@@ -25,8 +25,8 @@
<!-- START SNIPPET: example -->
<camelContext xmlns="http://activemq.apache.org/camel/schema/spring">
<route>
- <from
uri="file://src/test/data?noop=true&consumer.recursive=true&consumer.delay=2000"/>
- <to
uri="test:file://src/test/data?noop=true&consumer.recursive=true&consumer.delay=2000"/>
+ <from
uri="file://src/test/data?noop=true&idempotent=true&consumer.recursive=true&consumer.delay=2000"/>
+ <to
uri="test:file://src/test/data?noop=true&idempotent=true&consumer.recursive=true&consumer.delay=2000"/>
</route>
</camelContext>
<!-- END SNIPPET: example -->