This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new fde65a6 IGNITE-13193 Added fallback to full rebalance if historical one has failed. fde65a6 is described below commit fde65a6bdaa782108ea50b2bb746a9980aa5b680 Author: Slava Koptilin <slava.kopti...@gmail.com> AuthorDate: Mon Jul 6 13:14:23 2020 +0300 IGNITE-13193 Added fallback to full rebalance if historical one has failed. --- .../cache/GridCachePartitionExchangeManager.java | 310 +++++------ .../processors/cache/GridCachePreloader.java | 5 +- .../cache/GridCachePreloaderAdapter.java | 5 +- .../dht/preloader/GridDhtPartitionDemander.java | 55 +- .../dht/preloader/GridDhtPartitionSupplier.java | 91 ++-- .../preloader/GridDhtPartitionsExchangeFuture.java | 74 ++- .../dht/preloader/GridDhtPreloader.java | 27 +- ...java => IgniteHistoricalIteratorException.java} | 33 +- .../preloader/RebalanceReassignExchangeTask.java | 15 +- .../cache/persistence/GridCacheOffheapManager.java | 142 ++--- .../IgniteShutdownOnSupplyMessageFailureTest.java | 17 + .../persistence/db/wal/IgniteWalRebalanceTest.java | 581 ++++++++++++++++++++- 12 files changed, 1050 insertions(+), 305 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 15aad21..fc8bef2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1173,8 +1173,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchId Exchange ID. */ - public void forceReassign(GridDhtPartitionExchangeId exchId) { - exchWorker.forceReassign(exchId); + public void forceReassign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture fut) { + exchWorker.forceReassign(exchId, fut); } /** @@ -2878,9 +2878,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private AffinityTopologyVersion lastFutVer; - /** Busy flag used as performance optimization to stop current preloading. */ - private volatile boolean busy; - /** */ private boolean crd; @@ -2901,9 +2898,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchId Exchange ID. */ - void forceReassign(GridDhtPartitionExchangeId exchId) { - if (!hasPendingExchange() && !busy) - futQ.add(new RebalanceReassignExchangeTask(exchId)); + void forceReassign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture fut) { + if (!hasPendingExchange()) + futQ.add(new RebalanceReassignExchangeTask(exchId, fut)); } /** @@ -3048,11 +3045,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!futQ.isEmpty()) { for (CachePartitionExchangeWorkerTask task : futQ) { if (task instanceof GridDhtPartitionsExchangeFuture) { - // First event is enough to check, - // because only current exchange future can have multiple discovery events (exchange merge). 
- ClusterNode triggeredBy = ((GridDhtPartitionsExchangeFuture) task).firstEvent().eventNode(); - - if (!triggeredBy.isClient()) + if (((GridDhtPartitionsExchangeFuture)task).changedAffinity()) return true; } } @@ -3173,8 +3166,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana continue; } - busy = true; - Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; boolean forcePreload = false; @@ -3185,197 +3176,220 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana AffinityTopologyVersion resVer = null; - try { - if (isCancelled()) - break; + if (isCancelled()) + break; - if (task instanceof RebalanceReassignExchangeTask) - exchId = ((RebalanceReassignExchangeTask) task).exchangeId(); - else if (task instanceof ForceRebalanceExchangeTask) { - forcePreload = true; + if (task instanceof RebalanceReassignExchangeTask) { + RebalanceReassignExchangeTask reassignTask = (RebalanceReassignExchangeTask)task; - timeout = 0; // Force refresh. + exchId = reassignTask.exchangeId(); - exchId = ((ForceRebalanceExchangeTask)task).exchangeId(); - } - else { - assert task instanceof GridDhtPartitionsExchangeFuture : task; + GridDhtPartitionsExchangeFuture fut = reassignTask.future(); - exchFut = (GridDhtPartitionsExchangeFuture)task; + assert fut.changedAffinity() : + "Reassignment request started for exchange future which didn't change affinity " + + "[exchId=" + exchId + ", fut=" + exchFut + ']'; - exchId = exchFut.exchangeId(); + if (fut.hasInapplicableNodesForRebalance()) { + GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get(); - lastInitializedFut = exchFut; + AffinityTopologyVersion lastAffChangedVer = cctx.exchange(). + lastAffinityChangedTopologyVersion(lastFut.topologyVersion()); - boolean newCrd = false; + if (fut.topologyVersion().equals(lastAffChangedVer)) + exchFut = fut; + else if (lastAffChangedVer.after(exchId.topologyVersion())) { + // There is a new exchange which should trigger rebalancing. + // This reassignment request can be skipped. + if (log.isInfoEnabled()) { + log.info("Partitions reassignment request skipped due to affinity was already changed" + + " [reassignTopVer=" + exchId.topologyVersion() + + ", lastAffChangedTopVer=" + lastAffChangedVer + ']'); + } - if (!crd) { - List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes(); + continue; + } + } + } + else if (task instanceof ForceRebalanceExchangeTask) { + forcePreload = true; - crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal(); - } + timeout = 0; // Force refresh. - if (!exchFut.changedAffinity()) { - GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get(); + exchId = ((ForceRebalanceExchangeTask)task).exchangeId(); + } + else { + assert task instanceof GridDhtPartitionsExchangeFuture : task; - if (lastFut != null) { - if (!lastFut.changedAffinity()) { - // If lastFut corresponds to merged exchange, it is essential to use - // topologyVersion() instead of initialVersion() - nodes joined in this PME - // will have DiscoCache only for the last version. 
- AffinityTopologyVersion lastAffVer = cctx.exchange() - .lastAffinityChangedTopologyVersion(lastFut.topologyVersion()); + exchFut = (GridDhtPartitionsExchangeFuture)task; - cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), - lastAffVer); - } - else - cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), - lastFut.topologyVersion()); + exchId = exchFut.exchangeId(); + + lastInitializedFut = exchFut; + + boolean newCrd = false; + + if (!crd) { + List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes(); + + crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal(); + } + + if (!exchFut.changedAffinity()) { + GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get(); + + if (lastFut != null) { + if (!lastFut.changedAffinity()) { + // If lastFut corresponds to merged exchange, it is essential to use + // topologyVersion() instead of initialVersion() - nodes joined in this PME + // will have DiscoCache only for the last version. + AffinityTopologyVersion lastAffVer = cctx.exchange() + .lastAffinityChangedTopologyVersion(lastFut.topologyVersion()); + + cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), + lastAffVer); } + else + cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), + lastFut.topologyVersion()); } + } - exchFut.timeBag().finishGlobalStage("Waiting in exchange queue"); + exchFut.timeBag().finishGlobalStage("Waiting in exchange queue"); - exchFut.init(newCrd); + exchFut.init(newCrd); - int dumpCnt = 0; + int dumpCnt = 0; - long waitStartNanos = System.nanoTime(); + long waitStartNanos = System.nanoTime(); - // Call rollback logic only for client node, for server nodes - // rollback logic is in GridDhtPartitionsExchangeFuture. - boolean txRolledBack = !cctx.localNode().isClient(); + // Call rollback logic only for client node, for server nodes + // rollback logic is in GridDhtPartitionsExchangeFuture. + boolean txRolledBack = !cctx.localNode().isClient(); - IgniteConfiguration cfg = cctx.gridConfig(); + IgniteConfiguration cfg = cctx.gridConfig(); - final long dumpTimeout = 2 * cfg.getNetworkTimeout(); + final long dumpTimeout = 2 * cfg.getNetworkTimeout(); - long nextDumpTime = 0; + long nextDumpTime = 0; - while (true) { - // Read txTimeoutOnPME from configuration after every iteration. - long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange(); + while (true) { + // Read txTimeoutOnPME from configuration after every iteration. + long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange(); + + try { + long exchTimeout = curTimeout > 0 && !txRolledBack + ? Math.min(curTimeout, dumpTimeout) + : dumpTimeout; + + blockingSectionBegin(); try { - long exchTimeout = curTimeout > 0 && !txRolledBack - ? Math.min(curTimeout, dumpTimeout) - : dumpTimeout; + resVer = exchFut.get(exchTimeout, TimeUnit.MILLISECONDS); + } finally { + blockingSectionEnd(); + } - blockingSectionBegin(); + onIdle(); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + updateHeartbeat(); + + if (nextDumpTime <= U.currentTimeMillis()) { + U.warn(diagnosticLog, "Failed to wait for partition map exchange [" + + "topVer=" + exchFut.initialVersion() + + ", node=" + cctx.localNodeId() + "]. " + + (curTimeout <= 0 && !txRolledBack ? "Consider changing " + + "TransactionConfiguration.txTimeoutOnPartitionMapExchange" + + " to non default value to avoid this message. 
" : "") + + "Dumping pending objects that might be the cause: "); try { - resVer = exchFut.get(exchTimeout, TimeUnit.MILLISECONDS); - } finally { - blockingSectionEnd(); + dumpDebugInfo(exchFut); } - - onIdle(); - - break; - } - catch (IgniteFutureTimeoutCheckedException ignored) { - updateHeartbeat(); - - if (nextDumpTime <= U.currentTimeMillis()) { - U.warn(diagnosticLog, "Failed to wait for partition map exchange [" + - "topVer=" + exchFut.initialVersion() + - ", node=" + cctx.localNodeId() + "]. " + - (curTimeout <= 0 && !txRolledBack ? "Consider changing " + - "TransactionConfiguration.txTimeoutOnPartitionMapExchange" + - " to non default value to avoid this message. " : "") + - "Dumping pending objects that might be the cause: "); - - try { - dumpDebugInfo(exchFut); - } - catch (Exception e) { - U.error(diagnosticLog, "Failed to dump debug information: " + e, e); - } - - nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout); + catch (Exception e) { + U.error(diagnosticLog, "Failed to dump debug information: " + e, e); } - long passedMillis = U.millisSinceNanos(waitStartNanos); + nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout); + } - if (!txRolledBack && curTimeout > 0 && passedMillis >= curTimeout) { - txRolledBack = true; // Try automatic rollback only once. + long passedMillis = U.millisSinceNanos(waitStartNanos); - cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion()); - } - } - catch (Exception e) { - if (exchFut.reconnectOnError(e)) - throw new IgniteNeedReconnectException(cctx.localNode(), e); + if (!txRolledBack && curTimeout > 0 && passedMillis >= curTimeout) { + txRolledBack = true; // Try automatic rollback only once. - throw e; + cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion()); } } + catch (Exception e) { + if (exchFut.reconnectOnError(e)) + throw new IgniteNeedReconnectException(cctx.localNode(), e); - removeMergedFutures(resVer, exchFut); + throw e; + } + } - if (log.isTraceEnabled()) - log.trace("After waiting for exchange future [exchFut=" + exchFut + ", worker=" + - this + ']'); + removeMergedFutures(resVer, exchFut); - if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId())) - lastRefresh.compareAndSet(-1, U.currentTimeMillis()); + if (log.isTraceEnabled()) + log.trace("After waiting for exchange future [exchFut=" + exchFut + ", worker=" + + this + ']'); - // Just pick first worker to do this, so we don't - // invoke topology callback more than once for the - // same event. + if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId())) + lastRefresh.compareAndSet(-1, U.currentTimeMillis()); - boolean changed = false; + // Just pick first worker to do this, so we don't + // invoke topology callback more than once for the + // same event. 
+ boolean changed = false; - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; - changed |= grp.topology().afterExchange(exchFut); - } + changed |= grp.topology().afterExchange(exchFut); + } - if (!cctx.kernalContext().clientNode() && changed) { - if (log.isDebugEnabled()) - log.debug("Refresh partitions due to mapping was changed"); + if (!cctx.kernalContext().clientNode() && changed) { + if (log.isDebugEnabled()) + log.debug("Refresh partitions due to mapping was changed"); - refreshPartitions(); - } + refreshPartitions(); } + } - if (rebalanceRequired(exchFut)) { - if (rebalanceDelay > 0) - U.sleep(rebalanceDelay); + if (rebalanceRequired(exchFut)) { + if (rebalanceDelay > 0) + U.sleep(rebalanceDelay); - assignsMap = new HashMap<>(); + assignsMap = new HashMap<>(); - IgniteCacheSnapshotManager snp = cctx.snapshot(); + IgniteCacheSnapshotManager snp = cctx.snapshot(); - for (final CacheGroupContext grp : cctx.cache().cacheGroups()) { - long delay = grp.config().getRebalanceDelay(); + for (final CacheGroupContext grp : cctx.cache().cacheGroups()) { + long delay = grp.config().getRebalanceDelay(); - boolean disableRebalance = snp.partitionsAreFrozen(grp); + boolean disableRebalance = snp.partitionsAreFrozen(grp); - GridDhtPreloaderAssignments assigns = null; + GridDhtPreloaderAssignments assigns = null; - // Don't delay for dummy reassigns to avoid infinite recursion. - if ((delay == 0 || forcePreload) && !disableRebalance) - assigns = grp.preloader().generateAssignments(exchId, exchFut); + // Don't delay for dummy reassigns to avoid infinite recursion. + if ((delay == 0 || forcePreload) && !disableRebalance) + assigns = grp.preloader().generateAssignments(exchId, exchFut); - assignsMap.put(grp.groupId(), assigns); + assignsMap.put(grp.groupId(), assigns); - if (resVer == null && !grp.isLocal()) - resVer = grp.topology().readyTopologyVersion(); - } + if (resVer == null && !grp.isLocal()) + resVer = grp.topology().readyTopologyVersion(); } - - if (resVer == null) - resVer = exchId.topologyVersion(); - } - finally { - // Must flip busy flag before assignments are given to demand workers. - busy = false; } + if (resVer == null) + resVer = exchId.topologyVersion(); + if (!F.isEmpty(assignsMap)) { int size = assignsMap.size(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index c70f86b..3422a16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -76,8 +76,9 @@ public interface GridCachePreloader { * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs. * @return Partition assignments which will be requested from supplier nodes. */ - @Nullable public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId, - @Nullable GridDhtPartitionsExchangeFuture exchFut); + @Nullable public GridDhtPreloaderAssignments generateAssignments( + GridDhtPartitionExchangeId exchId, + @Nullable GridDhtPartitionsExchangeFuture exchFut); /** * Adds assignments to preloader. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index b382ba4..a98d192 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -144,8 +144,9 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments generateAssignments( + GridDhtPartitionExchangeId exchId, + GridDhtPartitionsExchangeFuture exchFut) { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 5277085..514dd1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -327,7 +327,7 @@ public class GridDhtPartitionDemander { return null; } - final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId, next, lastCancelledTime); + final RebalanceFuture fut = new RebalanceFuture(grp, lastExchangeFut, assignments, log, rebalanceId, next, lastCancelledTime); if (!grp.localWalEnabled()) { fut.listen(new IgniteInClosureX<IgniteInternalFuture<Boolean>>() { @@ -482,7 +482,7 @@ public class GridDhtPartitionDemander { if (node == null) { if (log.isDebugEnabled()) - log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(nodeId, supplyMsg) + "]"); + log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(nodeId, supplyMsg) + ']'); return; } @@ -490,13 +490,13 @@ public class GridDhtPartitionDemander { // Topology already changed (for the future that supply message based on). if (!fut.isActual(supplyMsg.rebalanceId())) { if (log.isDebugEnabled()) - log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + "]"); + log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + ']'); return; } if (log.isDebugEnabled()) - log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + "]"); + log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + ']'); // Check whether there were error during supply message unmarshalling process. 
if (supplyMsg.classError() != null) { @@ -616,7 +616,7 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (state is not MOVING): " + - "[" + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + "]"); + '[' + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + ']'); } } else { @@ -624,7 +624,7 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (affinity changed): " + - "[" + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + "]"); + '[' + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + ']'); } } @@ -662,7 +662,7 @@ public class GridDhtPartitionDemander { else { if (log.isDebugEnabled()) log.debug("Will not request next demand message [" + demandRoutineInfo(nodeId, supplyMsg) + - ", rebalanceFuture=" + fut + "]"); + ", rebalanceFuture=" + fut + ']'); } } catch (IgniteSpiException | IgniteCheckedException e) { @@ -985,14 +985,14 @@ public class GridDhtPartitionDemander { * Internal states of rebalance future. */ private enum RebalanceFutureState { - /** Init. */ + /** Initial state. */ INIT, - /** Started. */ + /** Rebalance future started and requested required partitions. */ STARTED, - /** Marked as cancelled. */ - MARK_CANCELLED, + /** Marked as cancelled. This means partitions will not be requested. */ + MARK_CANCELLED } /** @@ -1018,13 +1018,17 @@ public class GridDhtPartitionDemander { /** Remaining. */ private final Map<UUID, IgniteDhtDemandedPartitionsMap> remaining = new HashMap<>(); - /** Missed. */ + /** Collection of missed partitions and partitions that could not be rebalanced from a supplier. */ private final Map<UUID, Collection<Integer>> missed = new HashMap<>(); /** Exchange ID. */ @GridToStringExclude private final GridDhtPartitionExchangeId exchId; + /** Coresponding exchange future. */ + @GridToStringExclude + private final GridDhtPartitionsExchangeFuture exchFut; + /** Topology version. */ private final AffinityTopologyVersion topVer; @@ -1076,7 +1080,10 @@ public class GridDhtPartitionDemander { private final Map<ClusterNode, Set<Integer>> rebalancingParts; /** - * @param grp Cache group. + * Creates a new rebalance future. + * + * @param grp Cache group context. + * @param exchFut Exchange future. * @param assignments Assignments. * @param log Logger. * @param rebalanceId Rebalance id. 
@@ -1085,17 +1092,21 @@ public class GridDhtPartitionDemander { */ RebalanceFuture( CacheGroupContext grp, + GridDhtPartitionsExchangeFuture exchFut, GridDhtPreloaderAssignments assignments, IgniteLogger log, long rebalanceId, RebalanceFuture next, - AtomicLong lastCancelledTime) { + AtomicLong lastCancelledTime + ) { assert assignments != null; + assert assignments != null : "Asiignments must not be null."; this.rebalancingParts = U.newHashMap(assignments.size()); this.assignments = assignments; exchId = assignments.exchangeId(); topVer = assignments.topologyVersion(); + this.exchFut = exchFut; this.next = next; this.lastCancelledTime = lastCancelledTime; @@ -1142,6 +1153,7 @@ public class GridDhtPartitionDemander { this.assignments = null; this.exchId = null; this.topVer = null; + this.exchFut = null; this.ctx = null; this.grp = null; this.log = null; @@ -1476,6 +1488,19 @@ public class GridDhtPartitionDemander { if (isDone()) return; + IgniteDhtDemandedPartitionsMap parts = remaining.get(nodeId); + + assert parts != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + + ", part=" + p + "]"; + + if (parts.historicalMap().contains(p)) { + // The partition p cannot be wal rebalanced, + // let's exclude the given nodeId and give a try to full rebalance. + exchFut.markNodeAsInapplicableForHistoricalRebalance(nodeId); + } + else + exchFut.markNodeAsInapplicableForFullRebalance(nodeId, grp.groupId(), p); + missed.computeIfAbsent(nodeId, k -> new HashSet<>()); missed.get(nodeId).add(p); @@ -1611,7 +1636,7 @@ public class GridDhtPartitionDemander { onDone(false); // Finished but has missed partitions, will force dummy exchange - ctx.exchange().forceReassign(exchId); + ctx.exchange().forceReassign(exchId, exchFut); return; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 8dfb7c2..d054b05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -18,10 +18,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -47,7 +49,9 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -214,6 +218,15 @@ class GridDhtPartitionSupplier { SupplyContext sctx = null; + Set<Integer> remainingParts = null; + + GridDhtPartitionSupplyMessage supplyMsg = new 
GridDhtPartitionSupplyMessage( + demandMsg.rebalanceId(), + grp.groupId(), + demandMsg.topologyVersion(), + grp.deploymentEnabled() + ); + try { synchronized (scMap) { sctx = scMap.remove(contextId); @@ -257,15 +270,6 @@ class GridDhtPartitionSupplier { else maxBatchesCnt = 1; - GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage( - demandMsg.rebalanceId(), - grp.groupId(), - demandMsg.topologyVersion(), - grp.deploymentEnabled() - ); - - Set<Integer> remainingParts; - if (sctx == null || sctx.iterator == null) { iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion()); @@ -455,42 +459,65 @@ class GridDhtPartitionSupplier { } else U.error(log, "Failed to continue supplying [" - + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t); + + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t); try { if (sctx != null) clearContext(sctx, log); - else if (iter != null) - iter.close(); } catch (Throwable t1) { U.error(log, "Failed to cleanup supplying context [" - + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1); + + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1); } if (!sendErrMsg) return; + boolean fallbackToFullRebalance = X.hasCause(t, IgniteHistoricalIteratorException.class); + try { - GridDhtPartitionSupplyMessageV2 errMsg = new GridDhtPartitionSupplyMessageV2( - demandMsg.rebalanceId(), - grp.groupId(), - demandMsg.topologyVersion(), - grp.deploymentEnabled(), - t - ); + GridDhtPartitionSupplyMessage errMsg; + + if (fallbackToFullRebalance) { + // Mark the last checkpoint as not applicable for WAL rebalance. + grp.shared().database().lastCheckpointInapplicableForWalRebalance(grp.groupId()); + + // Mark all remaining partitions as missed to trigger full rebalance. + if (iter == null && F.isEmpty(remainingParts)) { + remainingParts = new HashSet<>(demandMsg.partitions().fullSet()); + remainingParts.addAll(demandMsg.partitions().historicalSet()); + } + + for (int p : Optional.ofNullable(remainingParts).orElseGet(Collections::emptySet)) + supplyMsg.missed(p); + + errMsg = supplyMsg; + } + else { + errMsg = new GridDhtPartitionSupplyMessageV2( + demandMsg.rebalanceId(), + grp.groupId(), + demandMsg.topologyVersion(), + grp.deploymentEnabled(), + t + ); + } reply(topicId, demanderNode, demandMsg, errMsg, contextId); } catch (Throwable t1) { U.error(log, "Failed to send supply error message [" - + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1); + + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1); } - grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, - new IgniteCheckedException("Failed to continue supplying [" - + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t) - )); + // If fallback to full rebalance is possible then let's try to switch to it + // instead of triggering failure handler. + if (!fallbackToFullRebalance) { + grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, + new IgniteCheckedException("Failed to continue supplying [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t) + )); + } } } @@ -537,7 +564,7 @@ class GridDhtPartitionSupplier { * @param demander Recipient of supply message. * @param demandMsg Demand message. * @param supplyMsg Supply message. - * @param contextId Supply context id. + * @param ctxId Supply context id. * @return {@code True} if message was sent, {@code false} if recipient left grid. * @throws IgniteCheckedException If failed. 
*/ @@ -546,7 +573,7 @@ class GridDhtPartitionSupplier { ClusterNode demander, GridDhtPartitionDemandMessage demandMsg, GridDhtPartitionSupplyMessage supplyMsg, - T3<UUID, Integer, AffinityTopologyVersion> contextId + T3<UUID, Integer, AffinityTopologyVersion> ctxId ) throws IgniteCheckedException { try { if (log.isDebugEnabled()) @@ -567,7 +594,7 @@ class GridDhtPartitionSupplier { log.debug("Failed to send supply message (demander left): [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]"); synchronized (scMap) { - clearContext(scMap.remove(contextId), log); + clearContext(scMap.remove(ctxId), log); } return false; @@ -588,21 +615,21 @@ class GridDhtPartitionSupplier { /** * Saves supply context with given parameters to {@code scMap}. * - * @param contextId Supply context id. + * @param ctxId Supply context id. * @param entryIt Entries rebalance iterator. * @param remainingParts Set of partitions that weren't sent yet. * @param rebalanceId Rebalance id. */ private void saveSupplyContext( - T3<UUID, Integer, AffinityTopologyVersion> contextId, + T3<UUID, Integer, AffinityTopologyVersion> ctxId, IgniteRebalanceIterator entryIt, Set<Integer> remainingParts, long rebalanceId ) { synchronized (scMap) { - assert scMap.get(contextId) == null; + assert scMap.get(ctxId) == null; - scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId)); + scMap.put(ctxId, new SupplyContext(entryIt, remainingParts, rebalanceId)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index c940813..a34a098 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -313,6 +313,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); + /** Set of nodes that cannot be used for wal rebalancing due to some reason. */ + private Set<UUID> exclusionsFromHistoricalRebalance = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + /** + * Set of nodes that cannot be used for full rebalancing due missed partitions. + * Mapping pair of groupId and nodeId to set of partitions. + */ + private Map<T2<Integer, UUID>, Set<Integer>> exclusionsFromFullRebalance = new ConcurrentHashMap<>(); + /** Reserved max available history for calculation of history supplier on coordinator. */ private volatile Map<Integer /** Group. */, Map<Integer /** Partition */, Long /** Counter. */>> partHistReserved; @@ -537,7 +546,68 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @return List of IDs of history supplier nodes or empty list if these doesn't exist. 
*/ @Nullable public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) { - return partHistSuppliers.getSupplier(grpId, partId, cntrSince); + List<UUID> histSuppliers = partHistSuppliers.getSupplier(grpId, partId, cntrSince); + + return histSuppliers.stream().filter((supplier) -> !exclusionsFromHistoricalRebalance.contains(supplier)) + .collect(Collectors.toList()); + } + + /** + * Marks the given node as not applicable for historical rebalancing. + * + * @param nodeId Node id that should not be used for wal rebalancing (aka historical supplier). + */ + public void markNodeAsInapplicableForHistoricalRebalance(UUID nodeId) { + exclusionsFromHistoricalRebalance.add(nodeId); + } + + /** + * Marks the given node as not applicable for full rebalancing + * for the given group and partition. + * + * @param nodeId Node id that should not be used for full rebalancing. + * @param grpId Cache group id. + * @param p Partition id. + */ + public void markNodeAsInapplicableForFullRebalance(UUID nodeId, int grpId, int p) { + Set<Integer> parts = exclusionsFromFullRebalance.computeIfAbsent(new T2<>(grpId, nodeId), t2 -> + Collections.newSetFromMap(new ConcurrentHashMap<>()) + ); + + parts.add(p); + } + + /** + * @return {@code true} if there are nodes which are inapplicable for historical rebalancing. + */ + public boolean hasInapplicableNodesForHistoricalRebalance() { + return !exclusionsFromHistoricalRebalance.isEmpty(); + } + + /** + * @return {@code true} if there are nodes which are inapplicable for full rebalancing. + */ + public boolean hasInapplicableNodesForFullRebalance() { + return !exclusionsFromFullRebalance.isEmpty(); + } + + /** + * @return {@code true} if there are nodes which are inapplicable for rebalancing. + */ + public boolean hasInapplicableNodesForRebalance() { + return hasInapplicableNodesForHistoricalRebalance() || hasInapplicableNodesForFullRebalance(); + } + + /** + * @param nodeId Node id to check. + * @param grpId Cache group id. + * @param p Partition id. + * @return {@code true} if the node is applicable for full rebalancing. 
+ */ + public boolean isNodeApplicableForFullRebalance(UUID nodeId, int grpId, int p) { + return Optional.ofNullable(exclusionsFromFullRebalance.get(new T2<>(grpId, nodeId))) + .map(s -> !s.contains(p)) + .orElse(true); } /** @@ -2696,6 +2766,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte newCrdFut = null; exchangeLocE = null; exchangeGlobalExceptions.clear(); + exclusionsFromHistoricalRebalance.clear(); + exclusionsFromFullRebalance.clear(); if (finishState != null) finishState.cleanUp(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 23ff455..bdaae9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION; @@ -186,7 +187,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { AffinityTopologyVersion topVer = top.readyTopologyVersion(); - assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) : + assert exchFut == null || + exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) || + exchFut.context().events().topologyVersion().equals(ctx.exchange().lastAffinityChangedTopologyVersion(top.readyTopologyVersion())) : "Topology version mismatch [exchId=" + exchId + ", grp=" + grp.name() + ", topVer=" + top.readyTopologyVersion() + ']'; @@ -275,7 +278,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions); } else { - List<ClusterNode> picked = remoteOwners(p, topVer); + int partId = p; + List<ClusterNode> picked = remoteOwners(p, topVer, node -> { + if (exchFut != null && !exchFut.isNodeApplicableForFullRebalance(node.id(), grp.groupId(), partId)) + return false; + + return true; + }); if (!picked.isEmpty()) { ClusterNode n = picked.get(p % picked.size()); @@ -319,12 +328,24 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Nodes owning this partition. */ private List<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { + return remoteOwners(p, topVer, node -> true); + } + + /** + * Returns remote owners (excluding local node) for specified partition {@code p} + * which is additionally filtered by the specified predicate. + * + * @param p Partition. + * @param topVer Topology version. + * @return Nodes owning this partition. 
+ */ + private List<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer, IgnitePredicate<ClusterNode> pred) { List<ClusterNode> owners = grp.topology().owners(p, topVer); List<ClusterNode> res = new ArrayList<>(owners.size()); for (ClusterNode owner : owners) { - if (!owner.id().equals(ctx.localNodeId())) + if (!owner.id().equals(ctx.localNodeId()) && pred.apply(owner)) res.add(owner); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java similarity index 58% copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java index 7e473be..5b641b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java @@ -13,37 +13,36 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; +import org.apache.ignite.IgniteException; /** - * + * Thrown when {@link IgniteHistoricalIterator} cannot iterate over WAL for some reason. */ -public class RebalanceReassignExchangeTask implements CachePartitionExchangeWorkerTask { +public class IgniteHistoricalIteratorException extends IgniteException { /** */ - private final GridDhtPartitionExchangeId exchId; + private static final long serialVersionUID = 0L; /** - * @param exchId Exchange ID. + * Creates a new exception with the specified cause. + * + * @param cause Cause. */ - public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) { - assert exchId != null; - - this.exchId = exchId; - } - - /** {@inheritDoc} */ - @Override public boolean skipForExchangeMerge() { - return true; + public IgniteHistoricalIteratorException(Throwable cause) { + super(cause); } /** - * @return Exchange ID. + * Creates a new exception with the specified message and cause. + * + * @param msg Detail message. + * @param cause Cause. 
*/ - public GridDhtPartitionExchangeId exchangeId() { - return exchId; + public IgniteHistoricalIteratorException(String msg, Throwable cause) { + super(msg, cause); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java index 7e473be..5cffcb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java @@ -26,13 +26,19 @@ public class RebalanceReassignExchangeTask implements CachePartitionExchangeWork /** */ private final GridDhtPartitionExchangeId exchId; + /** */ + private final GridDhtPartitionsExchangeFuture exchFut; + /** * @param exchId Exchange ID. + * @param exchFut Exchange future. */ - public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) { + public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) { assert exchId != null; + assert exchFut != null; this.exchId = exchId; + this.exchFut = exchFut; } /** {@inheritDoc} */ @@ -46,4 +52,11 @@ public class RebalanceReassignExchangeTask implements CachePartitionExchangeWork public GridDhtPartitionExchangeId exchangeId() { return exchId; } + + /** + * @return Exchange future. + */ + public GridDhtPartitionsExchangeFuture future() { + return exchFut; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 4f325a2..aa0520c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; @@ -105,6 +106,7 @@ import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.lang.IgnitePredicateX; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -1002,7 +1004,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override @Nullable 
protected IgniteHistoricalIterator historicalIterator( - CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException { + CachePartitionPartialCountersMap partCntrs, + Set<Integer> missing + ) throws IgniteCheckedException { if (partCntrs == null || partCntrs.isEmpty()) return null; @@ -1022,14 +1026,22 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple FileWALPointer minPtr = (FileWALPointer)database.checkpointHistory().searchEarliestWalPointer(grp.groupId(), partsCounters); - WALIterator it = grp.shared().wal().replay(minPtr); + try { + WALIterator it = grp.shared().wal().replay(minPtr); + + WALHistoricalIterator histIt = new WALHistoricalIterator(log, grp, partCntrs, it); - WALHistoricalIterator iterator = new WALHistoricalIterator(log, grp, partCntrs, it); + // Add historical partitions which are unabled to reserve to missing set. + missing.addAll(histIt.missingParts); - // Add historical partitions which are unabled to reserve to missing set. - missing.addAll(iterator.missingParts); + return histIt; + } + catch (Exception ex) { + if (!X.hasCause(ex, IgniteHistoricalIteratorException.class)) + throw new IgniteHistoricalIteratorException(ex); - return iterator; + throw ex; + } } /** {@inheritDoc} */ @@ -1349,92 +1361,98 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * */ private void advance() { - next = null; + try { + next = null; - outer: while (doneParts.size() != partMap.size()) { - if (entryIt != null) { - while (entryIt.hasNext()) { - DataEntry entry = entryIt.next(); + outer: + while (doneParts.size() != partMap.size()) { + if (entryIt != null) { + while (entryIt.hasNext()) { + DataEntry entry = entryIt.next(); - if (cacheIds.contains(entry.cacheId())) { - int idx = partMap.partitionIndex(entry.partitionId()); + if (cacheIds.contains(entry.cacheId())) { + int idx = partMap.partitionIndex(entry.partitionId()); - if (idx < 0 || missingParts.contains(idx)) - continue; + if (idx < 0 || missingParts.contains(idx)) + continue; - long from = partMap.initialUpdateCounterAt(idx); - long to = partMap.updateCounterAt(idx); + long from = partMap.initialUpdateCounterAt(idx); + long to = partMap.updateCounterAt(idx); - if (entry.partitionCounter() > from && entry.partitionCounter() <= to) { - // Partition will be marked as done for current entry on next iteration. - if (++rebalancedCntrs[idx] == to) - donePart = entry.partitionId(); + if (entry.partitionCounter() > from && entry.partitionCounter() <= to) { + // Partition will be marked as done for current entry on next iteration. + if (++rebalancedCntrs[idx] == to) + donePart = entry.partitionId(); - next = entry; + next = entry; - return; + return; + } } } } - } - entryIt = null; + entryIt = null; - // Search for next DataEntry while applying rollback counters. - while (walIt.hasNext()) { - IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next(); + // Search for next DataEntry while applying rollback counters. + while (walIt.hasNext()) { + IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next(); - if (rec.get2() instanceof DataRecord) { - DataRecord data = (DataRecord)rec.get2(); + if (rec.get2() instanceof DataRecord) { + DataRecord data = (DataRecord)rec.get2(); - entryIt = data.writeEntries().iterator(); + entryIt = data.writeEntries().iterator(); - // Move on to the next valid data entry. 
- continue outer; - } - else if (rec.get2() instanceof RollbackRecord) { - RollbackRecord rbRec = (RollbackRecord)rec.get2(); + // Move on to the next valid data entry. + continue outer; + } + else if (rec.get2() instanceof RollbackRecord) { + RollbackRecord rbRec = (RollbackRecord)rec.get2(); - if (grp.groupId() == rbRec.groupId()) { - int idx = partMap.partitionIndex(rbRec.partitionId()); + if (grp.groupId() == rbRec.groupId()) { + int idx = partMap.partitionIndex(rbRec.partitionId()); - if (idx < 0 || missingParts.contains(idx)) - continue; + if (idx < 0 || missingParts.contains(idx)) + continue; - long from = partMap.initialUpdateCounterAt(idx); - long to = partMap.updateCounterAt(idx); + long from = partMap.initialUpdateCounterAt(idx); + long to = partMap.updateCounterAt(idx); - rebalancedCntrs[idx] += rbRec.overlap(from, to); + rebalancedCntrs[idx] += rbRec.overlap(from, to); - if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) { - if (log.isDebugEnabled()) { - log.debug("Partition done [grpId=" + grp.groupId() + - ", partId=" + donePart + - ", from=" + from + - ", to=" + to + ']'); - } + if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) { + if (log.isDebugEnabled()) { + log.debug("Partition done [grpId=" + grp.groupId() + + ", partId=" + donePart + + ", from=" + from + + ", to=" + to + ']'); + } - doneParts.add(rbRec.partitionId()); // Add to done set immediately. + doneParts.add(rbRec.partitionId()); // Add to done set immediately. + } } } } - } - if (entryIt == null && doneParts.size() != partMap.size()) { - for (int i = 0; i < partMap.size(); i++) { - int p = partMap.partitionAt(i); + if (entryIt == null && doneParts.size() != partMap.size()) { + for (int i = 0; i < partMap.size(); i++) { + int p = partMap.partitionAt(i); - if (!doneParts.contains(p)) { - log.warning("Some partition entries were missed during historical rebalance [grp=" + grp + ", part=" + p + ", missed=" + - (partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']'); + if (!doneParts.contains(p)) { + log.warning("Some partition entries were missed during historical rebalance [grp=" + grp + ", part=" + p + ", missed=" + + (partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']'); - doneParts.add(p); - } - } + doneParts.add(p); + } + } - return; + return; + } } } + catch (Exception ex) { + throw new IgniteHistoricalIteratorException(ex); + } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java index 2bc7fc7..2452662 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java @@ -35,6 +35,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; @@ -44,10 +45,13 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecora import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED; + /** */ public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstractTest { /** Rebalance cache name. */ @@ -84,6 +88,8 @@ public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstract if (name.equals(getTestIgniteInstanceName(NODE_NAME_WITH_TEST_FILE_FACTORY))) { conf.setFileIOFactory(new FailingFileIOFactory(canFailFirstNode)); + cfg.setIncludeEventTypes(EVT_CACHE_REBALANCE_PART_SUPPLIED); + cfg.setFailureHandler(new TestFailureHandler()); } else @@ -132,8 +138,19 @@ public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstract populateCache(ig, TEST_REBALANCE_CACHE, 3_000, 6_000); + // Breaks historical rebalance. The second node will try to switch to full rebalance. canFailFirstNode.set(true); + // Break full rebalance. + IgnitePredicate<CacheRebalancingEvent> locLsnr = evt -> { + if (TEST_REBALANCE_CACHE.equals(evt.cacheName())) + throw new AssertionError(new IOException("Test crash")); + + return true; + }; + + ig.events().localListen(locLsnr, EVT_CACHE_REBALANCE_PART_SUPPLIED); + startGrid(1); WAIT_ON_SUPPLY_MESSAGE_FAILURE.await(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java index 02bbf6d..191be72 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java @@ -19,20 +19,28 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.file.OpenOption; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cluster.ClusterNode; @@ -41,31 +49,48 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import 
org.apache.ignite.configuration.WALMode; +import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCachePreloader; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.WalTestUtils; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; import org.junit.Test; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; /** @@ -79,11 +104,17 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { private static final int PARTS_CNT = 32; /** Block message predicate to set to Communication SPI in node configuration. */ - private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate; + private IgniteBiPredicate<ClusterNode, Message> blockMsgPred; + + /** Record message predicate to set to Communication SPI in node configuration. 
*/ + private IgniteBiPredicate<ClusterNode, Message> recordMsgPred; /** */ private int backups; + /** User attributes. */ + private Map<String, Serializable> userAttrs = new HashMap<>(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make all rebalance wal-based @@ -116,11 +147,15 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { cfg.setCommunicationSpi(new WalRebalanceCheckingCommunicationSpi()); - if (blockMessagePredicate != null) { - TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) cfg.getCommunicationSpi(); + if (blockMsgPred != null) + ((TestRecordingCommunicationSpi) cfg.getCommunicationSpi()).blockMessages(blockMsgPred); - spi.blockMessages(blockMessagePredicate); - } + if (recordMsgPred != null) + ((TestRecordingCommunicationSpi) cfg.getCommunicationSpi()).record(recordMsgPred); + + cfg.setFailureHandler(new StopNodeFailureHandler()); + cfg.setConsistentId(gridName); + cfg.setUserAttributes(userAttrs); return cfg; } @@ -344,7 +379,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { backups = 4; // Prepare some data. - IgniteEx crd = (IgniteEx) startGrids(3); + IgniteEx crd = startGrids(3); crd.cluster().active(true); @@ -362,7 +397,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { stopAllGrids(); // Rewrite data with globally disabled WAL. - crd = (IgniteEx) startGrids(2); + crd = startGrids(2); crd.cluster().active(true); @@ -426,7 +461,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { backups = 4; // Prepare some data. - IgniteEx crd = (IgniteEx) startGrids(3); + IgniteEx crd = startGrids(3); crd.cluster().active(true); @@ -444,7 +479,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { stopAllGrids(); // Rewrite data to trigger further rebalance. - IgniteEx supplierNode = (IgniteEx) startGrid(0); + IgniteEx supplierNode = startGrid(0); supplierNode.cluster().active(true); @@ -455,12 +490,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { forceCheckpoint(); - final int groupId = supplierNode.cachex(CACHE_NAME).context().groupId(); + final int grpId = supplierNode.cachex(CACHE_NAME).context().groupId(); // Delay rebalance process for specified group. - blockMessagePredicate = (node, msg) -> { + blockMsgPred = (node, msg) -> { if (msg instanceof GridDhtPartitionDemandMessage) - return ((GridDhtPartitionDemandMessage) msg).groupId() == groupId; + return ((GridDhtPartitionDemandMessage) msg).groupId() == grpId; return false; }; @@ -478,11 +513,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { ); // Inject I/O factory which can throw exception during WAL read on supplier node. - FailingIOFactory ioFactory = new FailingIOFactory(new RandomAccessFileIOFactory()); - - ((FileWriteAheadLogManager) supplierNode.cachex(CACHE_NAME).context().shared().wal()).setFileIOFactory(ioFactory); - - ioFactory.throwExceptionOnWalRead(); + FailingIOFactory ioFactory = injectFailingIOFactory(supplierNode); // Resume rebalance process. TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) demanderNode.configuration().getCommunicationSpi(); @@ -490,12 +521,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { spi.stopBlock(); // Wait till rebalance will be failed and cancelled. 
- Boolean result = preloader.rebalanceFuture().get(); + Boolean res = preloader.rebalanceFuture().get(); - Assert.assertEquals("Rebalance should be cancelled on demander node: " + preloader.rebalanceFuture(), false, result); + Assert.assertEquals("Rebalance should be cancelled on demander node: " + preloader.rebalanceFuture(), false, res); // Stop blocking messages and fail WAL during read. - blockMessagePredicate = null; + blockMsgPred = null; ioFactory.reset(); @@ -514,6 +545,445 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { } /** + * Tests that demander switches to full rebalance if two of the three previously chosen suppliers + * for a group have failed to perform historical rebalance due to an unexpected error. + * + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = "IGNITE_DISABLE_WAL_DURING_REBALANCING", value = "true") + public void testMultipleNodesFailHistoricalRebalance() throws Exception { + backups = 1; + int node_cnt = 4; + int demanderId = node_cnt - 1; + + // Start a new cluster with 3 suppliers. + startGrids(node_cnt - 1); + + // Start demander node. + userAttrs.put("TEST_ATTR", "TEST_ATTR"); + startGrid(node_cnt - 1); + + grid(0).cluster().active(true); + + // Create a new cache that places a full set of partitions on demander node. + RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, PARTS_CNT); + aff.setAffinityBackupFilter(new ClusterNodeAttributeAffinityBackupFilter("TEST_ATTR")); + + String cacheName = "test-cache-1"; + IgniteCache<Integer, IndexedObject> cache0 = grid(0).getOrCreateCache( + new CacheConfiguration<Integer, IndexedObject>(cacheName) + .setBackups(backups) + .setAffinity(aff) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)); + + // Fill initial data and force checkpoint. + final int entryCnt = PARTS_CNT * 200; + for (int k = 0; k < entryCnt; k++) + cache0.put(k, new IndexedObject(k)); + + forceCheckpoint(); + + // Stop demander node. + stopGrid(demanderId); + + // Rewrite data to trigger further rebalance. + for (int k = 0; k < entryCnt; k++) { + // Skip one partition to be sure that after restarting the demander node, + // it will have at least one partition in the OWNING state, so WAL will not be disabled while rebalancing. + // This fact allows moving partitions to OWNING state during rebalancing + // even though the corresponding RebalanceFuture will be cancelled. + if (grid(0).affinity(cacheName).partition(k) != 12) + cache0.put(k, new IndexedObject(k)); + } + + forceCheckpoint(); + + // Delay rebalance process for specified group. + blockMsgPred = (node, msg) -> { + if (msg instanceof GridDhtPartitionDemandMessage) { + GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg; + + return msg0.groupId() == CU.cacheId(cacheName); + } + + return false; + }; + + Queue<RecordedDemandMessage> recorderedMsgs = new ConcurrentLinkedQueue<>(); + + // Record demand messages for specified group. + recordMsgPred = (node, msg) -> { + if (msg instanceof GridDhtPartitionDemandMessage) { + GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg; + + if (msg0.groupId() == CU.cacheId(cacheName)) { + recorderedMsgs.add(new RecordedDemandMessage( + node.id(), + msg0.groupId(), + msg0.partitions().hasFull(), + msg0.partitions().hasHistorical())); + } + } + + return false; + }; + + // Corrupt WAL on all suppliers except the last one.
+ injectFailingIOFactory(grid(0)); + injectFailingIOFactory(grid(1)); + + // Trigger rebalance process from suppliers. + IgniteEx restartedDemander = startGrid(node_cnt - 1); + + TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(restartedDemander); + + // Wait until demander starts historical rebalancing. + demanderSpi.waitForBlocked(); + + final IgniteInternalFuture<Boolean> preloadFut = restartedDemander.cachex(cacheName).context().group() + .preloader().rebalanceFuture(); + + // Unblock messages and start tracking demand and supply messages. + demanderSpi.stopBlock(); + + // Wait until rebalancing is cancelled for both suppliers. + assertTrue( + "Rebalance future was not cancelled [fut=" + preloadFut + ']', + GridTestUtils.waitForCondition(preloadFut::isDone, getTestTimeout())); + + Assert.assertEquals( + "Rebalance should be cancelled on demander node: " + preloadFut, + false, + preloadFut.get()); + + awaitPartitionMapExchange(true, true, null); + + // Check data consistency. + assertPartitionsSame(idleVerify(restartedDemander, cacheName)); + + // Check that historical rebalance switched to full for suppliers 1 and 2 and remained historical for supplier 3. + IgnitePredicate<RecordedDemandMessage> histPred = msg -> + msg.hasHistorical() && !msg.hasFull(); + + IgnitePredicate<RecordedDemandMessage> fullPred = msg -> + !msg.hasHistorical() && msg.hasFull(); + + IgniteInClosure<UUID> supplierChecker = supplierId -> { + List<RecordedDemandMessage> demandMsgsForSupplier = recorderedMsgs.stream() + // Filter messages corresponding to the supplierId + .filter(msg -> msg.supplierId().equals(supplierId)) + .filter(msg -> msg.groupId() == CU.cacheId(cacheName)) + // Filter out intermediate messages + .filter(msg -> msg.hasFull() || msg.hasHistorical()) + .collect(toList()); + + assertEquals("There should be only two demand messages [supplierId=" + supplierId + ']', + 2, + demandMsgsForSupplier.size()); + assertTrue( + "The first message should require historical rebalance [msg=" + demandMsgsForSupplier.get(0) + ']', + histPred.apply(demandMsgsForSupplier.get(0))); + assertTrue( + "The second message should require full rebalance [msg=" + demandMsgsForSupplier.get(1) + ']', + fullPred.apply(demandMsgsForSupplier.get(1))); + }; + + supplierChecker.apply(grid(0).cluster().localNode().id()); + supplierChecker.apply(grid(1).cluster().localNode().id()); + + // Check supplier 3. + List<RecordedDemandMessage> demandMsgsForSupplier = recorderedMsgs.stream() + // Filter messages corresponding to supplier 3 + .filter(msg -> msg.supplierId().equals(grid(2).cluster().localNode().id())) + .filter(msg -> msg.groupId() == CU.cacheId(cacheName)) + // Filter out intermediate messages + .filter(msg -> msg.hasFull() || msg.hasHistorical()) + .collect(toList()); + + assertEquals("There should be only one demand message.", 1, demandMsgsForSupplier.size()); + assertTrue( + "The first message should require historical rebalance [msg=" + demandMsgsForSupplier.get(0) + ']', + histPred.apply(demandMsgsForSupplier.get(0))); + } + + + /** + * Tests that demander switches to full rebalance if the previously chosen supplier for a group has failed + * to perform historical rebalance due to an unexpected error while the historical iterator (WAL iterator) is being created. + * Additionally, a client node joins the cluster between sending the demand message and receiving the supply message. + * + * @throws Exception If failed.
+ */ + @Test + public void testSwitchHistoricalRebalanceToFullAndClientJoin() throws Exception { + testSwitchHistoricalRebalanceToFull(IgniteWalRebalanceTest::injectFailingIOFactory, true); + } + + /** + * Tests that demander switches to full rebalance if the previously chosen supplier for a group has failed + * to perform historical rebalance due to an unexpected error while the historical iterator (WAL iterator) is being created. + * + * @throws Exception If failed. + */ + @Test + public void testSwitchHistoricalRebalanceToFullDueToFailOnCreatingWalIterator() throws Exception { + testSwitchHistoricalRebalanceToFull(IgniteWalRebalanceTest::injectFailingIOFactory, false); + } + + /** + * Tests that demander switches to full rebalance if the previously chosen supplier for a group has failed + * to perform historical rebalance due to an unexpected error while iterating over the reserved WAL. + * + * @throws Exception If failed. + */ + @Test + public void testSwitchHistoricalRebalanceToFullWhileIteratingOverWAL() throws Exception { + testSwitchHistoricalRebalanceToFull(supplier1 -> { + try { + // Corrupt a WAL record in order to fail historical rebalance from the supplier1 node. + IgniteWriteAheadLogManager walMgr = supplier1.context().cache().context().wal(); + + FileWALPointer ptr = (FileWALPointer)walMgr.log(new DataRecord(new DataEntry( + CU.cacheId("test-cache-1"), + new KeyCacheObjectImpl(0, null, 0), + null, + GridCacheOperation.DELETE, + new GridCacheVersion(0, 1, 1, 0), + new GridCacheVersion(0, 1, 1, 0), + 0, + 0, + 0 + ))); + + File walDir = U.field(walMgr, "walWorkDir"); + + List<FileDescriptor> walFiles = new IgniteWalIteratorFactory().resolveWalFiles( + new IgniteWalIteratorFactory.IteratorParametersBuilder().filesOrDirs(walDir)); + + FileDescriptor lastWalFile = walFiles.get(walFiles.size() - 1); + + WalTestUtils.corruptWalSegmentFile(lastWalFile, ptr); + + IgniteCache<Integer, IndexedObject> c1 = supplier1.cache("test-cache-1"); + for (int i = 0; i < PARTS_CNT * 100; i++) + c1.put(i, new IndexedObject(i)); + } + catch (IgniteCheckedException | IOException e) { + throw new RuntimeException(e); + } + }, false); + } + + /** + * Tests that demander switches to full rebalance if the previously chosen supplier for a group has failed + * to perform historical rebalance due to an unexpected error. + * + * @param corruptWalClo Closure that corrupts WAL iteration on the supplier node. + * @param needClientStart {@code true} if a client node should join the cluster between + * sending the demand message and receiving the supply message. + * @throws Exception If failed. + */ + public void testSwitchHistoricalRebalanceToFull( + IgniteInClosure<IgniteEx> corruptWalClo, + boolean needClientStart + ) throws Exception { + backups = 3; + + IgniteEx supplier1 = startGrid(0); + IgniteEx supplier2 = startGrid(1); + IgniteEx demander = startGrid(2); + + supplier1.cluster().active(true); + + String supplier1Name = supplier1.localNode().consistentId().toString(); + String supplier2Name = supplier2.localNode().consistentId().toString(); + String demanderName = demander.localNode().consistentId().toString(); + + String cacheName1 = "test-cache-1"; + String cacheName2 = "test-cache-2"; + + // Cache resides on supplier1 and demander nodes.
+ IgniteCache<Integer, IndexedObject> c1 = supplier1.getOrCreateCache( + new CacheConfiguration<Integer, IndexedObject>(cacheName1) + .setBackups(backups) + .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT)) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setRebalanceOrder(10) + .setNodeFilter(n -> n.consistentId().equals(supplier1Name) || n.consistentId().equals(demanderName))); + + // Cache resides on supplier2 and demander nodes. + IgniteCache<Integer, IndexedObject> c2 = supplier1.getOrCreateCache( + new CacheConfiguration<Integer, IndexedObject>(cacheName2) + .setBackups(backups) + .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT)) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setRebalanceOrder(20) + .setNodeFilter(n -> n.consistentId().equals(supplier2Name) || n.consistentId().equals(demanderName))); + + // Fill initial data. + final int entryCnt = PARTS_CNT * 200; + for (int k = 0; k < entryCnt; k++) { + c1.put(k, new IndexedObject(k)); + + c2.put(k, new IndexedObject(k)); + } + + forceCheckpoint(); + + stopGrid(2); + + // Rewrite data to trigger further rebalance. + for (int i = 0; i < entryCnt; i++) { + c1.put(i, new IndexedObject(i)); + + c2.put(i, new IndexedObject(i)); + } + + // Delay rebalance process for specified groups. + blockMsgPred = (node, msg) -> { + if (msg instanceof GridDhtPartitionDemandMessage) { + GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg; + + return msg0.groupId() == CU.cacheId(cacheName1) || msg0.groupId() == CU.cacheId(cacheName2); + } + + return false; + }; + + Queue<RecordedDemandMessage> recorderedMsgs = new ConcurrentLinkedQueue<>(); + + // Record demand messages for specified groups. + recordMsgPred = (node, msg) -> { + if (msg instanceof GridDhtPartitionDemandMessage) { + GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg; + + if (msg0.groupId() == CU.cacheId(cacheName1) || msg0.groupId() == CU.cacheId(cacheName2)) { + recorderedMsgs.add(new RecordedDemandMessage( + node.id(), + msg0.groupId(), + msg0.partitions().hasFull(), + msg0.partitions().hasHistorical())); + } + } + + return false; + }; + + // Delay rebalance process for specified group from supplier2. + TestRecordingCommunicationSpi supplierSpi2 = TestRecordingCommunicationSpi.spi(supplier2); + supplierSpi2.blockMessages((node, msg) -> { + if (msg instanceof GridDhtPartitionSupplyMessage) { + GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)msg; + + return node.consistentId().equals(demanderName) && msg0.groupId() == CU.cacheId(cacheName2); + } + + return false; + }); + + // Corrupt WAL on supplier1. + corruptWalClo.apply(supplier1); + + // Trigger rebalance process from suppliers. + IgniteEx restartedDemander = startGrid(2); + + recordMsgPred = null; + blockMsgPred = null; + + TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(grid(2)); + + // Wait until demander starts historical rebalancing. + demanderSpi.waitForBlocked(); + + final IgniteInternalFuture<Boolean> preloadFut1 = restartedDemander.cachex(cacheName1).context().group() + .preloader().rebalanceFuture(); + final IgniteInternalFuture<Boolean> preloadFut2 = restartedDemander.cachex(cacheName2).context().group() + .preloader().rebalanceFuture(); + + if (needClientStart) + startClientGrid(3); + + // Unblock messages and start tracking demand and supply messages. + demanderSpi.stopBlock(); + + // Wait until rebalancing is cancelled for both suppliers.
+ GridTestUtils.waitForCondition(() -> preloadFut1.isDone() && preloadFut2.isDone(), getTestTimeout()); + + Assert.assertEquals( + "Rebalance should be cancelled on demander node: " + preloadFut1, + false, + preloadFut1.get()); + Assert.assertEquals( + "Rebalance should be cancelled on demander node: " + preloadFut2, + false, + preloadFut2.get()); + + // Unblock supply messages from supplier2. + supplierSpi2.stopBlock(); + + awaitPartitionMapExchange(true, true, null); + + // Check data consistency. + assertPartitionsSame(idleVerify(restartedDemander, cacheName2, cacheName1)); + + // Check that historical rebalance switched to full for supplier1 and remained historical for supplier2. + IgnitePredicate<RecordedDemandMessage> histPred = (msg) -> + msg.hasHistorical() && !msg.hasFull(); + + IgnitePredicate<RecordedDemandMessage> fullPred = (msg) -> + !msg.hasHistorical() && msg.hasFull(); + + // Supplier1 + List<RecordedDemandMessage> demandMsgsForSupplier1 = recorderedMsgs.stream() + // Filter messages corresponding to supplier1 + .filter(msg -> msg.groupId() == CU.cacheId(cacheName1)) + // Filter out intermediate messages + .filter(msg -> msg.hasFull() || msg.hasHistorical()) + .collect(toList()); + + assertEquals("There should be only two demand messages.", 2, demandMsgsForSupplier1.size()); + assertTrue( + "The first message should require historical rebalance [msg=" + demandMsgsForSupplier1.get(0) + ']', + histPred.apply(demandMsgsForSupplier1.get(0))); + assertTrue( + "The second message should require full rebalance [msg=" + demandMsgsForSupplier1.get(1) + ']', + fullPred.apply(demandMsgsForSupplier1.get(1))); + + // Supplier2 + List<RecordedDemandMessage> demandMsgsForSupplier2 = recorderedMsgs.stream() + // Filter messages corresponding to supplier2 + .filter(msg -> msg.groupId() == CU.cacheId(cacheName2)) + // Filter out intermediate messages + .filter(msg -> msg.hasFull() || msg.hasHistorical()) + .collect(toList()); + + assertEquals("There should be only two demand messages.", 2, demandMsgsForSupplier2.size()); + assertTrue( + "Both messages should require historical rebalance [" + + "msg=" + demandMsgsForSupplier2.get(0) + ", msg=" + demandMsgsForSupplier2.get(1) + ']', + histPred.apply(demandMsgsForSupplier2.get(0)) && histPred.apply(demandMsgsForSupplier2.get(1))); + } + + /** + * Injects a new instance of FailingIOFactory into the WAL manager of the given supplier node. + * This makes it possible to break historical rebalance from the supplier. + * + * @param supplier Supplier node to be modified. + * @return Instance of FailingIOFactory that was injected. + */ + private static FailingIOFactory injectFailingIOFactory(IgniteEx supplier) { + // Inject an I/O factory which can throw an exception during WAL read on the supplier node.
+ FailingIOFactory ioFactory = new FailingIOFactory(new RandomAccessFileIOFactory()); + + ((FileWriteAheadLogManager)supplier.context().cache().context().wal()).setFileIOFactory(ioFactory); + + ioFactory.throwExceptionOnWalRead(); + + return ioFactory; + } + + /** * */ private static class IndexedObject { @@ -641,7 +1111,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { for (int i = entryCnt / 2; i < entryCnt; i++) cache0.put(i, String.valueOf(i)); - blockMessagePredicate = (node, msg) -> { + blockMsgPred = (node, msg) -> { if (msg instanceof GridDhtPartitionDemandMessage) { GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg; @@ -664,7 +1134,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { // Wait until the full rebalance begins with g1 as a supplier. spi2.waitForBlocked(2); - blockMessagePredicate = null; + blockMsgPred = null; startGrid(0); // Should not force rebalancing remap. @@ -725,4 +1195,71 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { failRead = false; } } + + /** */ + static class RecordedDemandMessage { + /** Full rebalance. */ + private final boolean full; + + /** Historical rebalance. */ + private final boolean historical; + + /** Supplier node id. */ + private final UUID supplierId; + + /** Group id. */ + private final int grpId; + + /** + * Creates a new instance. + * @param supplierId Supplier node id. + * @param grpId Cache group id. + * @param full {@code true} if demand message has partitions that should be fully rebalanced. + * @param historical {@code true} if demand message has partitions that should be wal rebalanced. + */ + RecordedDemandMessage(UUID supplierId, int grpId, boolean full, boolean historical) { + this.supplierId = supplierId; + this.grpId = grpId; + this.full = full; + this.historical = historical; + } + + /** + * @return Supplier node id. + */ + UUID supplierId() { + return supplierId; + } + + /** + * @return cache group id. + */ + int groupId() { + return grpId; + } + + /** + * @return {@code true} if demand message has partitions that should be fully rebalanced. + */ + boolean hasFull() { + return full; + } + + /** + * @return {@code true} if demand message has partitions that should be wal rebalanced. + */ + boolean hasHistorical() { + return historical; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "RecordedDemandMessage{" + + "supplierId=" + supplierId + + ", groupId=" + grpId + + ", full=" + full + + ", historical=" + historical + + '}'; + } + } }
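For readers following the new tests, the snippet below is a minimal sketch, not part of this commit, of how demand messages can be recorded on a demander node to tell a historical (WAL) rebalance request apart from the full-rebalance fallback. It reuses only building blocks already visible in the diff above (TestRecordingCommunicationSpi.spi()/record(), GridDhtPartitionDemandMessage.partitions().hasHistorical()/hasFull()); the helper name recordDemandKinds, the demander parameter and the string format of the recorded entries are illustrative, and the method is assumed to live inside a test class whose nodes are configured with TestRecordingCommunicationSpi and IGNITE_PDS_WAL_REBALANCE_THRESHOLD set to "0", as IgniteWalRebalanceTest does.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;

/** Illustrative helper (not part of the commit): records which kind of rebalance each supplier was asked for. */
static Queue<String> recordDemandKinds(IgniteEx demander) {
    // Assumes the demander was started with TestRecordingCommunicationSpi, as the tests above do.
    TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(demander);

    Queue<String> recordedDemands = new ConcurrentLinkedQueue<>();

    spi.record((node, msg) -> {
        if (msg instanceof GridDhtPartitionDemandMessage) {
            GridDhtPartitionDemandMessage demand = (GridDhtPartitionDemandMessage)msg;

            // hasHistorical()/hasFull() distinguish a WAL (historical) request from the full fallback.
            recordedDemands.add("supplier=" + node.id()
                + ", grp=" + demand.groupId()
                + ", historical=" + demand.partitions().hasHistorical()
                + ", full=" + demand.partitions().hasFull());
        }

        return false; // The SPI itself does not need to keep a copy of the message.
    });

    return recordedDemands;
}

After a rebalance that had to fall back, a supplier whose historical iterator failed is expected to appear twice in the recorded queue, first with historical=true and then with full=true, which is exactly the pattern asserted by testSwitchHistoricalRebalanceToFull and testMultipleNodesFailHistoricalRebalance above.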