[ 
https://issues.apache.org/jira/browse/CAMEL-12041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288989#comment-16288989
 ] 

ASF GitHub Bot commented on CAMEL-12041:
----------------------------------------

davsclaus closed pull request #2127: CAMEL-12041 Introduce 
ConcurrentMapIdempotentRepository and MapIdempotentRepository
URL: https://github.com/apache/camel/pull/2127
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/idempotent/ConcurrentMapIdempotentRepository.java
 
b/camel-core/src/main/java/org/apache/camel/processor/idempotent/ConcurrentMapIdempotentRepository.java
new file mode 100644
index 00000000000..07257accd10
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/idempotent/ConcurrentMapIdempotentRepository.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+
+/**
+ * A {@link ConcurrentMap}-based implementation of {@link 
IdempotentRepository}.
+ * <p/>
+ * Use this class to interface onto your favourite data-fabric.
+ */
+@ManagedResource(description = "ConcurrentMap based idempotent repository")
+public class ConcurrentMapIdempotentRepository extends ServiceSupport 
implements IdempotentRepository<String> {
+    private final ConcurrentMap<String, Object> cache;
+
+    public ConcurrentMapIdempotentRepository(ConcurrentMap<String, Object> 
cache) {
+        this.cache = cache;
+    }
+
+    @ManagedOperation(description = "Adds the key to the store")
+    @Override
+    public boolean add(String key) {
+        return cache.putIfAbsent(key, key) == null;
+    }
+
+    @ManagedOperation(description = "Does the store contain the given key")
+    @Override
+    public boolean contains(String key) {
+        return cache.containsKey(key);
+    }
+
+    @ManagedOperation(description = "Remove the key from the store")
+    @Override
+    public boolean remove(String key) {
+        return cache.remove(key) != null;
+    }
+
+    @Override
+    public boolean confirm(String key) {
+        // noop
+        return true;
+    }
+
+    @ManagedOperation(description = "Clear the store")
+    @Override
+    public void clear() {
+        cache.clear();
+    }
+
+    public ConcurrentMap<String, Object> getCache() {
+        return cache;
+    }
+
+    @ManagedAttribute(description = "The current cache size")
+    public int getCacheSize() {
+        return cache.size();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+    }
+}
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/idempotent/MapIdempotentRepository.java
 
b/camel-core/src/main/java/org/apache/camel/processor/idempotent/MapIdempotentRepository.java
new file mode 100644
index 00000000000..f642c78c7bd
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/idempotent/MapIdempotentRepository.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.Map;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+
+/**
+ * A {@link Map}-based implementation of {@link IdempotentRepository}.
+ * <p/>
+ * Care should be taken to use a suitable underlying {@link Map} to avoid this 
class being a
+ * memory leak.
+ */
+@ManagedResource(description = "Map based idempotent repository")
+public class MapIdempotentRepository extends ServiceSupport implements 
IdempotentRepository<String>  {
+
+    protected Map<String, Object> cache; // :( should be final
+
+    public MapIdempotentRepository(Map<String, Object> cache) {
+        this.cache = cache;
+    }
+
+    /**
+     * Creates a new {@link Map}-based repository using the given {@link Map} 
to
+     * use to store the processed message ids.
+     * <p/>
+     * Care should be taken to use a suitable underlying {@link Map} to avoid 
this class being a
+     * memory leak.
+     *
+     * @param cache  the cache
+     */
+    public static IdempotentRepository<String> 
mapIdempotentRepository(Map<String, Object> cache) {
+        return new MapIdempotentRepository(cache);
+    }
+
+    @ManagedOperation(description = "Adds the key to the store")
+    public boolean add(String key) {
+        synchronized (cache) {
+            if (cache.containsKey(key)) {
+                return false;
+            } else {
+                cache.put(key, key);
+                return true;
+            }
+        }
+    }
+
+    @ManagedOperation(description = "Does the store contain the given key")
+    public boolean contains(String key) {
+        synchronized (cache) {
+            return cache.containsKey(key);
+        }
+    }
+
+    @ManagedOperation(description = "Remove the key from the store")
+    public boolean remove(String key) {
+        synchronized (cache) {
+            return cache.remove(key) != null;
+        }
+    }
+
+    public boolean confirm(String key) {
+        // noop
+        return true;
+    }
+    
+    @ManagedOperation(description = "Clear the store")
+    public void clear() {
+        synchronized (cache) {
+            cache.clear();
+        }
+    }
+
+    public Map<String, Object> getCache() {
+        return cache;
+    }
+
+    @ManagedAttribute(description = "The current cache size")
+    public int getCacheSize() {
+        return cache.size();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+    }
+}
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
 
