This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new a69e610 IGNITE-11214 Use discovery topology version when fetching affinity - Fixes #6033. a69e610 is described below commit a69e6103789c38caf5328af9fde64ac2549735b1 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> AuthorDate: Tue Feb 19 15:59:13 2019 +0300 IGNITE-11214 Use discovery topology version when fetching affinity - Fixes #6033. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../processors/affinity/GridAffinityProcessor.java | 70 +++++++++++++++++++--- 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index c0b810f..81f6093 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -191,7 +191,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { assert cacheName != null; if (aff == null) { - aff = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion()); + aff = affinityCache(cacheName); if (aff == null) throw new IgniteCheckedException("Failed to get cache affinity (cache was not started " + @@ -337,7 +337,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { if (key == null) return null; - AffinityInfo affInfo = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion()); + AffinityInfo affInfo = affinityCache(cacheName); if (affInfo == null) return null; @@ -363,7 +363,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { */ private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName, Collection<? extends K> keys) throws IgniteCheckedException { - return keysToNodes(cacheName, keys, ctx.cache().context().exchange().readyAffinityVersion()); + return keysToNodes(cacheName, keys, null); } /** @@ -380,7 +380,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter { AffinityInfo affInfo = affinityCache(cacheName, topVer); - return affInfo != null ? affinityMap(affInfo, keys) : Collections.<ClusterNode, Collection<K>>emptyMap(); + return affInfo != null ? affinityMap(affInfo, keys) : Collections.emptyMap(); + } + + /** + * @param cacheName Cache name. + * @return Affinity cache. + * @throws IgniteCheckedException In case of error. + */ + @Nullable private AffinityInfo affinityCache(final String cacheName) + throws IgniteCheckedException { + return affinityCache(cacheName, null); } /** @@ -389,7 +399,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @return Affinity cache. * @throws IgniteCheckedException In case of error. */ - @Nullable private AffinityInfo affinityCache(final String cacheName, AffinityTopologyVersion topVer) + @Nullable private AffinityInfo affinityCache(final String cacheName, @Nullable AffinityTopologyVersion topVer) throws IgniteCheckedException { return affinityCacheFuture(cacheName, topVer).get(); } @@ -400,10 +410,34 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @return Affinity cache. * @throws IgniteCheckedException In case of error. */ - public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String cacheName, AffinityTopologyVersion topVer) + public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String cacheName, @Nullable AffinityTopologyVersion topVer) throws IgniteCheckedException { assert cacheName != null; + IgniteInternalFuture<AffinityInfo> locFetchFut = localAffinityInfo(cacheName, topVer); + + if (locFetchFut != null) + return locFetchFut; + + return remoteAffinityInfo(cacheName, topVer); + } + + /** + * Tries to fetch affinity info based on local cache affinity info. If cache with the given name is not started + * locally, will return {@code null}. + * + * @param cacheName Cache name to fetch. + * @param topVer Topology version to use. + * @return Future with affinity info or {@code null} if cache is not started locally. + * @throws IgniteCheckedException If failed to start for local cache context initialization. + */ + private IgniteInternalFuture<AffinityInfo> localAffinityInfo( + String cacheName, + @Nullable AffinityTopologyVersion topVer + ) throws IgniteCheckedException { + if (topVer == null) + topVer = ctx.cache().context().exchange().readyAffinityVersion(); + AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); IgniteInternalFuture<AffinityInfo> fut = affMap.get(key); @@ -450,6 +484,26 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } } + return null; + } + + /** + * Tries to fetch affinity from remote nodes. If there are no nodes with the cache with the given name started, + * the retured future will be completed with {@code null}. + * + * @param cacheName Cache name to fetch affinity. + * @param topVer Topology version to fetch affinity. + * @return Affinity assignment fetch future. + */ + private IgniteInternalFuture<AffinityInfo> remoteAffinityInfo( + String cacheName, + @Nullable AffinityTopologyVersion topVer + ) { + if (topVer == null) + topVer = ctx.discovery().topologyVersionEx(); + + AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); + List<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer); DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName); @@ -457,7 +511,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { if (desc == null || F.isEmpty(cacheNodes)) { if (ctx.clientDisconnected()) return new GridFinishedFuture<>(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), - "Failed to get affinity mapping, client disconnected.")); + "Failed to get affinity mapping, client disconnected.")); return new GridFinishedFuture<>((AffinityInfo)null); } @@ -1073,7 +1127,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ private AffinityInfo cache() throws IgniteCheckedException { - AffinityInfo aff = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion()); + AffinityInfo aff = affinityCache(cacheName); if (aff == null) throw new IgniteException("Failed to find cache (cache was not started " +