This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a69e610  IGNITE-11214 Use discovery topology version when fetching 
affinity - Fixes #6033.
a69e610 is described below

commit a69e6103789c38caf5328af9fde64ac2549735b1
Author: Alexey Goncharuk <alexey.goncha...@gmail.com>
AuthorDate: Tue Feb 19 15:59:13 2019 +0300

    IGNITE-11214 Use discovery topology version when fetching affinity - Fixes 
#6033.
    
    Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
---
 .../processors/affinity/GridAffinityProcessor.java | 70 +++++++++++++++++++---
 1 file changed, 62 insertions(+), 8 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index c0b810f..81f6093 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -191,7 +191,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         assert cacheName != null;
 
         if (aff == null) {
-            aff = affinityCache(cacheName, 
ctx.cache().context().exchange().readyAffinityVersion());
+            aff = affinityCache(cacheName);
 
             if (aff == null)
                 throw new IgniteCheckedException("Failed to get cache affinity 
(cache was not started " +
@@ -337,7 +337,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         if (key == null)
             return null;
 
-        AffinityInfo affInfo = affinityCache(cacheName, 
ctx.cache().context().exchange().readyAffinityVersion());
+        AffinityInfo affInfo = affinityCache(cacheName);
 
         if (affInfo == null)
             return null;
@@ -363,7 +363,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
      */
     private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final 
String cacheName,
         Collection<? extends K> keys) throws IgniteCheckedException {
-        return keysToNodes(cacheName, keys, 
ctx.cache().context().exchange().readyAffinityVersion());
+        return keysToNodes(cacheName, keys, null);
     }
 
     /**
@@ -380,7 +380,17 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
 
         AffinityInfo affInfo = affinityCache(cacheName, topVer);
 
-        return affInfo != null ? affinityMap(affInfo, keys) : 
Collections.<ClusterNode, Collection<K>>emptyMap();
+        return affInfo != null ? affinityMap(affInfo, keys) : 
Collections.emptyMap();
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Affinity cache.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @Nullable private AffinityInfo affinityCache(final String cacheName)
+        throws IgniteCheckedException {
+        return affinityCache(cacheName, null);
     }
 
     /**
@@ -389,7 +399,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
      * @return Affinity cache.
      * @throws IgniteCheckedException In case of error.
      */
-    @Nullable private AffinityInfo affinityCache(final String cacheName, 
AffinityTopologyVersion topVer)
+    @Nullable private AffinityInfo affinityCache(final String cacheName, 
@Nullable AffinityTopologyVersion topVer)
         throws IgniteCheckedException {
         return affinityCacheFuture(cacheName, topVer).get();
     }
@@ -400,10 +410,34 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
      * @return Affinity cache.
      * @throws IgniteCheckedException In case of error.
      */
-    public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String 
cacheName, AffinityTopologyVersion topVer)
+    public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String 
cacheName, @Nullable AffinityTopologyVersion topVer)
         throws IgniteCheckedException {
         assert cacheName != null;
 
+        IgniteInternalFuture<AffinityInfo> locFetchFut = 
localAffinityInfo(cacheName, topVer);
+
+        if (locFetchFut != null)
+            return locFetchFut;
+
+        return remoteAffinityInfo(cacheName, topVer);
+    }
+
+    /**
+     * Tries to fetch affinity info based on local cache affinity info. If 
cache with the given name is not started
+     * locally, will return {@code null}.
+     *
+     * @param cacheName Cache name to fetch.
+     * @param topVer Topology version to use.
+     * @return Future with affinity info or {@code null} if cache is not 
started locally.
+     * @throws IgniteCheckedException If failed to start for local cache 
context initialization.
+     */
+    private IgniteInternalFuture<AffinityInfo> localAffinityInfo(
+        String cacheName,
+        @Nullable AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        if (topVer == null)
+            topVer = ctx.cache().context().exchange().readyAffinityVersion();
+
         AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, 
topVer);
 
         IgniteInternalFuture<AffinityInfo> fut = affMap.get(key);
@@ -450,6 +484,26 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             }
         }
 
+        return null;
+    }
+
+    /**
+     * Tries to fetch affinity from remote nodes. If there are no nodes with 
the cache with the given name started,
+     * the retured future will be completed with {@code null}.
+     *
+     * @param cacheName Cache name to fetch affinity.
+     * @param topVer Topology version to fetch affinity.
+     * @return Affinity assignment fetch future.
+     */
+    private IgniteInternalFuture<AffinityInfo> remoteAffinityInfo(
+        String cacheName,
+        @Nullable AffinityTopologyVersion topVer
+    ) {
+        if (topVer == null)
+            topVer = ctx.discovery().topologyVersionEx();
+
+        AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, 
topVer);
+
         List<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, 
topVer);
 
         DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
@@ -457,7 +511,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         if (desc == null || F.isEmpty(cacheNodes)) {
             if (ctx.clientDisconnected())
                 return new GridFinishedFuture<>(new 
IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                        "Failed to get affinity mapping, client 
disconnected."));
+                    "Failed to get affinity mapping, client disconnected."));
 
             return new GridFinishedFuture<>((AffinityInfo)null);
         }
@@ -1073,7 +1127,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
          * @throws IgniteCheckedException If failed.
          */
         private AffinityInfo cache() throws IgniteCheckedException {
-            AffinityInfo aff = affinityCache(cacheName, 
ctx.cache().context().exchange().readyAffinityVersion());
+            AffinityInfo aff = affinityCache(cacheName);
 
             if (aff == null)
                 throw new IgniteException("Failed to find cache (cache was not 
started " +

Reply via email to