Repository: camel
Updated Branches:
  refs/heads/camel-2.19.x e8f0bc8db -> ba4689296


CAMEL-11388: camel-infinispan - InfinispanRoutePolicy issue with locking from remote server


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ba468929
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ba468929
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ba468929

Branch: refs/heads/camel-2.19.x
Commit: ba468929624a93ae707ecbb943d41db84ab99712
Parents: e8f0bc8
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Thu Jun 22 18:59:00 2017 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Thu Jun 22 18:59:00 2017 +0200

----------------------------------------------------------------------
 .../component/infinispan/InfinispanManager.java   | 18 ++++++++++++++++++
 .../infinispan/policy/InfinispanRoutePolicy.java  | 18 ++++++++++--------
 .../src/test/resources/log4j2.properties          |  3 +++
 3 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ba468929/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
index 24ad853..028a2fe 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
@@ -164,6 +164,24 @@ public class InfinispanManager implements Service {
         return cache;
     }
 
+    public <K, V> BasicCache<K, V> getCache(String cacheName, boolean forceReturnValue) {
+        if (isCacheContainerRemote()) {
+            BasicCache<K, V> cache;
+            if (ObjectHelper.isEmpty(cacheName)) {
+                cache = InfinispanUtil.asRemote(cacheContainer).getCache(forceReturnValue);
+                cacheName = cache.getName();
+            } else {
+                cache = InfinispanUtil.asRemote(cacheContainer).getCache(cacheName, forceReturnValue);
+            }
+
+            LOGGER.trace("Cache[{}]", cacheName);
+
+            return cache;
+        } else {
+            return getCache(cacheName);
+        }
+    }
+
     public <K, V> BasicCache<K, V> getCache(Exchange exchange, String defaultCache) {
         return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, defaultCache, String.class));
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/ba468929/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
index 7afc26a..bc9a977 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
@@ -152,10 +152,6 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo
         StringHelper.notEmpty(lockValue, "lockValue", this);
         ObjectHelper.notNull(camelContext, "camelContext", this);
 
-        if (this.lockValue == null) {
-            this.lockValue = camelContext.getUuidGenerator().generateUuid();
-        }
-
         this.manager.start();
         this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "InfinispanRoutePolicy");
 
@@ -163,10 +159,14 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo
             throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds");
         }
 
-        BasicCache<String, String> cache = manager.getCache(lockMapName);
+
         if (manager.isCacheContainerEmbedded()) {
+            BasicCache<String, String> cache = manager.getCache(lockMapName);
             this.service = new EmbeddedCacheService(InfinispanUtil.asEmbedded(cache));
         } else {
+            // By default, previously existing values for java.util.Map operations
+            // are not returned for remote caches but policy needs it so force it.
+            BasicCache<String, String> cache = manager.getCache(lockMapName, true);
             this.service = new RemoteCacheService(InfinispanUtil.asRemote(cache));
         }
 
@@ -464,8 +464,10 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo
 
                 // I'm still the leader, so refresh the key so it does not expire.
                 if (!cache.replaceWithVersion(lockKey, lockValue, version, (int)lifespanTimeUnit.toSeconds(lifespan))) {
-                    // Looks like I've lost the leadership.
                     setLeader(false);
+                } else {
+                    version = cache.getWithMetadata(lockKey).getVersion();
+                    LOGGER.debug("Lock refreshed key={} with new version={}", lockKey, version);
                 }
             }
 
@@ -495,14 +497,14 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo
         }
 
         @ClientCacheEntryRemoved
-        public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<Object> event) {
+        public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<String> event) {
             if (ObjectHelper.equal(lockKey, event.getKey())) {
                 run();
             }
         }
 
         @ClientCacheEntryExpired
-        public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<Object> event) {
+        public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<String> event) {
             if (ObjectHelper.equal(lockKey, event.getKey())) {
                 run();
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/ba468929/components/camel-infinispan/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/log4j2.properties b/components/camel-infinispan/src/test/resources/log4j2.properties
index 7c25ddd..4b09a1d 100644
--- a/components/camel-infinispan/src/test/resources/log4j2.properties
+++ b/components/camel-infinispan/src/test/resources/log4j2.properties
@@ -26,3 +26,6 @@ appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file
+
+logger.infinispan.name = org.apache.camel.component.infinispan
+logger.infinispan.level = DEBUG

Reply via email to