This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 35a6e5b IGNITE-13558 Partitions restore process parallelization on node startup - Fixes #9327. 35a6e5b is described below commit 35a6e5b0c228e86b637bc2dcfdbfcd166836671f Author: denis-chudov <moongll...@gmail.com> AuthorDate: Thu Nov 18 14:49:08 2021 +0300 IGNITE-13558 Partitions restore process parallelization on node startup - Fixes #9327. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../processors/cache/GridCacheProcessor.java | 162 +++++++++++++------ .../cache/IgniteCacheOffheapManager.java | 23 ++- .../cache/IgniteCacheOffheapManagerImpl.java | 17 +- .../GridCacheDatabaseSharedManager.java | 2 +- .../cache/persistence/GridCacheOffheapManager.java | 173 +++++++++++---------- .../cache/RestorePartitionStateTest.java | 8 +- 6 files changed, 239 insertions(+), 146 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5ff1d3e..346ca69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -41,7 +41,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -5552,66 +5552,75 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isInfoEnabled()) log.info("Restoring partition state for local groups."); - AtomicLong totalProcessed = new AtomicLong(); - AtomicReference<IgniteCheckedException> restoreStateError = new AtomicReference<>(); ExecutorService sysPool = ctx.pools().getSystemExecutorService(); - CountDownLatch completionLatch = new CountDownLatch(forGroups.size()); + final int totalPart = forGroups.stream().mapToInt(grpCtx -> grpCtx.affinity().partitions()).sum(); - AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>(); + CountDownLatch completionLatch = new CountDownLatch(totalPart); - long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum(); + Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new ConcurrentHashMap<>(); - for (CacheGroupContext grp : forGroups) { - sysPool.execute(() -> { - try { - Map<Integer, Long> processed = grp.offheap().restorePartitionStates(partStates); + final int topPartRefLimit = 5; - totalProcessed.addAndGet(processed.size()); + for (CacheGroupContext grpCtx : forGroups) { + for (int i = 0; i < grpCtx.affinity().partitions(); i++) { + final int partId = i; - if (log.isInfoEnabled()) { - TreeSet<T3<Long, Long, GroupPartitionId>> top = - new TreeSet<>(processedPartitionComparator()); + sysPool.execute(() -> { + GroupPartitionId grpPartId = new GroupPartitionId(grpCtx.groupId(), partId); - long ts = System.currentTimeMillis(); + try { + long time = grpCtx.offheap().restoreStateOfPartition(partId, partStates.get(grpPartId)); - for (Map.Entry<Integer, Long> e : processed.entrySet()) { - top.add(new T3<>(e.getValue(), ts, new GroupPartitionId(grp.groupId(), e.getKey()))); + if (log.isInfoEnabled()) { + T3<Long, Long, GroupPartitionId> curPart = new T3<>(time, U.currentTimeMillis(), grpPartId); - trimToSize(top, 5); - } + RestorePartitionStateThreadContext threadCtx = threadCtxs.computeIfAbsent( + Thread.currentThread(), + t -> new RestorePartitionStateThreadContext() + ); + + Comparator<T3<Long, Long, GroupPartitionId>> cmp = processedPartitionComparator(); - topPartRef.updateAndGet(top0 -> { - if (top0 == null) - return top; + threadCtx.topPartRef.updateAndGet(prev -> { + if (prev == null || + cmp.compare(prev.last(), curPart) < 0) { + SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(cmp); - for (T3<Long, Long, GroupPartitionId> t2 : top0) { - top.add(t2); + top.add(curPart); - trimToSize(top, 5); - } + if (prev != null) + top.addAll(prev); - return top; - }); + trimToSize(top, topPartRefLimit); + + return top; + } + else + return prev; + }); + + threadCtx.incrementProcessedCnt(); + } } - } - catch (IgniteCheckedException | RuntimeException | Error e) { - U.error(log, "Failed to restore partition state for " + - "groupName=" + grp.name() + " groupId=" + grp.groupId(), e); + catch (IgniteCheckedException | RuntimeException | Error e) { + U.error(log, "Failed to restore partition state for " + + "groupName=" + grpCtx.name() + " groupId=" + grpCtx.groupId(), e); - restoreStateError.compareAndSet( - null, - e instanceof IgniteCheckedException + IgniteCheckedException ex = e instanceof IgniteCheckedException ? ((IgniteCheckedException)e) - : new IgniteCheckedException(e) - ); - } - finally { - completionLatch.countDown(); - } - }); + : new IgniteCheckedException(e); + + if (!restoreStateError.compareAndSet(null, ex)) + restoreStateError.get().addSuppressed(ex); + } + finally { + completionLatch.countDown(); + } + }); + } } boolean printTop = false; @@ -5625,15 +5634,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { while (!completionLatch.await(timeout, TimeUnit.MILLISECONDS)) { if (log.isInfoEnabled()) { - @Nullable SortedSet<T3<Long, Long, GroupPartitionId>> top = topPartRef.get(); + SortedSet<T3<Long, Long, GroupPartitionId>> top = + collectTopProcessedParts(threadCtxs.values(), topPartRefLimit); + + long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum(); log.info("Restore partitions state progress [grpCnt=" + (forGroups.size() - completionLatch.getCount()) + '/' + forGroups.size() + - ", partitionCnt=" + totalProcessed.get() + '/' + totalPart + (top == null ? "" : + ", partitionCnt=" + totalProcessed + '/' + totalPart + (top.isEmpty() ? "" : ", topProcessedPartitions=" + toStringTopProcessingPartitions(top, forGroups)) + ']'); } timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS / 5; + printTop = true; } } @@ -5642,16 +5655,22 @@ public class GridCacheProcessor extends GridProcessorAdapter { throw new IgniteInterruptedException(e); } + for (CacheGroupContext grpCtx : forGroups) + grpCtx.offheap().confirmPartitionStatesRestored(); + // Checking error after all task applied. if (restoreStateError.get() != null) throw restoreStateError.get(); if (log.isInfoEnabled()) { - SortedSet<T3<Long, Long, GroupPartitionId>> t = printTop ? topPartRef.get() : null; + SortedSet<T3<Long, Long, GroupPartitionId>> t = + printTop ? collectTopProcessedParts(threadCtxs.values(), topPartRefLimit) : null; + + long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum(); log.info("Finished restoring partition state for local groups [" + "groupsProcessed=" + forGroups.size() + - ", partitionsProcessed=" + totalProcessed.get() + + ", partitionsProcessed=" + totalProcessed + ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startRestorePart) + (t == null ? "" : ", topProcessedPartitions=" + toStringTopProcessingPartitions(t, forGroups)) + "]"); @@ -5659,6 +5678,30 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Collects top processed partitions from thread local contexts of restore partition state process. + * + * @param threadCtxs Thread local contexts. + * @param topPartRefLimit Limit of top partitions collection size. + */ + private SortedSet<T3<Long, Long, GroupPartitionId>> collectTopProcessedParts( + Collection<RestorePartitionStateThreadContext> threadCtxs, + int topPartRefLimit + ) { + SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(processedPartitionComparator()); + + for (RestorePartitionStateThreadContext threadCtx : threadCtxs) { + SortedSet<T3<Long, Long, GroupPartitionId>> threadTop = threadCtx.topPartRef.get(); + + if (threadTop != null) + top.addAll(threadTop); + } + + trimToSize(top, topPartRefLimit); + + return top; + } + + /** * Start warming up sequentially for each persist data region. * * @throws IgniteCheckedException If failed. @@ -5972,13 +6015,36 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Comparator of processed partitions. * T3 -> 1 - duration, 2 - timestamp, 3 - partition of group. - * Sort order: duration -> timestamp -> partition of group. + * Sort order: duration -> timestamp (reversed order) -> partition of group. * * @return Comparator. */ static Comparator<T3<Long, Long, GroupPartitionId>> processedPartitionComparator() { Comparator<T3<Long, Long, GroupPartitionId>> comp = Comparator.comparing(T3::get1); - return comp.thenComparing(T3::get2).thenComparing(T3::get3); + return comp.thenComparing(T3::get2, Comparator.reverseOrder()).thenComparing(T3::get3); + } + + /** + * Thread local context of restore partition state progress. + */ + private static class RestorePartitionStateThreadContext { + /** Field updater. */ + static final AtomicLongFieldUpdater<RestorePartitionStateThreadContext> PROCESSED_CNT_UPD = + AtomicLongFieldUpdater.newUpdater(RestorePartitionStateThreadContext.class, "processedCnt"); + + /** Top partitions by processing time. */ + final AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = + new AtomicReference<>(); + + /** Processed partitions count. It is always updated from the same thread. */ + volatile long processedCnt = 0; + + /** + * Increment {@code processedCnt} field. + */ + void incrementProcessedCnt() { + PROCESSED_CNT_UPD.incrementAndGet(this); + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index bc08c15..d206164 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.RowStore; import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow; -import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; @@ -90,15 +89,27 @@ public interface IgniteCacheOffheapManager { public void stop(); /** + * Pre-create single partition that resides in page memory or WAL and restores their state. + * + * @param p Partition id. + * @param recoveryState Partition recovery state. + * @return Processing time in millis. + * @throws IgniteCheckedException If failed. + */ + long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException; + + /** * Pre-create partitions that resides in page memory or WAL and restores their state. * - * @param partRecoveryStates Partition recovery states. - * @return Processed partitions: partition id -> processing time in millis. * @throws IgniteCheckedException If failed. */ - Map<Integer, Long> restorePartitionStates( - Map<GroupPartitionId, Integer> partRecoveryStates - ) throws IgniteCheckedException; + void restorePartitionStates() throws IgniteCheckedException; + + /** + * Confirm that partition states are restored. This method should be called after restoring state of all partitions + * in group using {@link #restoreStateOfPartition(int, Integer)}. + */ + void confirmPartitionStatesRestored(); /** * @param entry Cache entry. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 8507b57..46ccede 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -68,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.RowStore; import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow; -import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; @@ -264,10 +263,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public Map<Integer, Long> restorePartitionStates( - Map<GroupPartitionId, Integer> partRecoveryStates - ) throws IgniteCheckedException { - return Collections.emptyMap(); // No-op. + @Override public long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public void restorePartitionStates() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void confirmPartitionStatesRestored() { + // No-op. } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index a63fdf5..ed92638 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1434,7 +1434,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cctx.database().checkpointReadLock(); try { - cacheGroup.offheap().restorePartitionStates(Collections.emptyMap()); + cacheGroup.offheap().restorePartitionStates(); if (cacheGroup.localStartVersion().equals(fut.initialVersion())) cacheGroup.topology().afterStateRestored(fut.initialVersion()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 2532ae5..1319ae1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -623,131 +623,140 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public Map<Integer, Long> restorePartitionStates( - Map<GroupPartitionId, Integer> partRecoveryStates - ) throws IgniteCheckedException { + @Override public long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException { if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled() || partitionStatesRestored) - return Collections.emptyMap(); - - Map<Integer, Long> processed = new HashMap<>(); + return 0; PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); - for (int p = 0; p < grp.affinity().partitions(); p++) { - Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p)); - - long startTime = U.currentTimeMillis(); + long startTime = U.currentTimeMillis(); - if (log.isDebugEnabled()) - log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); - - if (ctx.pageStore().exists(grp.groupId(), p)) { - ctx.pageStore().ensure(grp.groupId(), p); + long res = 0; - if (ctx.pageStore().pages(grp.groupId(), p) <= 1) { - if (log.isDebugEnabled()) { - log.debug("Skipping partition on recovery (pages less than 1) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); - } + if (log.isDebugEnabled()) + log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); - continue; - } + if (ctx.pageStore().exists(grp.groupId(), p)) { + ctx.pageStore().ensure(grp.groupId(), p); + if (ctx.pageStore().pages(grp.groupId(), p) <= 1) { if (log.isDebugEnabled()) { - log.debug("Creating partition on recovery (exists in page store) " + + log.debug("Skipping partition on recovery (pages less than or equals 1) " + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); } - GridDhtLocalPartition part = grp.topology().forceCreatePartition(p); + return 0; + } - // Triggers initialization of existing(having datafile) partition before acquiring cp read lock. - part.dataStore().init(); + if (log.isDebugEnabled()) { + log.debug("Creating partition on recovery (exists in page store) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); + } - ctx.database().checkpointReadLock(); + GridDhtLocalPartition part = grp.topology().forceCreatePartition(p); - try { - long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p); - long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId); + // Triggers initialization of existing(having datafile) partition before acquiring cp read lock. + part.dataStore().init(); - try { - long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage); + ctx.database().checkpointReadLock(); - boolean changed = false; + try { + long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p); + long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId); - try { - PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr); + try { + long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage); - if (recoverState != null) { - changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue()); + boolean changed = false; - updateState(part, recoverState); + try { + PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr); - if (log.isDebugEnabled()) { - log.debug("Restored partition state (from WAL) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + - ", updCntr=" + part.initialUpdateCounter() + - ", size=" + part.fullSize() + ']'); - } - } - else { - int stateId = io.getPartitionState(pageAddr); + if (recoveryState != null) { + changed = io.setPartitionState(pageAddr, (byte)recoveryState.intValue()); - updateState(part, stateId); + updateState(part, recoveryState); - if (log.isDebugEnabled()) { - log.debug("Restored partition state (from page memory) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + - ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + - ", size=" + part.fullSize() + ']'); - } + if (log.isDebugEnabled()) { + log.debug("Restored partition state (from WAL) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + + ", updCntr=" + part.initialUpdateCounter() + + ", size=" + part.fullSize() + ']'); } } - finally { - pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed); + else { + int stateId = io.getPartitionState(pageAddr); + + updateState(part, stateId); + + if (log.isDebugEnabled()) { + log.debug("Restored partition state (from page memory) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + + ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + + ", size=" + part.fullSize() + ']'); + } } } finally { - pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage); + pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed); } } finally { - ctx.database().checkpointReadUnlock(); + pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage); } - - processed.put(p, U.currentTimeMillis() - startTime); } - else if (recoverState != null) { // Pre-create partition if having valid state. - GridDhtLocalPartition part = grp.topology().forceCreatePartition(p); + finally { + ctx.database().checkpointReadUnlock(); + } - updateState(part, recoverState); + res = U.currentTimeMillis() - startTime; + } + else if (recoveryState != null) { // Pre-create partition if having valid state. + GridDhtLocalPartition part = grp.topology().forceCreatePartition(p); - processed.put(p, U.currentTimeMillis() - startTime); + updateState(part, recoveryState); - if (log.isDebugEnabled()) { - log.debug("Restored partition state (from WAL) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + - ", updCntr=" + part.initialUpdateCounter() + - ", size=" + part.fullSize() + ']'); - } - } - else { - if (log.isDebugEnabled()) { - log.debug("Skipping partition on recovery (no page store OR wal state) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); - } - } + res = U.currentTimeMillis() - startTime; if (log.isDebugEnabled()) { - log.debug("Finished restoring partition state " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + - ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']'); + log.debug("Restored partition state (from WAL) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + + ", updCntr=" + part.initialUpdateCounter() + + ", size=" + part.fullSize() + ']'); + } + } + else { + if (log.isDebugEnabled()) { + log.debug("Skipping partition on recovery (no page store OR wal state) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); } } - partitionStatesRestored = true; + if (log.isDebugEnabled()) { + log.debug("Finished restoring partition state " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + + ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']'); + } + + return res; + } - return processed; + /** {@inheritDoc} */ + @Override public void restorePartitionStates() throws IgniteCheckedException { + if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled() + || partitionStatesRestored) + return; + + for (int p = 0; p < grp.affinity().partitions(); p++) + restoreStateOfPartition(p, null); + + confirmPartitionStatesRestored(); + } + + /** {@inheritDoc} */ + @Override public void confirmPartitionStatesRestored() { + partitionStatesRestored = true; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java index 319d885..eecd28d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java @@ -145,12 +145,12 @@ public class RestorePartitionStateTest extends GridCommonAbstractTest { @Test public void testProcessedPartitionComparator() { List<T3<Long, Long, GroupPartitionId>> exp = F.asList( - new T3<>(0L, 0L, new GroupPartitionId(0, 0)), + new T3<>(0L, 2L, new GroupPartitionId(0, 0)), new T3<>(0L, 1L, new GroupPartitionId(0, 0)), new T3<>(1L, 1L, new GroupPartitionId(0, 0)), - new T3<>(1L, 2L, new GroupPartitionId(0, 0)), - new T3<>(1L, 2L, new GroupPartitionId(1, 0)), - new T3<>(1L, 2L, new GroupPartitionId(1, 1)) + new T3<>(1L, 0L, new GroupPartitionId(0, 0)), + new T3<>(1L, 0L, new GroupPartitionId(1, 0)), + new T3<>(1L, 0L, new GroupPartitionId(1, 1)) ); TreeSet<T3<Long, Long, GroupPartitionId>> act = new TreeSet<>(processedPartitionComparator());