Repository: ignite Updated Branches: refs/heads/ignite-5272 faa4d37bf -> f765fd82a
ignite-5272 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f765fd82 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f765fd82 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f765fd82 Branch: refs/heads/ignite-5272 Commit: f765fd82ae77dbcf6341c38b9291e0d0d28f5834 Parents: faa4d37 Author: sboikov <[email protected]> Authored: Wed Jun 14 15:32:41 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jun 14 15:32:41 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 25 +++++++++++--------- .../ClientCacheChangeDiscoveryMessage.java | 24 +++++++++++++++++++ 2 files changed, 38 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f765fd82/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index bf583f9..16850e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -342,7 +342,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<DynamicCacheDescriptor> startDescs = new ArrayList<>(startReqs.size()); for (DynamicCacheChangeRequest startReq : startReqs.values()) { - DynamicCacheDescriptor desc = caches.cache(startReq.cacheName()); + DynamicCacheDescriptor desc = caches.cache(CU.cacheId(startReq.cacheName())); if (desc == null) { CacheException err = new CacheException("Failed to start client cache " + @@ -581,14 +581,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (msg != null && msg.updateTimeoutObject() == timeoutObj) { assert !msg.empty() : msg; + clientCacheChanges.remove(); + + msg.checkCachesExist(caches.registeredCaches.keySet()); + try { - cctx.discovery().sendCustomEvent(msg); + if (!msg.empty()) + cctx.discovery().sendCustomEvent(msg); } catch (IgniteCheckedException e) { U.error(log, "Failed to send discovery event: " + e, e); } - - clientCacheChanges.remove(); } } @@ -2142,7 +2145,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private final ConcurrentHashMap<Integer, CacheGroupDescriptor> registeredGrps = new ConcurrentHashMap<>(); /** Registered caches (updated from exchange thread). */ - private final ConcurrentHashMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Integer, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>(); /** * @param grps Registered groups. @@ -2153,7 +2156,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap registeredGrps.put(grpDesc.groupId(), grpDesc); for (DynamicCacheDescriptor cacheDesc : caches.values()) - registeredCaches.put(cacheDesc.cacheName(), cacheDesc); + registeredCaches.put(cacheDesc.cacheId(), cacheDesc); } /** @@ -2186,7 +2189,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap registeredGrps.put(grpDesc.groupId(), grpDesc); if (!registeredCaches.containsKey(desc.cacheName())) - registeredCaches.put(desc.cacheName(), desc); + registeredCaches.put(desc.cacheId(), desc); } } @@ -2210,15 +2213,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap registeredCaches.remove(req.descriptor().cacheName()); for (ExchangeActions.ActionData req : exchActions.cacheStartRequests()) - registeredCaches.put(req.descriptor().cacheName(), req.descriptor()); + registeredCaches.put(req.descriptor().cacheId(), req.descriptor()); } /** - * @param cacheName Cache name. + * @param cacheId Cache ID. * @return Cache descriptor if cache found. */ - @Nullable DynamicCacheDescriptor cache(String cacheName) { - return registeredCaches.get(cacheName); + @Nullable DynamicCacheDescriptor cache(Integer cacheId) { + return registeredCaches.get(cacheId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f765fd82/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java index 37eb927..b4a1696 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; @@ -103,6 +104,29 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage } /** + * @param caches Started caches' IDs. + */ + void checkCachesExist(Set<Integer> caches) { + if (closedCaches != null) { + for (Iterator<Integer> it = closedCaches.iterator(); it.hasNext();) { + Integer cacheId = it.next(); + + if (!caches.contains(cacheId)) + it.remove(); + } + } + + if (startedCaches != null) { + for (Iterator<Integer> it = startedCaches.keySet().iterator(); it.hasNext();) { + Integer cacheId = it.next(); + + if (!caches.contains(cacheId)) + it.remove(); + } + } + } + + /** * @return Update timeout object. */ public ClientCacheUpdateTimeout updateTimeoutObject() {