b/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
index ffeade52e9c..7703b54a6e1 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
@@ -35,17 +35,17 @@
  * @version 
  */
 @ManagedResource(description = "Memory based idempotent repository")
-public class MemoryIdempotentRepository extends ServiceSupport implements 
IdempotentRepository<String> {
-    private Map<String, Object> cache;
+public class MemoryIdempotentRepository extends MapIdempotentRepository {
+
     private int cacheSize;
 
     @SuppressWarnings("unchecked")
     public MemoryIdempotentRepository() {
-        this.cache = LRUCacheFactory.newLRUCache(1000);
+        super(LRUCacheFactory.newLRUCache(1000));
     }
 
-    public MemoryIdempotentRepository(Map<String, Object> set) {
-        this.cache = set;
+    public MemoryIdempotentRepository(Map<String, Object> cache) {
+        super(cache);
     }
 
     /**
@@ -63,7 +63,7 @@ public MemoryIdempotentRepository(Map<String, Object> set) {
      */
     @SuppressWarnings("unchecked")
     public static IdempotentRepository<String> memoryIdempotentRepository(int 
cacheSize) {
-        return 
memoryIdempotentRepository(LRUCacheFactory.newLRUCache(cacheSize));
+        return new 
MemoryIdempotentRepository(LRUCacheFactory.newLRUCache(cacheSize));
     }
 
     /**
@@ -74,58 +74,13 @@ public MemoryIdempotentRepository(Map<String, Object> set) {
      * memory leak.
      *
      * @param cache  the cache
+     * @deprecated Prefer {@link 
MapIdempotentRepository#mapIdempotentRepository(Map)}
      */
+    @Deprecated
     public static IdempotentRepository<String> 
memoryIdempotentRepository(Map<String, Object> cache) {
         return new MemoryIdempotentRepository(cache);
     }
 
-    @ManagedOperation(description = "Adds the key to the store")
-    public boolean add(String key) {
-        synchronized (cache) {
-            if (cache.containsKey(key)) {
-                return false;
-            } else {
-                cache.put(key, key);
-                return true;
-            }
-        }
-    }
-
-    @ManagedOperation(description = "Does the store contain the given key")
-    public boolean contains(String key) {
-        synchronized (cache) {
-            return cache.containsKey(key);
-        }
-    }
-
-    @ManagedOperation(description = "Remove the key from the store")
-    public boolean remove(String key) {
-        synchronized (cache) {
-            return cache.remove(key) != null;
-        }
-    }
-
-    public boolean confirm(String key) {
-        // noop
-        return true;
-    }
-    
-    @ManagedOperation(description = "Clear the store")
-    public void clear() {
-        synchronized (cache) {
-            cache.clear();
-        }
-    }
-
-    public Map<String, Object> getCache() {
-        return cache;
-    }
-
-    @ManagedAttribute(description = "The current cache size")
-    public int getCacheSize() {
-        return cache.size();
-    }
-
     public void setCacheSize(int cacheSize) {
         this.cacheSize = cacheSize;
     }
@@ -142,4 +97,5 @@ protected void doStart() throws Exception {
     protected void doStop() throws Exception {
         cache.clear();
     }
+
 }
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/idempotent/AbstractIdempotentRepositoryTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/AbstractIdempotentRepositoryTest.java
new file mode 100644
index 00000000000..90303076ef1
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/AbstractIdempotentRepositoryTest.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.IdempotentRepository;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractIdempotentRepositoryTest<I extends 
IdempotentRepository<String>> extends ContextTestSupport {
+
+    protected final I repo;
+
+    protected AbstractIdempotentRepositoryTest(I repo) {
+        this.repo = repo;
+    }
+
+    // Disable auto-starting test routebuilder so we can add a route per test
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    /**
+     * Sends messages to the given endpoint for each of the specified bodies
+     *
+     * @param endpointUri the endpoint URI to send to
+     * @param headers     A function to generate additional headers for a 
given body
+     * @param bodies      the bodies to send, one per message
+     */
+    @SafeVarargs
+    protected final <T> void sendBodies(String endpointUri, Function<T, 
Map<String, Object>> headers, T... bodies) {
+        for (T body : bodies) {
+            sendBody(endpointUri, body, headers.apply(body));
+        }
+    }
+
+    @Test
+    public void testDuplicateMessagesAreFiltered() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(body(), 
repo).to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("a", "b", "c");
+        // When:
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testUnidentifiedMessagesAreFailed() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dlq"));
+                from("direct:start").idempotentConsumer(body(), 
repo).to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("a", "b", "c");
+        getMockEndpoint("mock:dlq").expectedBodiesReceived((String) null); // 
null idempotent key must fail
+        // When:
+        sendBodies("direct:start", "a", null, "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testFailedMessagesAreNotRemoved() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dlq"));
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .removeOnFailure(false)
+                        
.process().body(AbstractIdempotentRepositoryTest::hateMondays)
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("tue", "wed");
+        getMockEndpoint("mock:dlq").expectedBodiesReceived("mon"); // "mon" is 
not redelivered so cannot fail twice
+        // When:
+        sendBodies("direct:start", "mon", "tue", "tue", "wed", "mon");
+        // Then:
+        assertMockEndpointsSatisfied();
+        assertThat(repo.contains("mon"), is(true));
+    }
+
+    @Test
+    public void testFailedMessagesAreRemoved() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dlq"));
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .removeOnFailure(true) // Default
+                        
.process().body(AbstractIdempotentRepositoryTest::hateMondays)
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("tue", "wed");
+        getMockEndpoint("mock:dlq").expectedBodiesReceived("mon", "mon"); // 
"mon" is redelivered so fails twice
+        // When:
+        sendBodies("direct:start", "mon", "tue", "tue", "wed", "mon");
+        // Then:
+        assertMockEndpointsSatisfied();
+        assertThat(repo.contains("mon"), is(false));
+    }
+
+    @Test
+    public void testNonEagerCompletionIncludesSubsequentRouting() throws 
Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dlq"));
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .completionEager(false) // Default
+                        .to("mock:idem")
+                        .end() // idempotent block ends here
+                        
.process().body(AbstractIdempotentRepositoryTest::hateMondays)
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("tue", "tue", 
"wed");
+        getMockEndpoint("mock:dlq").expectedBodiesReceived("mon", "mon");
+        // non-eager: "mon" fails AFTER the idem block, is removed, and 
therefore WILL be delivered again
+        getMockEndpoint("mock:idem").expectedBodiesReceived("mon", "tue", 
"wed", "mon");
+        // When:
+        sendBodies("direct:start", "mon", "tue", "tue", "wed", "mon");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testEagerCompletionIgnoresSubsequentRouting() throws Exception 
{
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dlq"));
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .completionEager(true)
+                        .to("mock:idem")
+                        .end() // idempotent block ends here
+                        
.process().body(AbstractIdempotentRepositoryTest::hateMondays)
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("tue", "tue", 
"wed");
+        getMockEndpoint("mock:dlq").expectedBodiesReceived("mon", "mon");
+        // eager: "mon" fails AFTER it was confirmed by the idem block and 
therefore WON'T be delivered again
+        getMockEndpoint("mock:idem").expectedBodiesReceived("mon", "tue", 
"wed");
+
+        // When:
+        sendBodies("direct:start", "mon", "tue", "tue", "wed", "mon");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testRepoCanBeCleared() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(body(), 
repo).to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("a", "b", "c");
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        assertMockEndpointsSatisfied();
+        resetMocks();
+        // When:
+        repo.clear();
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testRepoCanRemoveAnEntry() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(body(), 
repo).to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("a", "b", "c");
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        assertMockEndpointsSatisfied();
+        resetMocks();
+        // When:
+        repo.remove("b");
+        getMockEndpoint("mock:result").expectedBodiesReceived("b");
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testRepoExceptionFailsTheExchange() throws Exception {
+        // Given:
+        IdempotentRepository<String> fragileRepo = spy(repo);
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dlq"));
+                from("direct:start").idempotentConsumer(body(), 
fragileRepo).to("mock:result");
+            }
+        });
+        startCamelContext();
+        when(fragileRepo.add("mon")).thenThrow(new RuntimeException());
+        fragileRepo.remove("mon"); // Dubious but effective use of Mockito spy
+        getMockEndpoint("mock:dlq").expectedBodiesReceived("mon");
+        getMockEndpoint("mock:result").expectedBodiesReceived("thu", "wed", 
"tue");
+        // When:
+        sendBodies("direct:start", "thu", "wed", "tue", "mon", "thu");
+        // Then:
+        assertMockEndpointsSatisfied();
+        assertThat(repo.contains("mon"), is(false));
+    }
+
+    @Test
+    public void testNotSkippingDuplicates() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .skipDuplicate(false)
+                        .filter(exchangeProperty(Exchange.DUPLICATE_MESSAGE))
+                        .to("mock:dups")
+                        .stop().end()
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("a", "b", "c");
+        getMockEndpoint("mock:dups").expectedBodiesReceived("a", "b");
+        // When:
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test // TODO This isn't a very good test of the _impact_ of eagerness
+    public void testEagerlyAddedToRepo() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .eager(true) // default
+                        .process().body(b -> assertThat(repo.contains((String) 
b), is(true)))
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("a", "b", "c");
+        // When:
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test // TODO This isn't a very good test of the _impact_ of eagerness
+    public void testNotEagerlyAddedAddedToRepo() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .eager(false)
+                        .process().body(b -> assertThat(repo.contains((String) 
b), is(false)))
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("a", "b", "c");
+        // When:
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testFailedMessagesAreRemovedNonEager() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dlq"));
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .removeOnFailure(true) // Default
+                        .eager(false)
+                        
.process().body(AbstractIdempotentRepositoryTest::hateMondays)
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("tue", "wed");
+        getMockEndpoint("mock:dlq").expectedBodiesReceived("mon", "mon"); // 
"mon" is redelivered so fails twice
+        // When:
+        sendBodies("direct:start", "mon", "tue", "tue", "wed", "mon");
+        // Then:
+        assertMockEndpointsSatisfied();
+        assertThat(repo.contains("mon"), is(false));
+    }
+
+    @Test
+    public void testThreads() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(body(), repo)
+                        .threads()
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("a", "b", "c");
+        // When:
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+    private static Object hateMondays(Object body) {
+        if ("mon".equals(body.toString())) {
+            throw new RuntimeException();
+        }
+        return body;
+    }
+
+}
\ No newline at end of file
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/idempotent/ConcurrentMapIdempotentRepositoryTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/ConcurrentMapIdempotentRepositoryTest.java
new file mode 100644
index 00000000000..4a71538be50
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/ConcurrentMapIdempotentRepositoryTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ConcurrentMapIdempotentRepositoryTest extends 
AbstractIdempotentRepositoryTest<ConcurrentMapIdempotentRepository> {
+
+    public ConcurrentMapIdempotentRepositoryTest() {
+        super(createRepo());
+    }
+
+    private static ConcurrentMapIdempotentRepository createRepo() {
+        return new ConcurrentMapIdempotentRepository(new 
ConcurrentHashMap<>());
+    }
+
+}
\ No newline at end of file
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentRepositoryTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentRepositoryTest.java
new file mode 100644
index 00000000000..1696fc3b048
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentRepositoryTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+import static 
org.apache.camel.processor.idempotent.FileIdempotentRepository.fileIdempotentRepository;
+
+public class FileIdempotentRepositoryTest extends 
AbstractIdempotentRepositoryTest<FileIdempotentRepository> {
+
+    public FileIdempotentRepositoryTest() throws IOException {
+        super(createRepo());
+    }
+
+    private static FileIdempotentRepository createRepo() throws IOException {
+        Path file = Files.createTempFile(Paths.get("target"), 
FileIdempotentRepositoryTest.class.getSimpleName(), "dat");
+        return (FileIdempotentRepository) 
fileIdempotentRepository(file.toFile());
+    }
+
+    @Test
+    public void testLoadsPrevEntries() throws Exception {
+        // Given:
+        Files.write(repo.getFileStore().toPath(), "a\nb\n".getBytes()); // 
Should use DSL, not knowledge of the file format
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(body(), 
repo).to("mock:result");
+            }
+        });
+        startCamelContext();
+        getMockEndpoint("mock:result").expectedBodiesReceived("c");
+        // When:
+        sendBodies("direct:start", "a", "b", "a", "b", "c");
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+
+}
\ No newline at end of file
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/idempotent/MapIdempotentRepositoryTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/MapIdempotentRepositoryTest.java
new file mode 100644
index 00000000000..cf4f859821d
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/MapIdempotentRepositoryTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.HashMap;
+/**
+ * Test fixture that plugs a {@link MapIdempotentRepository} into
+ * {@code AbstractIdempotentRepositoryTest}. It declares no tests of its own.
+ * NOTE(review): presumably the tests that run are inherited from the base
+ * class, which is not shown here.
+ */
+public class MapIdempotentRepositoryTest extends 
AbstractIdempotentRepositoryTest<MapIdempotentRepository> {
+    /**
+     * Passes a repository built by {@link #createRepo()} to the base class.
+     */
+    public MapIdempotentRepositoryTest() {
+        super(createRepo());
+    }
+    /**
+     * Builds the repository under test around a new, empty {@link HashMap}.
+     *
+     * @return a new {@link MapIdempotentRepository}
+     */
+    private static MapIdempotentRepository createRepo() {
+        return new MapIdempotentRepository(new HashMap<>());
+    }
+
+}
\ No newline at end of file
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepositoryTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepositoryTest.java
new file mode 100644
index 00000000000..08288435195
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepositoryTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.IdempotentRepository;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static 
org.apache.camel.processor.idempotent.MemoryIdempotentRepository.memoryIdempotentRepository;
+/**
+ * Test fixture that plugs the repository returned by
+ * {@code memoryIdempotentRepository(20)} into
+ * {@code AbstractIdempotentRepositoryTest}, plus one currently ignored test.
+ * NOTE(review): the base class is not shown here; presumably it supplies the
+ * {@code repo} and {@code context} members and the helper methods used below.
+ */
+public class MemoryIdempotentRepositoryTest extends 
AbstractIdempotentRepositoryTest<IdempotentRepository<String>> {
+    /**
+     * Passes a repository built by {@link #createRepo()} to the base class.
+     */
+    public MemoryIdempotentRepositoryTest() {
+        super(createRepo());
+    }
+    /**
+     * Builds the repository under test via the static factory.
+     * NOTE(review): the argument 20 is presumably the cache size; confirm
+     * against {@code MemoryIdempotentRepository}.
+     *
+     * @return the repository returned by {@code memoryIdempotentRepository(20)}
+     */
+    private static IdempotentRepository<String> createRepo() {
+        return memoryIdempotentRepository(20);
+    }
+    /**
+     * Sends the 26 lowercase letters twice, one character per message, through
+     * an idempotent consumer keyed on the message body, and expects all 52
+     * characters at {@code mock:result}. Disabled with {@code @Ignore} because
+     * the test "seems non-deterministic".
+     * NOTE(review): presumably the second pass is let through because each
+     * letter has been evicted from the size-limited repository by the time it
+     * recurs; confirm.
+     */
+    @Test
+    @Ignore("lru test seems non-deterministic")
+    public void usesLRUSemantics() throws Exception {
+        // Given:
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .idempotentConsumer(body(), repo)
+                        .to("mock:result");
+            }
+        });
+        startCamelContext();
+        
getMockEndpoint("mock:result").expectedBodiesReceived(allOf("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"));
+        // When:
+        
allOf("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz").forEach(c -> 
sendBodies("direct:start", c));
+        // Then:
+        assertMockEndpointsSatisfied();
+    }
+    /**
+     * Splits a string into its individual characters.
+     *
+     * @param letters the text to split
+     * @return the characters of {@code letters}, in order, as a list
+     */
+    private static List<Character> allOf(String letters) {
+        return letters.chars().mapToObj(i -> (char)i).collect(toList());
+    }
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Introduce ConcurrentMapIdempotentRepository and MapIdempotentRepository
> -----------------------------------------------------------------------
>
>                 Key: CAMEL-12041
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12041
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-core
>    Affects Versions: 2.20.1
>            Reporter: Marc Carter
>            Priority: Trivial
>
> In order to more generically plug in third-party data fabrics, Camel should 
> directly support the ConcurrentMap interface.
> 1 Add an explicit ConcurrentMapIdempotentRepository (which does away with 
> synchronized blocks around the backing map) 
> 2 Factor out MapIdempotentRepository from MemoryIdempotentRepository (being 
> based on a Map _interface_ doesn't make it "memory" - only the default 
> LRUCache impls are definitively "memory")
> 3 Make some sense of the tests around this EIP



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to