Repository: camel Updated Branches: refs/heads/camel-2.19.x e8f0bc8db -> ba4689296
CAMEL-11388: camel-infinispan - InfinispanRoutePolicy issue with locking from remote server Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ba468929 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ba468929 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ba468929 Branch: refs/heads/camel-2.19.x Commit: ba468929624a93ae707ecbb943d41db84ab99712 Parents: e8f0bc8 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Thu Jun 22 18:59:00 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Thu Jun 22 18:59:00 2017 +0200 ---------------------------------------------------------------------- .../component/infinispan/InfinispanManager.java | 18 ++++++++++++++++++ .../infinispan/policy/InfinispanRoutePolicy.java | 18 ++++++++++-------- .../src/test/resources/log4j2.properties | 3 +++ 3 files changed, 31 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ba468929/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java index 24ad853..028a2fe 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java @@ -164,6 +164,24 @@ public class InfinispanManager implements Service { return cache; } + public <K, V> BasicCache<K, V> getCache(String cacheName, boolean forceReturnValue) { + if (isCacheContainerRemote()) { + BasicCache<K, V> cache; + if (ObjectHelper.isEmpty(cacheName)) { + cache = InfinispanUtil.asRemote(cacheContainer).getCache(forceReturnValue); + cacheName = cache.getName(); + } else { + cache = InfinispanUtil.asRemote(cacheContainer).getCache(cacheName, forceReturnValue); + } + + LOGGER.trace("Cache[{}]", cacheName); + + return cache; + } else { + return getCache(cacheName); + } + } + public <K, V> BasicCache<K, V> getCache(Exchange exchange, String defaultCache) { return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, defaultCache, String.class)); } http://git-wip-us.apache.org/repos/asf/camel/blob/ba468929/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java index 7afc26a..bc9a977 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java @@ -152,10 +152,6 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo StringHelper.notEmpty(lockValue, "lockValue", this); ObjectHelper.notNull(camelContext, "camelContext", this); - if (this.lockValue == null) { - this.lockValue = camelContext.getUuidGenerator().generateUuid(); - } - this.manager.start(); this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "InfinispanRoutePolicy"); @@ -163,10 +159,14 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds"); } - BasicCache<String, String> cache = manager.getCache(lockMapName); + if (manager.isCacheContainerEmbedded()) { + BasicCache<String, String> cache = manager.getCache(lockMapName); this.service = new EmbeddedCacheService(InfinispanUtil.asEmbedded(cache)); } else { + // By default, previously existing values for java.util.Map operations + // are not returned for remote caches but policy needs it so force it. + BasicCache<String, String> cache = manager.getCache(lockMapName, true); this.service = new RemoteCacheService(InfinispanUtil.asRemote(cache)); } @@ -464,8 +464,10 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo // I'm still the leader, so refresh the key so it does not expire. if (!cache.replaceWithVersion(lockKey, lockValue, version, (int)lifespanTimeUnit.toSeconds(lifespan))) { - // Looks like I've lost the leadership. setLeader(false); + } else { + version = cache.getWithMetadata(lockKey).getVersion(); + LOGGER.debug("Lock refreshed key={} with new version={}", lockKey, version); } } @@ -495,14 +497,14 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo } @ClientCacheEntryRemoved - public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<Object> event) { + public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<String> event) { if (ObjectHelper.equal(lockKey, event.getKey())) { run(); } } @ClientCacheEntryExpired - public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<Object> event) { + public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<String> event) { if (ObjectHelper.equal(lockKey, event.getKey())) { run(); } http://git-wip-us.apache.org/repos/asf/camel/blob/ba468929/components/camel-infinispan/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/resources/log4j2.properties b/components/camel-infinispan/src/test/resources/log4j2.properties index 7c25ddd..4b09a1d 100644 --- a/components/camel-infinispan/src/test/resources/log4j2.properties +++ b/components/camel-infinispan/src/test/resources/log4j2.properties @@ -26,3 +26,6 @@ appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n rootLogger.level = INFO rootLogger.appenderRef.file.ref = file + +logger.infinispan.name = org.apache.camel.component.infinispan +logger.infinispan.level = DEBUG