Repository: ignite Updated Branches: refs/heads/master f6f731f57 -> 137dd06aa
IGNITE-7165 Re-balancing is cancelled if client node joins Signed-off-by: Anton Vinogradov <a...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/137dd06a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/137dd06a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/137dd06a Branch: refs/heads/master Commit: 137dd06aaee9cc84104e6b4bf48306b050341e3a Parents: f6f731f Author: Maxim Muzafarov <maxmu...@gmail.com> Authored: Wed Aug 1 18:39:54 2018 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Wed Aug 1 18:39:54 2018 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 68 ++++++--- .../processors/cache/GridCachePreloader.java | 21 ++- .../cache/GridCachePreloaderAdapter.java | 6 + .../dht/preloader/GridDhtPartitionDemander.java | 55 ++++--- .../dht/preloader/GridDhtPartitionSupplier.java | 26 ++-- .../dht/preloader/GridDhtPreloader.java | 60 +++++++- .../preloader/GridDhtPreloaderAssignments.java | 6 +- .../ClusterBaselineNodesMetricsSelfTest.java | 1 - .../cache/CacheValidatorMetricsTest.java | 4 +- .../dht/GridCacheDhtPreloadSelfTest.java | 68 +-------- .../atomic/IgniteCacheAtomicProtocolTest.java | 3 - .../GridCacheRebalancingAsyncSelfTest.java | 7 +- .../GridCacheRebalancingCancelTest.java | 106 +++++++++++++ ...idCacheRebalancingPartitionCountersTest.java | 3 +- .../GridCacheRebalancingSyncSelfTest.java | 149 +++++++------------ ...lientAffinityAssignmentWithBaselineTest.java | 4 +- ...SlowHistoricalRebalanceSmallHistoryTest.java | 5 +- ...lFlushMultiNodeFailoverAbstractSelfTest.java | 2 +- .../GridMarshallerMappingConsistencyTest.java | 3 +- .../junits/common/GridCommonAbstractTest.java | 115 +++----------- .../testsuites/IgniteCacheTestSuite3.java | 2 + 21 files changed, 370 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 05eeee3..824aa67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -66,7 +66,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -88,6 +87,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Ign import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager; 
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -179,6 +179,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentSkipListMap<>(); + /** + * Latest started rebalance topology version but possibly not finished yet. Value {@code NONE} + * means that previous rebalance is undefined and the new one should be initiated. + * + * Should not be used to determine latest rebalanced topology. + */ + private volatile AffinityTopologyVersion rebTopVer = AffinityTopologyVersion.NONE; + /** */ private GridFutureAdapter<?> reconnectExchangeFut; @@ -827,6 +835,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @return Latest rebalance topology version or {@code NONE} if there is no info. + */ + public AffinityTopologyVersion rebalanceTopologyVersion() { + return rebTopVer; + } + + /** * @return Last initialized topology future. */ public GridDhtPartitionsExchangeFuture lastTopologyFuture() { @@ -2558,6 +2573,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (grp.isLocal()) continue; + if (grp.preloader().rebalanceRequired(rebTopVer, exchFut)) + rebTopVer = AffinityTopologyVersion.NONE; + changed |= grp.topology().afterExchange(exchFut); } @@ -2565,7 +2583,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana refreshPartitions(); } - if (!cctx.kernalContext().clientNode()) { + // Schedule rebalance if force rebalance or force reassign occurs. 
+ if (exchFut == null) + rebTopVer = AffinityTopologyVersion.NONE; + + if (!cctx.kernalContext().clientNode() && rebTopVer.equals(AffinityTopologyVersion.NONE)) { assignsMap = new HashMap<>(); IgniteCacheSnapshotManager snp = cctx.snapshot(); @@ -2582,6 +2604,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assigns = grp.preloader().generateAssignments(exchId, exchFut); assignsMap.put(grp.groupId(), assigns); + + if (resVer == null) + resVer = grp.topology().readyTopologyVersion(); } } } @@ -2590,7 +2615,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana busy = false; } - if (assignsMap != null) { + if (assignsMap != null && rebTopVer.equals(AffinityTopologyVersion.NONE)) { int size = assignsMap.size(); NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); @@ -2628,11 +2653,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (assigns != null) assignsCancelled |= assigns.cancelled(); - // Cancels previous rebalance future (in case it's not done yet). - // Sends previous rebalance stopped event (if necessary). - // Creates new rebalance future. - // Sends current rebalance started event (if necessary). - // Finishes cache sync future (on empty assignments). Runnable cur = grp.preloader().addAssignments(assigns, forcePreload, cnt, @@ -2650,7 +2670,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (forcedRebFut != null) forcedRebFut.markInitialized(); - if (assignsCancelled) { // Pending exchange. 
+ if (assignsCancelled || hasPendingExchange()) { U.log(log, "Skipping rebalancing (obsolete exchange ID) " + "[top=" + resVer + ", evt=" + exchId.discoveryEventName() + ", node=" + exchId.nodeId() + ']'); @@ -2658,25 +2678,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else if (r != null) { Collections.reverse(rebList); - U.log(log, "Rebalancing scheduled [order=" + rebList + "]"); + U.log(log, "Rebalancing scheduled [order=" + rebList + + ", top=" + resVer + ", force=" + (exchFut == null) + + ", evt=" + exchId.discoveryEventName() + + ", node=" + exchId.nodeId() + ']'); - if (!hasPendingExchange()) { - U.log(log, "Rebalancing started " + - "[top=" + resVer + ", evt=" + exchId.discoveryEventName() + - ", node=" + exchId.nodeId() + ']'); + rebTopVer = resVer; - r.run(); // Starts rebalancing routine. - } - else - U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + resVer + ", evt=" + exchId.discoveryEventName() + - ", node=" + exchId.nodeId() + ']'); + // Start rebalancing cache groups chain. 
Each group will be rebalanced + // sequentially one by one e.g.: + // ignite-sys-cache -> cacheGroupR1 -> cacheGroupP2 -> cacheGroupR3 + r.run(); } else U.log(log, "Skipping rebalancing (nothing scheduled) " + - "[top=" + resVer + ", evt=" + exchId.discoveryEventName() + + "[top=" + resVer + ", force=" + (exchFut == null) + + ", evt=" + exchId.discoveryEventName() + ", node=" + exchId.nodeId() + ']'); } + else + U.log(log, "Skipping rebalancing (no affinity changes) " + + "[top=" + resVer + + ", rebTopVer=" + rebTopVer + + ", evt=" + exchId.discoveryEventName() + + ", evtNode=" + exchId.nodeId() + + ", client=" + cctx.kernalContext().clientNode() + ']'); } catch (IgniteInterruptedCheckedException e) { throw e; http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 5fa7a82..d629e94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; @@ -63,9 +64,16 @@ public interface GridCachePreloader { public void onInitialExchangeComplete(@Nullable Throwable err); /** + * @param rebTopVer Previous rebalance topology version or {@code NONE} if there is no info. + * @param exchFut Completed exchange future. + * @return {@code True} if rebalance should be started (previous will be interrupted). + */ + public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer, GridDhtPartitionsExchangeFuture exchFut); + + /** * @param exchId Exchange ID. - * @param exchFut Exchange future. - * @return Assignments or {@code null} if detected that there are pending exchanges. + * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs. + * @return Partition assignments which will be requested from supplier nodes. */ @Nullable public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId, @Nullable GridDhtPartitionsExchangeFuture exchFut); @@ -74,10 +82,10 @@ public interface GridCachePreloader { * Adds assignments to preloader. * * @param assignments Assignments to add. - * @param forcePreload Force preload flag. - * @param rebalanceId Rebalance id. - * @param next Runnable responsible for cache rebalancing start. - * @param forcedRebFut Rebalance future. + * @param forcePreload {@code True} if preload requested by {@link ForceRebalanceExchangeTask}. + * @param rebalanceId Rebalance id created by exchange thread. + * @param next Runnable responsible for cache rebalancing chain. + * @param forcedRebFut External future for forced rebalance. * @return Rebalancing runnable. 
*/ public Runnable addAssignments(GridDhtPreloaderAssignments assignments, @@ -114,7 +122,6 @@ public interface GridCachePreloader { * Future result is {@code false} in case rebalancing cancelled or finished with missed partitions and will be * restarted at current or pending topology. * - * Note that topology change creates new futures and finishes previous. */ public IgniteInternalFuture<Boolean> rebalanceFuture(); http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index af91679..c5e4a81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -152,6 +152,12 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ + @Override public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer, + GridDhtPartitionsExchangeFuture exchFut) { + return true; + } + + /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) { return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 1eeebae..54d3c93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -235,12 +235,12 @@ public class GridDhtPartitionDemander { /** * @param fut Future. - * @return {@code True} if topology changed. + * @return {@code True} if rebalance topology version changed by exchange thread or force + * reassign exchange occurs, see {@link RebalanceReassignExchangeTask} for details. */ private boolean topologyChanged(RebalanceFuture fut) { - return - !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed. - fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions. + return !ctx.exchange().rebalanceTopologyVersion().equals(fut.topVer) || + fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions. } /** @@ -253,14 +253,21 @@ public class GridDhtPartitionDemander { } /** - * Initiates new rebalance process from given {@code assignments}. - * If previous rebalance is not finished method cancels it. - * In case of delayed rebalance method schedules new with configured delay. + * @return Collection of supplier nodes. Value {@code empty} means rebalance already finished. + */ + Collection<UUID> remainingNodes() { + return rebalanceFut.remainingNodes(); + } + + /** + * This method initiates new rebalance process from given {@code assignments} by creating new rebalance + * future based on them. Cancels previous rebalance future and sends rebalance started event. + * In case of delayed rebalance method schedules the new one with configured delay based on {@code lastExchangeFut}. * - * @param assignments Assignments.
- * @param force {@code True} if dummy reassign. - * @param rebalanceId Rebalance id. - * @param next Runnable responsible for cache rebalancing start. + * @param assignments Assignments to process. + * @param force {@code True} if preload request by {@link ForceRebalanceExchangeTask}. + * @param rebalanceId Rebalance id generated from exchange thread. + * @param next Runnable responsible for cache rebalancing chain. * @param forcedRebFut External future for forced rebalance. * @return Rebalancing runnable. */ @@ -440,17 +447,7 @@ public class GridDhtPartitionDemander { if (fut.isDone()) return; - // Must add all remaining node before send first request, for avoid race between add remaining node and - // processing response, see checkIsDone(boolean). - for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) { - UUID nodeId = e.getKey().id(); - - IgniteDhtDemandedPartitionsMap parts = e.getValue().partitions(); - - assert parts != null : "Partitions are null [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + "]"; - - fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts)); - } + fut.remaining.forEach((key, value) -> value.set1(U.currentTimeMillis())); } final CacheConfiguration cfg = grp.config(); @@ -979,6 +976,13 @@ public class GridDhtPartitionDemander { exchId = assignments.exchangeId(); topVer = assignments.topologyVersion(); + assignments.forEach((k, v) -> { + assert v.partitions() != null : + "Partitions are null [grp=" + grp.cacheOrGroupName() + ", fromNode=" + k.id() + "]"; + + remaining.put(k.id(), new T2<>(U.currentTimeMillis(), v.partitions())); + }); + this.grp = grp; this.log = log; this.rebalanceId = rebalanceId; @@ -1218,6 +1222,13 @@ public class GridDhtPartitionDemander { } /** + * @return Collection of supplier nodes. Value {@code empty} means rebalance already finished. 
+ */ + private synchronized Collection<UUID> remainingNodes() { + return remaining.keySet(); + } + + /** * */ private void sendRebalanceStartedEvent() { http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 4946d7e..ea7f4c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -17,12 +17,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -118,19 +120,20 @@ class GridDhtPartitionSupplier { } /** - * Handles new topology version and clears supply context map of outdated contexts. - * - * @param topVer Topology version. + * Handle topology change and clear supply context map of outdated contexts. 
*/ - @SuppressWarnings("ConstantConditions") - void onTopologyChanged(AffinityTopologyVersion topVer) { + void onTopologyChanged() { synchronized (scMap) { Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator(); + Collection<UUID> aliveNodes = grp.shared().discovery().aliveServerNodes().stream() + .map(ClusterNode::id) + .collect(Collectors.toList()); + while (it.hasNext()) { T3<UUID, Integer, AffinityTopologyVersion> t = it.next(); - if (topVer.compareTo(t.get3()) > 0) { // Clear all obsolete contexts. + if (!aliveNodes.contains(t.get1())) { // Clear all obsolete contexts. clearContext(scMap.get(t), log); it.remove(); @@ -171,17 +174,6 @@ class GridDhtPartitionSupplier { AffinityTopologyVersion curTop = grp.affinity().lastVersion(); AffinityTopologyVersion demTop = d.topologyVersion(); - if (curTop.compareTo(demTop) > 0) { - if (log.isDebugEnabled()) - log.debug("Demand request outdated [grp=" + grp.cacheOrGroupName() - + ", currentTopVer=" + curTop - + ", demandTopVer=" + demTop - + ", from=" + nodeId - + ", topicId=" + topicId + "]"); - - return; - } - T3<UUID, Integer, AffinityTopologyVersion> contextId = new T3<>(nodeId, topicId, demTop); if (d.rebalanceId() < 0) { // Demand node requested context cleanup. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 77f4866..7cf55a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -39,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -160,12 +160,68 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void 
onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) { - supplier.onTopologyChanged(lastFut.initialVersion()); + supplier.onTopologyChanged(); demander.onTopologyChanged(lastFut); } /** {@inheritDoc} */ + @Override public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer, + GridDhtPartitionsExchangeFuture exchFut) { + if (ctx.kernalContext().clientNode() || rebTopVer.equals(AffinityTopologyVersion.NONE)) + return false; // No-op. + + if (exchFut.localJoinExchange()) + return true; // Required, can have outdated updSeq partition counter if node reconnects. + + if (!grp.affinity().cachedVersions().contains(rebTopVer)) { + assert rebTopVer.compareTo(grp.localStartVersion()) <= 0 : + "Empty hisroty allowed only for newly started cache group [rebTopVer=" + rebTopVer + + ", localStartTopVer=" + grp.localStartVersion() + ']'; + + return true; // Required, since no history info available. + } + + final IgniteInternalFuture<Boolean> rebFut = rebalanceFuture(); + + if (rebFut.isDone() && !rebFut.result()) + return true; // Required, previous rebalance cancelled. + + final AffinityTopologyVersion exchTopVer = exchFut.context().events().topologyVersion(); + + Collection<UUID> aliveNodes = ctx.discovery().aliveServerNodes().stream() + .map(ClusterNode::id) + .collect(Collectors.toList()); + + return assignmentsChanged(rebTopVer, exchTopVer) || + !aliveNodes.containsAll(demander.remainingNodes()); // Some of the nodes left before rebalance completed. + } + + /** + * @param oldTopVer Previous topology version. + * @param newTopVer New topology version to check result. + * @return {@code True} if affinity assignments changed between two versions for local node. + */ + private boolean assignmentsChanged(AffinityTopologyVersion oldTopVer, AffinityTopologyVersion newTopVer) { + final AffinityAssignment aff = grp.affinity().readyAffinity(newTopVer); + + // We should get affinity assignments based on previous rebalance to calculate difference.
+ // Whole history size described by IGNITE_AFFINITY_HISTORY_SIZE constant. + final AffinityAssignment prevAff = grp.affinity().cachedVersions().contains(oldTopVer) ? + grp.affinity().cachedAffinity(oldTopVer) : null; + + if (prevAff == null) + return false; + + boolean assignsChanged = false; + + for (int p = 0; !assignsChanged && p < grp.affinity().partitions(); p++) + assignsChanged |= aff.get(p).contains(ctx.localNode()) != prevAff.get(p).contains(ctx.localNode()); + + return assignsChanged; + } + + /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) { assert exchFut == null || exchFut.isDone(); http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 41dd076..6e847bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; import 
org.apache.ignite.internal.util.typedef.internal.S; /** @@ -73,9 +73,9 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, } /** - * @return Topology version. + * @return Topology version based on last {@link GridDhtPartitionTopologyImpl#readyTopVer}. */ - AffinityTopologyVersion topologyVersion() { + public AffinityTopologyVersion topologyVersion() { return topVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java index 5653177..46b09ac 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java @@ -149,7 +149,6 @@ public class ClusterBaselineNodesMetricsSelfTest extends GridCommonAbstractTest private void resetBlt() throws Exception { resetBaselineTopology(); - waitForRebalancing(); awaitPartitionMapExchange(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java index ba3ad5a..4a950dd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java @@ -98,14 
+98,14 @@ public class CacheValidatorMetricsTest extends GridCommonAbstractTest implements startGrid(2); - waitForRebalancing(); + awaitPartitionMapExchange(); assertCacheStatus(CACHE_NAME_1, true, true); assertCacheStatus(CACHE_NAME_2, true, true); stopGrid(1); - waitForRebalancing(); + awaitPartitionMapExchange(); // Invalid for writing due to invalid topology. assertCacheStatus(CACHE_NAME_1, true, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java index 83eff89..23ba4b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java @@ -22,29 +22,23 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -58,9 +52,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.configuration.CacheConfiguration.DFLT_REBALANCE_BATCH_SIZE; import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS; import static org.apache.ignite.events.EventType.EVTS_CACHE_REBALANCE; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; @@ -142,15 +133,6 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { -// resetLog4j(Level.DEBUG, true, -// // Categories. 
-// GridDhtPreloader.class.getPackage().getName(), -// GridDhtPartitionTopologyImpl.class.getName(), -// GridDhtLocalPartition.class.getName()); - } - - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { backups = DFLT_BACKUPS; partitions = DFLT_PARTITIONS; @@ -227,11 +209,6 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { */ private void checkActivePartitionTransfer(int keyCnt, int nodeCnt, boolean sameCoord, boolean shuffle) throws Exception { -// resetLog4j(Level.DEBUG, true, -// // Categories. -// GridDhtPreloader.class.getPackage().getName(), -// GridDhtPartitionTopologyImpl.class.getName(), -// GridDhtLocalPartition.class.getName()); try { Ignite ignite1 = startGrid(0); @@ -270,8 +247,6 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ", grids=" + U.grids2names(ignites) + ']'); - Collection<IgniteFuture<?>> futs = new LinkedList<>(); - Ignite last = F.last(ignites); for (Iterator<Ignite> it = ignites.iterator(); it.hasNext(); ) { @@ -285,21 +260,8 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { checkActiveState(ignites); - final UUID nodeId = g.cluster().localNode().id(); - it.remove(); - futs.add(waitForLocalEvent(last.events(), new P1<Event>() { - @Override public boolean apply(Event e) { - CacheRebalancingEvent evt = (CacheRebalancingEvent)e; - - ClusterNode node = evt.discoveryNode(); - - return evt.type() == EVT_CACHE_REBALANCE_STOPPED && node.id().equals(nodeId) && - (evt.discoveryEventType() == EVT_NODE_LEFT || evt.discoveryEventType() == EVT_NODE_FAILED); - } - }, EVT_CACHE_REBALANCE_STOPPED)); - info("Before grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites)); stopGrid(g.name()); @@ -312,14 +274,6 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { awaitPartitionMapExchange(); // Need wait, otherwise test logic is broken 
if EVT_NODE_FAILED exchanges are merged. } - info("Waiting for preload futures: " + F.view(futs, new IgnitePredicate<IgniteFuture<?>>() { - @Override public boolean apply(IgniteFuture<?> fut) { - return !fut.isDone(); - } - })); - - X.waitAll(futs); - info("Finished waiting for preload futures."); assert last != null; @@ -499,11 +453,6 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { */ private void checkNodes(int keyCnt, int nodeCnt, boolean sameCoord, boolean shuffle) throws Exception { -// resetLog4j(Level.DEBUG, true, -// // Categories. -// GridDhtPreloader.class.getPackage().getName(), -// GridDhtPartitionTopologyImpl.class.getName(), -// GridDhtLocalPartition.class.getName()); try { Ignite ignite1 = startGrid(0); @@ -555,28 +504,13 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { it.remove(); - Collection<IgniteFuture<?>> futs = new LinkedList<>(); - - for (Ignite gg : ignites) - futs.add(waitForLocalEvent(gg.events(), new P1<Event>() { - @Override public boolean apply(Event e) { - CacheRebalancingEvent evt = (CacheRebalancingEvent)e; - - ClusterNode node = evt.discoveryNode(); - - return evt.type() == EVT_CACHE_REBALANCE_STOPPED && node.id().equals(nodeId) && - (evt.discoveryEventType() == EVT_NODE_LEFT || - evt.discoveryEventType() == EVT_NODE_FAILED); - } - }, EVT_CACHE_REBALANCE_STOPPED)); - info("Before grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites)); stopGrid(g.name()); info(">>> Waiting for preload futures [leftNode=" + g.name() + ", remaining=" + U.grids2names(ignites) + ']'); - X.waitAll(futs); + awaitPartitionMapExchange(); info("After grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites)); http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index 20f292b..14c8571 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -807,9 +807,6 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { startServers(2); // Waiting for minor topology changing because of late affinity assignment. - waitForRebalancing(0, 2, 1); - waitForRebalancing(1, 2, 1); - awaitPartitionMapExchange(); Ignite srv0 = ignite(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java index 4ebcd5d..0a8698a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java @@ -17,10 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; -import org.apache.ignite.Ignite; +import java.util.Collections; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.configuration.CacheConfiguration; 
import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; @@ -43,7 +44,7 @@ public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncS * @throws Exception Exception. */ public void testNodeFailedAtRebalancing() throws Exception { - Ignite ignite = startGrid(0); + IgniteEx ignite = startGrid(0); generateData(ignite, 0, 0); @@ -60,7 +61,7 @@ public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncS ((TestTcpDiscoverySpi)grid(1).configuration().getDiscoverySpi()).simulateNodeFailure(); - waitForRebalancing(0, 3); + awaitPartitionMapExchange(false, false, Collections.singletonList(ignite.localNode())); checkSupplyContextMapIsEmpty(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingCancelTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingCancelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingCancelTest.java new file mode 100644 index 0000000..3965290 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingCancelTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test cases for checking cancellation rebalancing process if some events occurs. 
+ */ +public class GridCacheRebalancingCancelTest extends GridCommonAbstractTest { + /** */ + private static final String DHT_PARTITIONED_CACHE = "cacheP"; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration dfltCfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)dfltCfg.getDiscoverySpi()).setIpFinder(ipFinder); + + dfltCfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + return dfltCfg; + } + + /** + * Test rebalance not cancelled when client node join to cluster. + * + * @throws Exception Exception. + */ + public void testClientNodeJoinAtRebalancing() throws Exception { + final IgniteEx ignite0 = startGrid(0); + + IgniteCache<Integer, Integer> cache = ignite0.createCache( + new CacheConfiguration<Integer, Integer>(DHT_PARTITIONED_CACHE) + .setCacheMode(CacheMode.PARTITIONED) + .setRebalanceMode(CacheRebalanceMode.ASYNC) + .setBackups(1) + .setRebalanceOrder(2) + .setAffinity(new RendezvousAffinityFunction(false))); + + for (int i = 0; i < 2048; i++) + cache.put(i, i); + + TestRecordingCommunicationSpi.spi(ignite0) + .blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return (msg instanceof GridDhtPartitionSupplyMessage) + && ((GridCacheGroupIdMessage)msg).groupId() == groupIdForCache(ignite0, DHT_PARTITIONED_CACHE); + } + }); + + final IgniteEx ignite1 = startGrid(1); + + TestRecordingCommunicationSpi.spi(ignite0).waitForBlocked(); + + GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)ignite1.context(). 
+ cache().internalCache(DHT_PARTITIONED_CACHE).preloader().rebalanceFuture(); + + String igniteClntName = getTestIgniteInstanceName(2); + + startGrid(igniteClntName, optimize(getConfiguration(igniteClntName).setClientMode(true))); + + // Resend delayed rebalance messages. + TestRecordingCommunicationSpi.spi(ignite0).stopBlock(true); + + awaitPartitionMapExchange(); + + // Previous rebalance future should not be cancelled. + assertTrue(fut.result()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java index 1280e87..cb414ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java @@ -141,7 +141,8 @@ public class GridCacheRebalancingPartitionCountersTest extends GridCommonAbstrac assertTrue(primaryRemoved); ignite.cluster().active(true); - waitForRebalancing(); + + awaitPartitionMapExchange(); List<String> issues = new ArrayList<>(); HashMap<Integer, Long> partMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index ed51cf3..a027a41 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -21,10 +21,9 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; @@ -42,11 +41,13 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.lang.GridAbsPredicateX; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -68,6 +69,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** */ private static final int TEST_SIZE = 100_000; + /** */ + private static final long TOPOLOGY_STILLNESS_TIME = 30_000L; + /** partitioned cache name. */ protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP"; @@ -89,11 +93,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** */ private volatile boolean concurrentStartFinished3; - /** */ - private volatile boolean record; - - /** */ - private final ConcurrentHashMap<Class, AtomicInteger> map = new ConcurrentHashMap<>(); + /** + * Time in milliseconds of last received {@link GridDhtPartitionsSingleMessage} + * or {@link GridDhtPartitionsFullMessage} using {@link CollectingCommunicationSpi}. + */ + private static volatile long lastPartMsgTime; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -102,7 +106,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); - TcpCommunicationSpi commSpi = new CountingCommunicationSpi(); + TcpCommunicationSpi commSpi = new CollectingCommunicationSpi(); commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); commSpi.setTcpNoDelay(true); @@ -232,47 +236,35 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { startGrid(1); - int waitMinorVer = ignite.configuration().isLateAffinityAssignment() ? 
1 : 0; - - waitForRebalancing(0, 2, waitMinorVer); - waitForRebalancing(1, 2, waitMinorVer); - awaitPartitionMapExchange(true, true, null, true); checkPartitionMapExchangeFinished(); - checkPartitionMapMessagesAbsent(); + awaitPartitionMessagesAbsent(); stopGrid(0); - waitForRebalancing(1, 3); - awaitPartitionMapExchange(true, true, null, true); checkPartitionMapExchangeFinished(); - checkPartitionMapMessagesAbsent(); + awaitPartitionMessagesAbsent(); startGrid(2); - waitForRebalancing(1, 4, waitMinorVer); - waitForRebalancing(2, 4, waitMinorVer); - awaitPartitionMapExchange(true, true, null, true); checkPartitionMapExchangeFinished(); - checkPartitionMapMessagesAbsent(); + awaitPartitionMessagesAbsent(); stopGrid(2); - waitForRebalancing(1, 5); - awaitPartitionMapExchange(true, true, null, true); checkPartitionMapExchangeFinished(); - checkPartitionMapMessagesAbsent(); + awaitPartitionMessagesAbsent(); long spend = (System.currentTimeMillis() - start) / 1000; @@ -331,13 +323,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { startGrid(4); - waitForRebalancing(3, 6); - waitForRebalancing(4, 6); + awaitPartitionMapExchange(true, true, null); concurrentStartFinished = true; - awaitPartitionMapExchange(true, true, null); - checkSupplyContextMapIsEmpty(); t1.join(); @@ -442,27 +431,29 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { } /** + * Method checks for {@link GridDhtPartitionsSingleMessage} or {@link GridDhtPartitionsFullMessage} + * not received within {@code TOPOLOGY_STILLNESS_TIME} bound. + * * @throws Exception If failed. */ - protected void checkPartitionMapMessagesAbsent() throws Exception { - map.clear(); - - record = true; - - log.info("Checking GridDhtPartitions*Message absent (it will take 30 SECONDS) ... 
"); - - U.sleep(30_000); - - record = false; - - AtomicInteger iF = map.get(GridDhtPartitionsFullMessage.class); - AtomicInteger iS = map.get(GridDhtPartitionsSingleMessage.class); - - Integer fullMap = iF != null ? iF.get() : null; - Integer singleMap = iS != null ? iS.get() : null; - - assertTrue("Unexpected full map messages: " + fullMap, fullMap == null || fullMap.equals(1)); // 1 message can be sent right after all checks passed. - assertNull("Unexpected single map messages", singleMap); + protected void awaitPartitionMessagesAbsent() throws Exception { + log.info("Checking GridDhtPartitions*Message absent (it will take up to " + + TOPOLOGY_STILLNESS_TIME + " ms) ... "); + + // Start waiting new messages from current point of time. + lastPartMsgTime = U.currentTimeMillis(); + + assertTrue("Should not have partition Single or Full messages within bound " + + TOPOLOGY_STILLNESS_TIME + " ms.", + GridTestUtils.waitForCondition( + new GridAbsPredicateX() { + @Override public boolean applyx() { + return lastPartMsgTime + TOPOLOGY_STILLNESS_TIME < U.currentTimeMillis(); + } + }, + 2 * TOPOLOGY_STILLNESS_TIME // 30 sec to gain stable topology and 30 sec of silence. + ) + ); } /** {@inheritDoc} */ @@ -495,11 +486,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { while (!concurrentStartFinished2) U.sleep(10); - waitForRebalancing(0, 5, 0); - waitForRebalancing(1, 5, 0); - waitForRebalancing(2, 5, 0); - waitForRebalancing(3, 5, 0); - waitForRebalancing(4, 5, 0); + awaitPartitionMapExchange(); //New cache should start rebalancing. 
CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); @@ -552,12 +539,6 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { t2.join(); t3.join(); - waitForRebalancing(0, 5, 1); - waitForRebalancing(1, 5, 1); - waitForRebalancing(2, 5, 1); - waitForRebalancing(3, 5, 1); - waitForRebalancing(4, 5, 1); - awaitPartitionMapExchange(true, true, null); checkSupplyContextMapIsEmpty(); @@ -577,35 +558,23 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { stopGrid(1); - waitForRebalancing(0, 6); - waitForRebalancing(2, 6); - waitForRebalancing(3, 6); - waitForRebalancing(4, 6); - awaitPartitionMapExchange(true, true, null); checkSupplyContextMapIsEmpty(); stopGrid(0); - waitForRebalancing(2, 7); - waitForRebalancing(3, 7); - waitForRebalancing(4, 7); - awaitPartitionMapExchange(true, true, null); checkSupplyContextMapIsEmpty(); stopGrid(2); - waitForRebalancing(3, 8); - waitForRebalancing(4, 8); - awaitPartitionMapExchange(true, true, null); checkPartitionMapExchangeFinished(); - checkPartitionMapMessagesAbsent(); + awaitPartitionMessagesAbsent(); checkSupplyContextMapIsEmpty(); @@ -613,7 +582,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { stopGrid(3); - waitForRebalancing(4, 9); + awaitPartitionMapExchange(); checkSupplyContextMapIsEmpty(); @@ -634,36 +603,26 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * */ - private class CountingCommunicationSpi extends TcpCommunicationSpi { + private static class CollectingCommunicationSpi extends TcpCommunicationSpi { + /** */ + @LoggerResource + private IgniteLogger log; + /** {@inheritDoc} */ @Override public void sendMessage(final ClusterNode node, final Message msg, final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { final Object msg0 = ((GridIoMessage)msg).message(); - recordMessage(msg0); + if (msg0 instanceof 
GridDhtPartitionsSingleMessage || + msg0 instanceof GridDhtPartitionsFullMessage) { + lastPartMsgTime = U.currentTimeMillis(); - super.sendMessage(node, msg, ackC); - } - - /** - * @param msg Message. - */ - private void recordMessage(Object msg) { - if (record) { - Class id = msg.getClass(); - - AtomicInteger ai = map.get(id); - - if (ai == null) { - ai = new AtomicInteger(); - - AtomicInteger oldAi = map.putIfAbsent(id, ai); - - (oldAi != null ? oldAi : ai).incrementAndGet(); - } - else - ai.incrementAndGet(); + log.info("Last seen time of GridDhtPartitionsSingleMessage or GridDhtPartitionsFullMessage updated " + + "[lastPartMsgTime=" + lastPartMsgTime + + ", node=" + node.id() + ']'); } + + super.sendMessage(node, msg, ackC); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java index 7e9765c..13a98e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java @@ -383,7 +383,7 @@ public class ClientAffinityAssignmentWithBaselineTest extends GridCommonAbstract startGrid("flaky"); System.out.println("### Starting rebalancing after flaky node join"); - waitForRebalancing(); + awaitPartitionMapExchange(); System.out.println("### Rebalancing is finished after flaky node join"); awaitProgressInAllLoaders(10_000, loadError, 
threadProgressTracker); @@ -689,7 +689,7 @@ public class ClientAffinityAssignmentWithBaselineTest extends GridCommonAbstract ig0.cluster().setBaselineTopology(fullBlt.subList(0, newBaselineSize)); System.out.println("### Starting rebalancing after BLT change: " + (newBaselineSize + 1) + " -> " + newBaselineSize); - waitForRebalancing(); + awaitPartitionMapExchange(); System.out.println("### Rebalancing is finished after BLT change: " + (newBaselineSize + 1) + " -> " + newBaselineSize); awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker); http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java index 8f2e738..3500c8d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java @@ -157,7 +157,8 @@ public class SlowHistoricalRebalanceSmallHistoryTest extends GridCommonAbstractT SUPPLY_MESSAGE_LATCH.get().countDown(); - waitForRebalancing(); // Partition is OWNING on grid(0) and grid(1) + // Partition is OWNING on grid(0) and grid(1) + awaitPartitionMapExchange(); for (int i = 0; i < 2; i++) { for (int j = 0; i < 500; i++) @@ -178,7 +179,7 @@ public class SlowHistoricalRebalanceSmallHistoryTest extends GridCommonAbstractT startGrid(0); - waitForRebalancing(); + awaitPartitionMapExchange(); assertEquals(2, grid(1).context().discovery().aliveServerNodes().size()); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java index 4d26823..a28ec5f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java @@ -194,7 +194,7 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr grid.cluster().setBaselineTopology(grid.cluster().topologyVersion()); - waitForRebalancing(); + awaitPartitionMapExchange(); } catch (Throwable expected) { // There can be any exception. Do nothing. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java index 78f3c03..9de2702 100644 --- a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java @@ -120,7 +120,8 @@ public class GridMarshallerMappingConsistencyTest extends GridCommonAbstractTest c1.put(k, new DummyObject(k)); startGrid(2); - waitForRebalancing(); + + awaitPartitionMapExchange(); stopAllGrids(); http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 2c5091c..313cd71 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -688,32 +688,34 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { if (affNodesCnt != ownerNodesCnt || !affNodes.containsAll(owners) || (waitEvicts && loc != null && loc.state() != GridDhtPartitionState.OWNING)) { + if (i % 50 == 0) + LT.warn(log(), "Waiting for topology map update [" + + "igniteInstanceName=" + g.name() + + ", cache=" + cfg.getName() + + ", cacheId=" + 
dht.context().cacheId() + + ", topVer=" + top.readyTopologyVersion() + + ", p=" + p + + ", affNodesCnt=" + affNodesCnt + + ", ownersCnt=" + ownerNodesCnt + + ", affNodes=" + F.nodeIds(affNodes) + + ", owners=" + F.nodeIds(owners) + + ", topFut=" + topFut + + ", locNode=" + g.cluster().localNode() + ']'); + } + else + match = true; + } + else { + if (i % 50 == 0) LT.warn(log(), "Waiting for topology map update [" + "igniteInstanceName=" + g.name() + ", cache=" + cfg.getName() + ", cacheId=" + dht.context().cacheId() + ", topVer=" + top.readyTopologyVersion() + + ", started=" + dht.context().started() + ", p=" + p + - ", affNodesCnt=" + affNodesCnt + - ", ownersCnt=" + ownerNodesCnt + - ", affNodes=" + F.nodeIds(affNodes) + - ", owners=" + F.nodeIds(owners) + - ", topFut=" + topFut + + ", readVer=" + readyVer + ", locNode=" + g.cluster().localNode() + ']'); - } - else - match = true; - } - else { - LT.warn(log(), "Waiting for topology map update [" + - "igniteInstanceName=" + g.name() + - ", cache=" + cfg.getName() + - ", cacheId=" + dht.context().cacheId() + - ", topVer=" + top.readyTopologyVersion() + - ", started=" + dht.context().started() + - ", p=" + p + - ", readVer=" + readyVer + - ", locNode=" + g.cluster().localNode() + ']'); } if (!match) { @@ -998,81 +1000,6 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } /** - * @param id Node id. - * @param major Major ver. - * @param minor Minor ver. - * @throws IgniteCheckedException If failed. - */ - protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException { - waitForRebalancing(grid(id), new AffinityTopologyVersion(major, minor)); - } - - /** - * @param id Node id. - * @param major Major ver. - * @throws IgniteCheckedException If failed. - */ - protected void waitForRebalancing(int id, int major) throws IgniteCheckedException { - waitForRebalancing(grid(id), new AffinityTopologyVersion(major)); - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - protected void waitForRebalancing() throws IgniteCheckedException { - for (Ignite ignite : G.allGrids()) - waitForRebalancing((IgniteEx)ignite, null); - } - - /** - * @param ignite Node. - * @param top Topology version. - * @throws IgniteCheckedException If failed. - */ - protected void waitForRebalancing(IgniteEx ignite, AffinityTopologyVersion top) throws IgniteCheckedException { - if (ignite.configuration().isClientMode()) - return; - - boolean finished = false; - - long stopTime = System.currentTimeMillis() + 60_000; - - while (!finished && (System.currentTimeMillis() < stopTime)) { - finished = true; - - if (top == null) - top = ignite.context().discovery().topologyVersionEx(); - - for (GridCacheAdapter c : ignite.context().cache().internalCaches()) { - GridDhtPartitionDemander.RebalanceFuture fut = - (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture(); - - if (fut.topologyVersion() == null || fut.topologyVersion().compareTo(top) < 0) { - finished = false; - - log.info("Unexpected future version, will retry [futVer=" + fut.topologyVersion() + - ", expVer=" + top + ']'); - - U.sleep(100); - - break; - } - else if (!fut.get()) { - finished = false; - - log.warning("Rebalancing finished with missed partitions."); - - U.sleep(100); - - break; - } - } - } - - assertTrue(finished); - } - - /** * @param ignite Node. 
*/ public void dumpCacheDebugInfo(Ignite ignite) { http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index 55436d6..5e94052 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingCancelTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest; @@ -152,6 +153,7 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class); suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class); suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class); + suite.addTestSuite(GridCacheRebalancingCancelTest.class); // Test for byte array value special case. 
suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class);