Repository: ignite Updated Branches: refs/heads/master b058bf657 -> 5c7185436
IGNITE-8116 Fixed historical rebalance from WAL Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c718543 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c718543 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c718543 Branch: refs/heads/master Commit: 5c718543695fdc3006236652e878e25ab9946de5 Parents: b058bf6 Author: Pavel Kovalenko <[email protected]> Authored: Wed Apr 18 19:41:44 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Apr 18 19:41:44 2018 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 28 ++-- .../dht/preloader/GridDhtPartitionDemander.java | 118 ++++++++----- .../dht/preloader/GridDhtPartitionSupplier.java | 54 ++++-- .../dht/preloader/GridDhtPreloader.java | 4 +- .../IgniteDhtPartitionsToReloadMap.java | 2 +- .../persistence/GridCacheOffheapManager.java | 82 ++++++++-- modules/core/src/test/config/log4j-test.xml | 6 - ...PdsAtomicCacheHistoricalRebalancingTest.java | 40 +++++ .../IgnitePdsCacheRebalancingAbstractTest.java | 32 ++-- ...nitePdsTxCacheHistoricalRebalancingTest.java | 39 +++++ .../db/wal/IgniteWalRebalanceTest.java | 164 +++++++++++++++++++ ...lientQueryReplicatedNodeRestartSelfTest.java | 1 - .../IgnitePdsWithIndexingCoreTestSuite.java | 7 + 13 files changed, 467 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index f8cc86f..5c78eb5 100644 
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -861,7 +861,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } @Override protected void onClose() throws IgniteCheckedException { - assert loc != null && loc.state() == OWNING && loc.reservations() > 0; + assert loc != null && loc.state() == OWNING && loc.reservations() > 0 + : "Partition should be in OWNING state and has at least 1 reservation: " + loc; loc.release(); } @@ -874,36 +875,37 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager throws IgniteCheckedException { final TreeMap<Integer, GridCloseableIterator<CacheDataRow>> iterators = new TreeMap<>(); - Set<Integer> missing = null; + + Set<Integer> missing = new HashSet<>(); for (Integer p : parts.fullSet()) { GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, topVer); if (partIter == null) { - if (missing == null) - missing = new HashSet<>(); - missing.add(p); + + continue; } - else - iterators.put(p, partIter); + + iterators.put(p, partIter); } - IgniteRebalanceIterator iter = new IgniteRebalanceIteratorImpl(iterators, historicalIterator(parts.historicalMap())); + IgniteHistoricalIterator historicalIterator = historicalIterator(parts.historicalMap(), missing); - if (missing != null) { - for (Integer p : missing) - iter.setPartitionMissing(p); - } + IgniteRebalanceIterator iter = new IgniteRebalanceIteratorImpl(iterators, historicalIterator); + + for (Integer p : missing) + iter.setPartitionMissing(p); return iter; } /** * @param partCntrs Partition counters map. + * @param missing Set of partitions need to populate if partition is missing or failed to reserve. * @return Historical iterator. 
*/ - @Nullable protected IgniteHistoricalIterator historicalIterator(CachePartitionPartialCountersMap partCntrs) + @Nullable protected IgniteHistoricalIterator historicalIterator(CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index dc4bfe9..c94f511 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -459,7 +459,9 @@ public class GridDhtPartitionDemander { + ", topology=" + fut.topologyVersion() + ", rebalanceId=" + fut.rebalanceId + "]"); } - int stripes = ctx.gridConfig().getRebalanceThreadPoolSize(); + int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize(); + + int stripes = totalStripes; final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new ArrayList<>(stripes); for (int i = 0; i < stripes; i++) @@ -467,7 +469,7 @@ public class GridDhtPartitionDemander { // Reserve one stripe for historical partitions. 
if (parts.hasHistorical()) { - stripePartitions.add(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null)); + stripePartitions.set(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null)); if (stripes > 1) stripes--; @@ -478,7 +480,7 @@ public class GridDhtPartitionDemander { for (int i = 0; it.hasNext(); i++) stripePartitions.get(i % stripes).addFull(it.next()); - for (int stripe = 0; stripe < stripes; stripe++) { + for (int stripe = 0; stripe < totalStripes; stripe++) { if (!stripePartitions.get(stripe).isEmpty()) { // Create copy of demand message with new striped partitions map. final GridDhtPartitionDemandMessage demandMsg = d.withNewPartitionsMap(stripePartitions.get(stripe)); @@ -489,23 +491,27 @@ public class GridDhtPartitionDemander { final int topicId = stripe; - Runnable initDemandRequestTask = () -> { + IgniteInternalFuture<?> clearAllFuture = clearFullPartitions(fut, demandMsg.partitions().fullSet()); + + // Start rebalancing after clearing full partitions is finished. + clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(() -> { + if (fut.isDone()) + return; + try { - if (!fut.isDone()) { - ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId), - demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout()); - - // Cleanup required in case partitions demanded in parallel with cancellation. - synchronized (fut) { - if (fut.isDone()) - fut.cleanupRemoteContexts(node.id()); - } + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId), + demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout()); - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + - topicId + ", partitions count=" + stripePartitions.get(topicId).size() + - " (" + stripePartitions.get(topicId).partitionsList() + ")]"); + // Cleanup required in case partitions demanded in parallel with cancellation. 
+ synchronized (fut) { + if (fut.isDone()) + fut.cleanupRemoteContexts(node.id()); } + + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + topicId + " " + demandMsg.rebalanceId() + ", partitions count=" + stripePartitions.get(topicId).size() + + " (" + stripePartitions.get(topicId).partitionsList() + ")]"); } catch (IgniteCheckedException e1) { ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class); @@ -522,31 +528,26 @@ public class GridDhtPartitionDemander { fut.cancel(); } - }; - - awaitClearingAndStartRebalance(fut, demandMsg, initDemandRequestTask); + }, true)); } } } } /** - * Awaits partitions clearing for full partitions and sends initial demand request - * after all partitions are cleared and safe to consume data. + * Creates future which will be completed when all {@code fullPartitions} are cleared. * * @param fut Rebalance future. - * @param demandMessage Initial demand message which contains set of full partitions to await. - * @param initDemandRequestTask Task which sends initial demand request. + * @param fullPartitions Set of full partitions need to be cleared. + * @return Future which will be completed when given partitions are cleared. 
*/ - private void awaitClearingAndStartRebalance(RebalanceFuture fut, - GridDhtPartitionDemandMessage demandMessage, - Runnable initDemandRequestTask) { - Set<Integer> fullPartitions = demandMessage.partitions().fullSet(); + private IgniteInternalFuture<?> clearFullPartitions(RebalanceFuture fut, Set<Integer> fullPartitions) { + final GridFutureAdapter clearAllFuture = new GridFutureAdapter(); if (fullPartitions.isEmpty()) { - ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true); + clearAllFuture.onDone(); - return; + return clearAllFuture; } for (GridCacheContext cctx : grp.caches()) { @@ -560,16 +561,19 @@ public class GridDhtPartitionDemander { final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size()); for (int partId : fullPartitions) { - if (fut.isDone()) - return; + if (fut.isDone()) { + clearAllFuture.onDone(); + + return clearAllFuture; + } GridDhtLocalPartition part = grp.topology().localPartition(partId); if (part != null && part.state() == MOVING) { part.onClearFinished(f -> { - // Cancel rebalance if partition clearing was failed. - if (f.error() != null) { - if (!fut.isDone()) { + if (!fut.isDone()) { + // Cancel rebalance if partition clearing was failed. 
+ if (f.error() != null) { for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) { final CacheMetricsImpl metrics = cctx.cache().metrics0(); @@ -581,30 +585,54 @@ public class GridDhtPartitionDemander { log.error("Unable to await partition clearing " + part, f.error()); fut.cancel(); + + clearAllFuture.onDone(f.error()); } - } - else { - if (!fut.isDone()) { - int existed = clearingPartitions.decrementAndGet(); + else { + int remaining = clearingPartitions.decrementAndGet(); for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) { final CacheMetricsImpl metrics = cctx.cache().metrics0(); - metrics.rebalanceClearingPartitions(existed); + metrics.rebalanceClearingPartitions(remaining); } } - // If all partitions are cleared send initial demand message. - if (existed == 0) - ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true); + if (log.isDebugEnabled()) + log.debug("Remaining clearing partitions [grp=" + grp.cacheOrGroupName() + + ", remaining=" + remaining + "]"); + + if (remaining == 0) + clearAllFuture.onDone(); } } + else { + clearAllFuture.onDone(); + } }); } - else - clearingPartitions.decrementAndGet(); + else { + int remaining = clearingPartitions.decrementAndGet(); + + for (GridCacheContext cctx : grp.caches()) { + if (cctx.statisticsEnabled()) { + final CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.rebalanceClearingPartitions(remaining); + } + } + + if (log.isDebugEnabled()) + log.debug("Remaining clearing partitions [grp=" + grp.cacheOrGroupName() + + ", remaining=" + remaining + "]"); + + if (remaining == 0) + clearAllFuture.onDone(); + } } + + return clearAllFuture; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 6d2f526..a3ee305 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -173,7 +173,8 @@ class GridDhtPartitionSupplier { if (curTop.compareTo(demTop) > 0) { if (log.isDebugEnabled()) - log.debug("Demand request outdated [currentTopVer=" + curTop + log.debug("Demand request outdated [grp=" + grp.cacheOrGroupName() + + ", currentTopVer=" + curTop + ", demandTopVer=" + demTop + ", from=" + nodeId + ", topicId=" + topicId + "]"); @@ -189,10 +190,19 @@ class GridDhtPartitionSupplier { if (sctx != null && sctx.rebalanceId == -d.rebalanceId()) { clearContext(scMap.remove(contextId), log); + + if (log.isDebugEnabled()) + log.debug("Supply context cleaned [grp=" + grp.cacheOrGroupName() + + ", from=" + nodeId + + ", demandMsg=" + d + + ", supplyContext=" + sctx + "]"); } else { if (log.isDebugEnabled()) - log.debug("Stale context cleanup message " + d + ", supplyContext=" + sctx); + log.debug("Stale supply context cleanup message [grp=" + grp.cacheOrGroupName() + + ", from=" + nodeId + + ", demandMsg=" + d + + ", supplyContext=" + sctx + "]"); } return; @@ -200,13 +210,16 @@ class GridDhtPartitionSupplier { } if (log.isDebugEnabled()) - log.debug("Demand request accepted [current=" + curTop + ", demanded=" + demTop + - ", from=" + nodeId + ", topicId=" + topicId + "]"); + log.debug("Demand request accepted [grp=" + grp.cacheOrGroupName() + + ", from=" + nodeId + + ", currentVer=" + curTop + + ", demandedVer=" + demTop + + ", topicId=" + topicId + "]"); ClusterNode node = 
grp.shared().discovery().node(nodeId); if (node == null) - return; // Context will be cleaned at topology change. + return; try { SupplyContext sctx; @@ -217,13 +230,27 @@ class GridDhtPartitionSupplier { if (sctx != null && d.rebalanceId() < sctx.rebalanceId) { // Stale message, return context back and return. scMap.put(contextId, sctx); + + if (log.isDebugEnabled()) + log.debug("Stale demand message [grp=" + grp.cacheOrGroupName() + + ", actualContext=" + sctx + + ", from=" + nodeId + + ", demandMsg=" + d + "]"); + return; } } // Demand request should not contain empty partitions if no supply context is associated with it. - if (sctx == null && (d.partitions() == null || d.partitions().isEmpty())) + if (sctx == null && (d.partitions() == null || d.partitions().isEmpty())) { + if (log.isDebugEnabled()) + log.debug("Empty demand message [grp=" + grp.cacheOrGroupName() + + ", from=" + nodeId + + ", topicId=" + topicId + + ", demandMsg=" + d + "]"); + return; + } assert !(sctx != null && !d.partitions().isEmpty()); @@ -271,7 +298,8 @@ class GridDhtPartitionSupplier { GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); - assert loc != null && loc.state() == GridDhtPartitionState.OWNING; + assert loc != null && loc.state() == GridDhtPartitionState.OWNING + : "Partition should be in OWNING state: " + loc; s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part)); } @@ -323,7 +351,8 @@ class GridDhtPartitionSupplier { GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); - assert (loc != null && loc.state() == OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part) : loc; + assert (loc != null && loc.state() == OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part) + : "Partition should be in OWNING state and has at least 1 reservation " + loc; if (iter.isPartitionMissing(part) && remainingParts.contains(part)) { s.missed(part); @@ -361,9 +390,6 @@ class 
GridDhtPartitionSupplier { remainingParts.remove(part); } - - // Need to manually prepare cache message. - // TODO GG-11141. } Iterator<Integer> remainingIter = remainingParts.iterator(); @@ -374,7 +400,8 @@ class GridDhtPartitionSupplier { if (iter.isPartitionDone(p)) { GridDhtLocalPartition loc = top.localPartition(p, d.topologyVersion(), false); - assert loc != null; + assert loc != null + : "Supply partition is gone: grp=" + grp.cacheOrGroupName() + ", p=" + p; s.last(p, loc.updateCounter()); @@ -387,7 +414,8 @@ class GridDhtPartitionSupplier { } } - assert remainingParts.isEmpty(); + assert remainingParts.isEmpty() + : "Partitions after rebalance should be either done or missing: " + remainingParts; if (sctx != null) clearContext(sctx, log); http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index ddcb81e..700f0cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -187,7 +187,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); - CachePartitionFullCountersMap cntrMap = top.fullUpdateCounters(); + CachePartitionFullCountersMap countersMap = grp.topology().fullUpdateCounters(); for (int p = 0; p < partCnt; p++) { if (ctx.exchange().hasPendingExchange()) { @@ -251,7 +251,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter { ); } - msg.partitions().addHistorical(p, cntrMap.initialUpdateCounter(p), cntrMap.updateCounter(p), partCnt); + msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partCnt); } else { Collection<ClusterNode> picked = pickOwners(p, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java index 7066e0d..8515004 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java @@ -90,7 +90,7 @@ public class IgniteDhtPartitionsToReloadMap implements Serializable { /** * @return {@code True} if empty. 
*/ - public boolean isEmpty() { + public synchronized boolean isEmpty() { return map == null || map.isEmpty(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 68ec83d..5feaa25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -755,8 +755,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override @Nullable protected IgniteHistoricalIterator historicalIterator( - CachePartitionPartialCountersMap partCntrs) throws IgniteCheckedException { + @Override @Nullable protected WALHistoricalIterator historicalIterator( + CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException { if (partCntrs == null || partCntrs.isEmpty()) return null; @@ -773,13 +773,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (startPtr == null) throw new IgniteCheckedException("Could not find start pointer for partition [part=" + p + ", partCntrSince=" + initCntr + "]"); - if (minPtr == null || startPtr.compareTo(minPtr) == -1) + if (minPtr == null || startPtr.compareTo(minPtr) < 0) minPtr = startPtr; } WALIterator it = grp.shared().wal().replay(minPtr); - return new WALIteratorAdapter(grp, partCntrs, it); + WALHistoricalIterator iterator = new WALHistoricalIterator(grp, partCntrs, it); + + // Add historical partitions which are unabled 
to reserve to missing set. + missing.addAll(iterator.missingParts); + + return iterator; } /** @@ -807,7 +812,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** * */ - private static class WALIteratorAdapter implements IgniteHistoricalIterator { + private static class WALHistoricalIterator implements IgniteHistoricalIterator { /** */ private static final long serialVersionUID = 0L; @@ -817,6 +822,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** Partition counters map. */ private final CachePartitionPartialCountersMap partMap; + /** Partitions marked as missing (unable to reserve or partition is not in OWNING state). */ + private final Set<Integer> missingParts = new HashSet<>(); + /** Partitions marked as done. */ private final Set<Integer> doneParts = new HashSet<>(); @@ -830,19 +838,24 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple private Iterator<DataEntry> entryIt; /** */ - private CacheDataRow next; + private DataEntry next; + + /** Flag indicating that the partition of the current {@link #next} entry is finished and no longer needs rebalancing. */ + private boolean reachedPartitionEnd; /** * @param grp Cache context. * @param walIt WAL iterator. 
*/ - private WALIteratorAdapter(CacheGroupContext grp, CachePartitionPartialCountersMap partMap, WALIterator walIt) { + private WALHistoricalIterator(CacheGroupContext grp, CachePartitionPartialCountersMap partMap, WALIterator walIt) { this.grp = grp; this.partMap = partMap; this.walIt = walIt; cacheIds = grp.cacheIds(); + reservePartitions(); + advance(); } @@ -859,6 +872,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { walIt.close(); + releasePartitions(); } /** {@inheritDoc} */ @@ -896,7 +910,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (next == null) throw new NoSuchElementException(); - CacheDataRow val = next; + CacheDataRow val = new DataEntryRow(next); + + if (reachedPartitionEnd) { + doneParts.add(next.partitionId()); + + reachedPartitionEnd = false; + } advance(); @@ -909,6 +929,46 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** + * Reserve historical partitions. + * If partition is unable to reserve, id of that partition is placed to {@link #missingParts} set. + */ + private void reservePartitions() { + for (int i = 0; i < partMap.size(); i++) { + int p = partMap.partitionAt(i); + GridDhtLocalPartition part = grp.topology().localPartition(p); + + if (part == null || !part.reserve()) { + missingParts.add(p); + continue; + } + + if (part.state() != OWNING) { + part.release(); + missingParts.add(p); + } + } + } + + /** + * Release historical partitions. 
+ */ + private void releasePartitions() { + for (int i = 0; i < partMap.size(); i++) { + int p = partMap.partitionAt(i); + + if (missingParts.contains(p)) + continue; + + GridDhtLocalPartition part = grp.topology().localPartition(p); + + assert part != null && part.state() == OWNING && part.reservations() > 0 + : "Partition should in OWNING state and has at least 1 reservation"; + + part.release(); + } + } + + /** * */ private void advance() { @@ -922,7 +982,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (cacheIds.contains(entry.cacheId())) { int idx = partMap.partitionIndex(entry.partitionId()); - if (idx < 0) + if (idx < 0 || missingParts.contains(idx)) continue; long from = partMap.initialUpdateCounterAt(idx); @@ -930,9 +990,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (entry.partitionCounter() >= from && entry.partitionCounter() <= to) { if (entry.partitionCounter() == to) - doneParts.add(entry.partitionId()); + reachedPartitionEnd = true; - next = new DataEntryRow(entry); + next = entry; return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/test/config/log4j-test.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/log4j-test.xml b/modules/core/src/test/config/log4j-test.xml index 9138c02..b0b08e7 100755 --- a/modules/core/src/test/config/log4j-test.xml +++ b/modules/core/src/test/config/log4j-test.xml @@ -79,12 +79,6 @@ Uncomment to enable Ignite query execution debugging. 
--> <!-- - <category name="org.apache.ignite.cache.query"> - <level value="DEBUG"/> - </category> - --> - - <!-- <category name="org.apache.ignite.internal.processors.query"> <level value="DEBUG"/> </category> http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java new file mode 100644 index 0000000..a090381 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.IgniteSystemProperties; + +/** + * + */ +public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends IgnitePdsAtomicCacheRebalancingTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + // Use rebalance from WAL if possible. + System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); + + super.beforeTest(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD); + + super.afterTest(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java index 10f9b03..0dd9c78 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java @@ -78,12 +78,15 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb cfg.setConsistentId(gridName); + cfg.setRebalanceThreadPoolSize(2); + CacheConfiguration ccfg1 = cacheConfiguration(cacheName) .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE) - .setBackups(2) + .setBackups(1) .setRebalanceMode(CacheRebalanceMode.ASYNC) .setIndexedTypes(Integer.class, Integer.class) .setAffinity(new RendezvousAffinityFunction(false, 32)) + 
.setRebalanceBatchesPrefetchCount(2) .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); CacheConfiguration ccfg2 = cacheConfiguration("indexed"); @@ -172,8 +175,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb stopAllGrids(); cleanPersistenceDir(); - - System.clearProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE); } /** @@ -184,7 +185,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb public void testRebalancingOnRestart() throws Exception { Ignite ignite0 = startGrid(0); - ignite0.active(true); + ignite0.cluster().active(true); startGrid(1); @@ -233,8 +234,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb * @throws Exception If fails. */ public void testRebalancingOnRestartAfterCheckpoint() throws Exception { - fail("IGNITE-5302"); - IgniteEx ignite0 = startGrid(0); IgniteEx ignite1 = startGrid(1); @@ -242,7 +241,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb IgniteEx ignite2 = startGrid(2); IgniteEx ignite3 = startGrid(3); - ignite0.active(true); + ignite0.cluster().active(true); ignite0.cache(cacheName).rebalance().get(); ignite1.cache(cacheName).rebalance().get(); @@ -264,6 +263,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb ignite2.close(); ignite3.close(); + resetBaselineTopology(); + ignite0.resetLostPartitions(Collections.singletonList(cache1.getName())); assert cache1.lostPartitions().isEmpty(); @@ -306,7 +307,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb IgniteEx ignite3 = (IgniteEx)G.start(getConfiguration("test3")); IgniteEx ignite4 = (IgniteEx)G.start(getConfiguration("test4")); - ignite1.active(true); + ignite1.cluster().active(true); awaitPartitionMapExchange(); @@ -325,7 +326,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb ignite3 = 
(IgniteEx)G.start(getConfiguration("test3")); ignite4 = (IgniteEx)G.start(getConfiguration("test4")); - ignite1.active(true); + ignite1.cluster().active(true); awaitPartitionMapExchange(); @@ -348,13 +349,13 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb * @throws Exception If fails. */ public void testPartitionLossAndRecover() throws Exception { - fail("IGNITE-5302"); - Ignite ignite1 = startGrid(0); Ignite ignite2 = startGrid(1); Ignite ignite3 = startGrid(2); Ignite ignite4 = startGrid(3); + ignite1.cluster().active(true); + awaitPartitionMapExchange(); IgniteCache<String, String> cache1 = ignite1.cache(cacheName); @@ -537,7 +538,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb final Ignite ig = grid(1); - ig.active(true); + ig.cluster().active(true); awaitPartitionMapExchange(); @@ -574,8 +575,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb * @throws Exception If failed */ public void testPartitionCounterConsistencyOnUnstableTopology() throws Exception { - System.setProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, "1"); - final Ignite ig = startGrids(4); ig.cluster().active(true); @@ -595,9 +594,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb try { stopGrid(3); - // Clear checkpoint history to avoid rebalance from WAL. - forceCheckpoint(); forceCheckpoint(); + U.sleep(500); // Wait for data load. IgniteEx ig0 = startGrid(3); @@ -609,8 +607,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb awaitPartitionMapExchange(); - // Clear checkpoint history to avoid rebalance from WAL. 
- forceCheckpoint(); forceCheckpoint(); startGrid(2); http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java new file mode 100644 index 0000000..179c8e0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.IgniteSystemProperties; + +/** + * + */ +public class IgnitePdsTxCacheHistoricalRebalancingTest extends IgnitePdsTxCacheRebalancingTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + // Use rebalance from WAL if possible. 
+ System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); + + super.beforeTest(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD); + + super.afterTest(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java new file mode 100644 index 0000000..6387dac --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; + +/** + * Historic WAL rebalance base test. + */ +public class IgniteWalRebalanceTest extends GridCommonAbstractTest { + /** Cache name. 
*/ + private static final String CACHE_NAME = "cache"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); // Make all rebalancing WAL-based. + + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME); + + ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + + ccfg.setRebalanceMode(CacheRebalanceMode.ASYNC); + + ccfg.setCacheMode(CacheMode.REPLICATED); + + ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + + cfg.setCacheConfiguration(ccfg); + + DataStorageConfiguration dbCfg = new DataStorageConfiguration() + .setWalHistorySize(Integer.MAX_VALUE) + .setWalMode(WALMode.LOG_ONLY) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)); + + cfg.setDataStorageConfiguration(dbCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + System.clearProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception if failed. 
+ */ + public void test() throws Exception { + IgniteEx ig0 = startGrid(0); + IgniteEx ig1 = startGrid(1); + final int entryCnt = 10_000; + + ig0.cluster().active(true); + + IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME); + + for (int k = 0; k < entryCnt; k++) + cache.put(k, new IndexedObject(k)); + + forceCheckpoint(); + + stopGrid(1, false); + + for (int k = 0; k < entryCnt; k++) + cache.put(k, new IndexedObject(k + 1)); + + forceCheckpoint(); + + ig1 = startGrid(1); + + IgniteCache<Object, Object> cache1 = ig1.cache(CACHE_NAME); + + cache1.rebalance().get(2, TimeUnit.MINUTES); + + for (int k = 0; k < entryCnt; k++) + assertEquals(new IndexedObject(k + 1), cache.get(k)); + } + + /** + * + */ + private static class IndexedObject { + /** */ + @QuerySqlField(index = true) + private int iVal; + + /** */ + private byte[] payload = new byte[1024]; + + /** + * @param iVal Integer value. + */ + private IndexedObject(int iVal) { + this.iVal = iVal; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof IndexedObject)) + return false; + + IndexedObject that = (IndexedObject)o; + + return iVal == that.iVal; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return iVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IndexedObject.class, this); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java index de7ee5f..996d6a7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java @@ -209,7 +209,6 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom * @throws Exception If failed. */ public void testRestarts() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-7946"); int duration = 90 * 1000; int qryThreadNum = 5; int restartThreadsNum = 2; // 2 of 4 data nodes http://git-wip-us.apache.org/repos/asf/ignite/blob/5c718543/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index 447b622..943d43f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -17,10 +17,12 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheHistoricalRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMetadataOnClusterRestartTest; import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySortObjectFieldsTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsMarshallerMappingRestoreOnNodeStartTest; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheHistoricalRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreCacheGroupsTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest; @@ -30,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePds import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsDiskErrorsRecoveringTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryPPCTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryWithCompactionTest; @@ -60,6 +63,10 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite { suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class); suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class); + suite.addTestSuite(IgnitePdsAtomicCacheHistoricalRebalancingTest.class); + suite.addTestSuite(IgnitePdsTxCacheHistoricalRebalancingTest.class); + suite.addTestSuite(IgniteWalRebalanceTest.class); + suite.addTestSuite(IgniteWalRecoveryPPCTest.class); suite.addTestSuite(IgnitePdsDiskErrorsRecoveringTest.class); 
suite.addTestSuite(IgnitePdsCacheDestroyDuringCheckpointTest.class);
