Repository: camel Updated Branches: refs/heads/master cd2c2829e -> f245ab072
CAMEL-9840 : InfinispanIdempotentRepository should force return values for RemoteCaches Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f245ab07 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f245ab07 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f245ab07 Branch: refs/heads/master Commit: f245ab072331af29f221b6b9206588d9a0d30823 Parents: cd2c282 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Fri Apr 8 16:16:59 2016 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Fri Apr 8 16:25:21 2016 +0200 ---------------------------------------------------------------------- components/camel-infinispan/pom.xml | 4 +- .../InfinispanIdempotentRepository.java | 18 +++++- .../InfinispanIdempotentRepositoryIT.java | 68 ++++++++++++++++++++ 3 files changed, 86 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f245ab07/components/camel-infinispan/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml index ab9be43..5512393 100644 --- a/components/camel-infinispan/pom.xml +++ b/components/camel-infinispan/pom.xml @@ -254,8 +254,10 @@ <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=static_filter_factory:add(configuration=default)</command> <!-- Separate cache for @ClientListener(includeCurrentState=true) --> <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=include_current_state:add(configuration=default)</command> - <!-- Separate cache for protobuf serialized objects. --> + <!-- Separate cache for misc tests. --> <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=misc_cache:add(configuration=default)</command> + <!-- Separate cache for idempotent tests. --> + <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=idempotent:add(configuration=default)</command> </commands> </executeCommands> </configuration> http://git-wip-us.apache.org/repos/asf/camel/blob/f245ab07/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java index b28a460..6f33693 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java @@ -19,8 +19,10 @@ package org.apache.camel.component.infinispan.processor.idempotent; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.component.infinispan.InfinispanUtil; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.ServiceSupport; +import org.infinispan.client.hotrod.RemoteCacheManager; import org.infinispan.commons.api.BasicCache; import org.infinispan.commons.api.BasicCacheContainer; import org.infinispan.manager.DefaultCacheManager; @@ -117,9 +119,19 @@ public class InfinispanIdempotentRepository extends ServiceSupport implements Id private BasicCache<Object, Boolean> getCache() { if (cache == null) { - cache = cacheName != null - ? cacheContainer.<Object, Boolean>getCache(cacheName) - : cacheContainer.<Object, Boolean>getCache(); + // By default, previously existing values for java.util.Map operations + // are not returned for remote caches but idempotent repository needs + // them so force it. + if (InfinispanUtil.isRemote(cacheContainer)) { + RemoteCacheManager manager = InfinispanUtil.asRemote(cacheContainer); + cache = cacheName != null + ? manager.getCache(cacheName, true) + : manager.getCache(true); + } else { + cache = cacheName != null + ? cacheContainer.getCache(cacheName) + : cacheContainer.getCache(); + } } return cache; http://git-wip-us.apache.org/repos/asf/camel/blob/f245ab07/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepositoryIT.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepositoryIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepositoryIT.java new file mode 100644 index 0000000..4306a65 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepositoryIT.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.processor.idempotent; + +import java.util.UUID; +import java.util.stream.IntStream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; +import org.junit.Test; + +public class InfinispanIdempotentRepositoryIT extends CamelTestSupport { + + @Test + public void producerQueryOperationWithoutQueryBuilder() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + final String messageId = UUID.randomUUID().toString(); + IntStream.range(0, 10).forEach( + i -> template().sendBodyAndHeader("direct:start", "message-" + i, "MessageID", messageId) + ); + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .idempotentConsumer( + header("MessageID"), + new InfinispanIdempotentRepository( + new RemoteCacheManager( + new ConfigurationBuilder() + .addServers("localhost") + .build(), + true + ), + "idempotent" + ) + ) + .skipDuplicate(true) + .to("log:org.apache.camel.component.infinispan.processor.idempotent?level=INFO&showAll=true&multiline=true") + .to("mock:result"); + } + }; + } +}