This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch gg-19225 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit e545d547d5ce883766ee2c50f499e208fa3193c5 Author: Semyon Boikov <[email protected]> AuthorDate: Tue Dec 25 11:10:17 2018 +0300 IGNITE-1741 Fixed hanging CacheAffinityCallSelfTest.testAffinityCallNoServerNode - Fixes #5729. Signed-off-by: Alexey Goncharuk <[email protected]> (cherry picked from commit 175c1d815d848918eab79960910a8a3002143aa0) --- .../managers/discovery/GridDiscoveryManager.java | 6 +- .../processors/affinity/GridAffinityProcessor.java | 195 ++++++++++++--------- .../internal/processors/task/GridTaskWorker.java | 14 +- .../cache/CacheAffinityCallSelfTest.java | 70 +++++++- 4 files changed, 193 insertions(+), 92 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 42dabdd..552fc2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1939,7 +1939,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param topVer Topology version. * @return Collection of cache nodes. */ - public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { + public List<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName); } @@ -2483,7 +2483,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param cacheName Cache name. * @param node Node to add */ - private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { + private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode node) { List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); if (cacheNodes == null) { @@ -2492,7 +2492,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { cacheMap.put(CU.cacheId(cacheName), cacheNodes); } - cacheNodes.add(rich); + cacheNodes.add(node); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 61886b6..67b511c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -30,21 +30,24 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -53,6 +56,8 @@ import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; @@ -79,12 +84,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter { /** Affinity map cleanup delay (ms). */ private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000; - /** Retries to get affinity in case of error. */ - private static final int ERROR_RETRIES = 3; - - /** Time to wait between errors (in milliseconds). */ - private static final long ERROR_WAIT = 500; - /** Log. */ private final IgniteLogger log; @@ -390,10 +389,19 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @return Affinity cache. * @throws IgniteCheckedException In case of error. */ - @SuppressWarnings("ErrorNotRethrown") @Nullable private AffinityInfo affinityCache(final String cacheName, AffinityTopologyVersion topVer) throws IgniteCheckedException { + return affinityCacheFuture(cacheName, topVer).get(); + } + /** + * @param cacheName Cache name. + * @param topVer Topology version. + * @return Affinity cache. + * @throws IgniteCheckedException In case of error. + */ + public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String cacheName, AffinityTopologyVersion topVer) + throws IgniteCheckedException { assert cacheName != null; AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); @@ -401,7 +409,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { IgniteInternalFuture<AffinityInfo> fut = affMap.get(key); if (fut != null) - return fut.get(); + return fut; GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); @@ -416,7 +424,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { cctx.gate().enter(); } catch (IllegalStateException ignored) { - return null; + return new GridFinishedFuture<>((AffinityInfo)null); } try { @@ -428,99 +436,116 @@ public class GridAffinityProcessor extends GridProcessorAdapter { cctx.cacheObjectContext() ); - IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info)); + GridFinishedFuture<AffinityInfo> fut0 = new GridFinishedFuture<>(info); + + IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); if (old != null) - info = old.get(); + return old; - return info; + return fut0; } finally { cctx.gate().leave(); } } - Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer); + List<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer); - if (F.isEmpty(cacheNodes)) - return null; + DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName); - GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>(); + if (desc == null || F.isEmpty(cacheNodes)) { + if (ctx.clientDisconnected()) + return new GridFinishedFuture<>(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to get affinity mapping, client disconnected.")); - IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); + return new GridFinishedFuture<>((AffinityInfo)null); + } - if (old != null) - return old.get(); + if (desc.cacheConfiguration().getCacheMode() == LOCAL) + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to map keys for LOCAL cache: " + cacheName)); - int max = ERROR_RETRIES; - int cnt = 0; + AffinityFuture fut0 = new AffinityFuture(cacheName, topVer, cacheNodes); - Iterator<ClusterNode> it = cacheNodes.iterator(); + IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); - // We are here because affinity has not been fetched yet, or cache mode is LOCAL. - while (true) { - cnt++; + if (old != null) + return old; - if (!it.hasNext()) - it = cacheNodes.iterator(); + fut0.getAffinityFromNextNode(); - // Double check since we deal with dynamic view. - if (!it.hasNext()) - // Exception will be caught in this method. - throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName); + return fut0; + } - ClusterNode n = it.next(); + /** + * + */ + private class AffinityFuture extends GridFutureAdapter<AffinityInfo> { + /** */ + private final String cacheName; - CacheMode mode = ctx.cache().cacheMode(cacheName); + /** */ + private final AffinityTopologyVersion topVer; - if (mode == null) { - if (ctx.clientDisconnected()) - throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), - "Failed to get affinity mapping, client disconnected."); + /** */ + private final List<ClusterNode> cacheNodes; - throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName); - } + /** */ + private int nodeIdx; - // Map all keys to a single node, if the cache mode is LOCAL. - if (mode == LOCAL) { - fut0.onDone(new IgniteCheckedException("Failed to map keys for LOCAL cache.")); + /** + * @param cacheName Cache name. + * @param topVer Topology version. + * @param cacheNodes Cache nodes. + */ + AffinityFuture(String cacheName, AffinityTopologyVersion topVer, List<ClusterNode> cacheNodes) { + this.cacheName = cacheName; + this.topVer = topVer; + this.cacheNodes = cacheNodes; + } - // Will throw exception. - fut0.get(); - } + /** + * + */ + void getAffinityFromNextNode() { + while (nodeIdx < cacheNodes.size()) { + final ClusterNode node = cacheNodes.get(nodeIdx); - try { - // Resolve cache context for remote node. - // Set affinity function before counting down on latch. - fut0.onDone(affinityInfoFromNode(cacheName, topVer, n)); + nodeIdx++; - break; - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to get affinity from node (will retry) [cache=" + cacheName + - ", node=" + U.toShortString(n) + ", msg=" + e.getMessage() + ']'); + if (!ctx.discovery().alive(node.id())) + continue; - if (cnt < max) { - U.sleep(ERROR_WAIT); + affinityInfoFromNode(cacheName, topVer, node).listen(new CI1<IgniteInternalFuture<AffinityInfo>>() { + @Override public void apply(IgniteInternalFuture<AffinityInfo> fut) { + try { + onDone(fut.get()); + } + catch (IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException || X.hasCause(e, ClusterTopologyException.class)) { + if (log.isDebugEnabled()) + log.debug("Failed to get affinity from node, node failed [cache=" + cacheName + + ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); - continue; - } + getAffinityFromNextNode(); + + return; + } - affMap.remove(key, fut0); + if (log.isDebugEnabled()) + log.debug("Failed to get affinity from node [cache=" + cacheName + + ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); - fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e)); + onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e)); + } + } + }); - break; + return; } - catch (RuntimeException | Error e) { - fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e)); - break; - } + onDone(new ClusterGroupEmptyCheckedException("Failed to get cache affinity, all cache nodes failed: " + cacheName)); } - - return fut0.get(); } /** @@ -529,26 +554,30 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @param cacheName Name of cache on which affinity is requested. * @param topVer Topology version. * @param n Node from which affinity is requested. - * @return Affinity cached function. - * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects. + * @return Affinity future. */ - private AffinityInfo affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n) - throws IgniteCheckedException { - GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure() - .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false).get(); + private IgniteInternalFuture<AffinityInfo> affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n) { + IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut = ctx.closure() + .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false); + + return fut.chain(new CX1<IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>>, AffinityInfo>() { + @Override public AffinityInfo applyx(IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut) throws IgniteCheckedException { + GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = fut.get(); - AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1()); - AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); + AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1()); + AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); - assert m != null; + assert m != null; - // Bring to initial state. - f.reset(); - m.reset(); + // Bring to initial state. + f.reset(); + m.reset(); - CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName); + CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName); - return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg)); + return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg)); + } + }); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 02e8736..78d1554 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -928,9 +928,21 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { mapTopVer = ctx.cache().context().exchange().readyAffinityVersion(); affFut = ctx.cache().context().exchange().lastTopologyFuture(); + + if (affFut == null || affFut.isDone()) { + affFut = null; + + // Need asynchronosly fetch affinity if cache is not started on node . + if (affCacheName != null && ctx.cache().internalCache(affCacheName) == null) { + affFut = ctx.affinity().affinityCacheFuture(affCacheName, mapTopVer); + + if (affFut.isDone()) + affFut = null; + } + } } - if (affFut != null && !affFut.isDone()) { + if (affFut != null) { waitForAffTop = true; jobRes.resetResponse(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java index 2c5472e..3eb6974 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java @@ -17,13 +17,18 @@ package org.apache.ignite.internal.processors.cache; import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; + import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -142,18 +147,23 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { */ @Test public void testAffinityCallNoServerNode() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1741"); - startGridsMultiThreaded(SRVS + 1); final Integer key = 1; - final Ignite client = grid(SRVS); + final IgniteEx client = grid(SRVS); assertTrue(client.configuration().isClientMode()); + assertNull(client.context().cache().cache(CACHE_NAME)); + + final int THREADS = 5; + + CyclicBarrier b = new CyclicBarrier(THREADS + 1); final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { + b.await(); + for (int i = 0; i < SRVS; ++i) stopGrid(i, false); @@ -162,8 +172,16 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { }); try { - while (!fut.isDone()) - client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null)); + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Void call() throws Exception { + b.await(); + + while (!fut.isDone()) + client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null)); + + return null; + } + }, THREADS, "test-thread"); } catch (ClusterTopologyException e) { log.info("Expected error: " + e); @@ -174,6 +192,48 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + @Test + public void testAffinityFailoverNoCacheOnClient() throws Exception { + startGridsMultiThreaded(SRVS + 1); + + final Integer key = 1; + + final IgniteEx client = grid(SRVS); + + assertTrue(client.configuration().isClientMode()); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < SRVS - 1; ++i) { + U.sleep(ThreadLocalRandom.current().nextLong(100) + 50); + + stopGrid(i, false); + } + + return null; + } + }); + + try { + final Affinity<Integer> aff = client.affinity(CACHE_NAME); + + assertNull(client.context().cache().cache(CACHE_NAME)); + + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + while (!fut.isDone()) + assertNotNull(aff.mapKeyToNode(key)); + } + }, 5, "test-thread"); + } + finally { + stopAllGrids(); + } + } + + /** * Test callable. */ public static class CheckCallable implements IgniteCallable<Object> {
