This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 9bdd3a8 IGNITE-13558 Better parallelization down to partition level when restoring partition states - Fixes #8333. 9bdd3a8 is described below commit 9bdd3a8db43c78e58d687c35cd79cd4c33d6bc58 Author: ibessonov <bessonov...@gmail.com> AuthorDate: Fri Oct 16 13:01:15 2020 +0300 IGNITE-13558 Better parallelization down to partition level when restoring partition states - Fixes #8333. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../org/apache/ignite/internal/IgniteFeatures.java | 3 + .../processors/cache/GridCacheProcessor.java | 53 ++++++-- .../GridCacheDatabaseSharedManager.java | 2 +- .../cache/persistence/GridCacheOffheapManager.java | 149 ++++++++++++--------- 4 files changed, 129 insertions(+), 78 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java index 11ef198..25361d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java @@ -173,6 +173,9 @@ public enum IgniteFeatures { * @return {@code True} if feature is declared to be supported by remote node. 
*/ public static boolean nodeSupports(byte[] featuresAttrBytes, IgniteFeatures feature) { + if (featuresAttrBytes == null) + return false; + int featureId = feature.getFeatureId(); // Same as "BitSet.valueOf(features).get(featureId)" diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index d3beafc..1260b60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -33,6 +33,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -150,7 +152,6 @@ import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.IgniteCollectors; import org.apache.ignite.internal.util.InitializationProtector; -import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -192,7 +193,6 @@ import static java.lang.String.format; import static java.util.Arrays.asList; import static java.util.Objects.isNull; import static java.util.Objects.nonNull; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.IgniteSystemProperties.getBoolean; @@ -306,10 +306,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { private final CacheRecoveryLifecycle recovery = new CacheRecoveryLifecycle(); /** Cache configuration splitter. */ - private CacheConfigurationSplitter splitter; + private final CacheConfigurationSplitter splitter; /** Cache configuration enricher. */ - private CacheConfigurationEnricher enricher; + private final CacheConfigurationEnricher enricher; + + /** Pool to use while restoring partition states. */ + private final ForkJoinPool restorePartitionsPool; /** * @param ctx Kernal context. @@ -324,6 +327,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName()); splitter = new CacheConfigurationSplitterImpl(ctx, marsh); enricher = new CacheConfigurationEnricher(ctx, marsh, U.resolveClassLoader(ctx.config())); + + ForkJoinPool.ForkJoinWorkerThreadFactory factory = new ForkJoinPool.ForkJoinWorkerThreadFactory() { + @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + + worker.setName("restore-partition-states-" + worker.getPoolIndex()); + + return worker; + } + }; + + int stripesCnt = ctx.getStripedExecutorService().stripesCount(); + restorePartitionsPool = new ForkJoinPool(stripesCnt, factory, null, false); } /** @@ -744,6 +760,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.cleanup(); + restorePartitionsPool.shutdownNow(); + if (log.isDebugEnabled()) log.debug("Stopped cache processor."); } @@ -5283,6 +5301,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Returns {@code ForkJoinPool} instance to be used in partition states restoration.<br/> + * It's more convenient than regular pools 
because it can be used to parallelize by cache groups and by partitions + * without sacrificing code simplicity (cache group tasks won't exclusively occupy their threads and won't block + * partition tasks as a result).<br/> + * <br/> + * There's a chance that this pool will later be replaced with a more common one, like the system pool, for example. + * + * @return Pool instance. + */ + public ForkJoinPool restorePartitionsPool() { + return restorePartitionsPool; + } + + /** * Pages list view supplier. * * @param filter Filter. @@ -5464,12 +5496,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { AtomicReference<IgniteCheckedException> restoreStateError = new AtomicReference<>(); - StripedExecutor stripedExec = ctx.getStripedExecutorService(); - - int roundRobin = 0; + CountDownLatch completionLatch = new CountDownLatch(forGroups.size()); for (CacheGroupContext grp : forGroups) { - stripedExec.execute(roundRobin % stripedExec.stripesCount(), () -> { + restorePartitionsPool.submit(() -> { try { long processed = grp.offheap().restorePartitionStates(partitionStates); @@ -5486,14 +5516,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { : new IgniteCheckedException(e) ); } + finally { + completionLatch.countDown(); + } }); - - roundRobin++; } try { // Await completion restore state tasks in all stripes.
- stripedExec.awaitComplete(); + completionLatch.await(); } catch (InterruptedException e) { throw new IgniteInterruptedException(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 50224f3..d363f35 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1142,7 +1142,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (fut.localJoinExchange() || fut.activateCluster() || (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart()))) { U.doInParallel( - cctx.kernalContext().getSystemExecutorService(), + cctx.cache().restorePartitionsPool(), cctx.cache().cacheGroups(), cacheGroup -> { if (cacheGroup.isLocal()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 1fba65d..faed634 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -31,7 +31,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntConsumer; import java.util.function.ToLongFunction; +import java.util.stream.IntStream; import 
javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -576,114 +579,128 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (partitionStatesRestored) return 0; - long processed = 0; + AtomicLong processed = new AtomicLong(); + AtomicReference<IgniteCheckedException> err = new AtomicReference<>(); PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); - for (int p = 0; p < grp.affinity().partitions(); p++) { + IntConsumer partConsumer = p -> { Integer recoverState = partitionRecoveryStates.get(new GroupPartitionId(grp.groupId(), p)); long startTime = U.currentTimeMillis(); - if (ctx.pageStore().exists(grp.groupId(), p)) { - ctx.pageStore().ensure(grp.groupId(), p); + try { + if (ctx.pageStore().exists(grp.groupId(), p)) { + ctx.pageStore().ensure(grp.groupId(), p); + + if (ctx.pageStore().pages(grp.groupId(), p) <= 1) { + if (log.isDebugEnabled()) + log.debug("Skipping partition on recovery (pages less than 1) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); + + return; + } - if (ctx.pageStore().pages(grp.groupId(), p) <= 1) { if (log.isDebugEnabled()) - log.debug("Skipping partition on recovery (pages less than 1) " + + log.debug("Creating partition on recovery (exists in page store) " + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); - continue; - } + processed.incrementAndGet(); - if (log.isDebugEnabled()) - log.debug("Creating partition on recovery (exists in page store) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); + GridDhtLocalPartition part = grp.topology().forceCreatePartition(p); - processed++; + // Triggers initialization of existing(having datafile) partition before acquiring cp read lock. 
+ part.dataStore().init(); - GridDhtLocalPartition part = grp.topology().forceCreatePartition(p); + ctx.database().checkpointReadLock(); - // Triggers initialization of existing(having datafile) partition before acquiring cp read lock. - part.dataStore().init(); + try { + long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p); + long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId); - ctx.database().checkpointReadLock(); + try { + long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage); - try { - long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p); - long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId); + boolean changed = false; - try { - long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage); + try { + PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr); - boolean changed = false; + if (recoverState != null) { + changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue()); - try { - PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr); + updateState(part, recoverState); - if (recoverState != null) { - changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue()); + if (log.isDebugEnabled()) + log.debug("Restored partition state (from WAL) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + + ", updCntr=" + part.initialUpdateCounter() + + ", size=" + part.fullSize() + ']'); + } + else { + int stateId = io.getPartitionState(pageAddr); - updateState(part, recoverState); + updateState(part, stateId); - if (log.isDebugEnabled()) - log.debug("Restored partition state (from WAL) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + - ", updCntr=" + part.initialUpdateCounter() + - ", size=" + part.fullSize() + ']'); + if (log.isDebugEnabled()) + log.debug("Restored partition state (from page memory) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + 
part.state() + + ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + + ", size=" + part.fullSize() + ']'); + } } - else { - int stateId = io.getPartitionState(pageAddr); - - updateState(part, stateId); - - if (log.isDebugEnabled()) - log.debug("Restored partition state (from page memory) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + - ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + - ", size=" + part.fullSize() + ']'); + finally { + pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed); } } finally { - pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed); + pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage); } } finally { - pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage); + ctx.database().checkpointReadUnlock(); } } - finally { - ctx.database().checkpointReadUnlock(); - } - } - else if (recoverState != null) { // Pre-create partition if having valid state. - GridDhtLocalPartition part = grp.topology().forceCreatePartition(p); + else if (recoverState != null) { // Pre-create partition if having valid state. 
+ GridDhtLocalPartition part = grp.topology().forceCreatePartition(p); - updateState(part, recoverState); + updateState(part, recoverState); - processed++; + processed.incrementAndGet(); + + if (log.isDebugEnabled()) + log.debug("Restored partition state (from WAL) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + + ", updCntr=" + part.initialUpdateCounter() + + ", size=" + part.fullSize() + ']'); + } + else { + if (log.isDebugEnabled()) + log.debug("Skipping partition on recovery (no page store OR wal state) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); + } if (log.isDebugEnabled()) - log.debug("Restored partition state (from WAL) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() + - ", updCntr=" + part.initialUpdateCounter() + - ", size=" + part.fullSize() + ']'); + log.debug("Finished restoring partition state " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + + ", time=" + (U.currentTimeMillis() - startTime) + " ms]"); } - else { - if (log.isDebugEnabled()) - log.debug("Skipping partition on recovery (no page store OR wal state) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); + catch (IgniteCheckedException e) { + if (!err.compareAndSet(null, e)) + err.get().addSuppressed(e); } + }; - if (log.isDebugEnabled()) - log.debug("Finished restoring partition state " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + p + - ", time=" + (U.currentTimeMillis() - startTime) + " ms]"); - } + ctx.cache().restorePartitionsPool().submit( + () -> IntStream.range(0, grp.affinity().partitions()).parallel().forEach(partConsumer) + ).join(); + + if (err.get() != null) + throw err.get(); partitionStatesRestored = true; - return processed; + return processed.get(); } /**