This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bdd3a8  IGNITE-13558 Better parallelization down to partition level when restoring partition states - Fixes #8333.
9bdd3a8 is described below

commit 9bdd3a8db43c78e58d687c35cd79cd4c33d6bc58
Author: ibessonov <bessonov...@gmail.com>
AuthorDate: Fri Oct 16 13:01:15 2020 +0300

    IGNITE-13558 Better parallelization down to partition level when restoring partition states - Fixes #8333.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../org/apache/ignite/internal/IgniteFeatures.java |   3 +
 .../processors/cache/GridCacheProcessor.java       |  53 ++++++--
 .../GridCacheDatabaseSharedManager.java            |   2 +-
 .../cache/persistence/GridCacheOffheapManager.java | 149 ++++++++++++---------
 4 files changed, 129 insertions(+), 78 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 11ef198..25361d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -173,6 +173,9 @@ public enum IgniteFeatures {
      * @return {@code True} if feature is declared to be supported by remote 
node.
      */
     public static boolean nodeSupports(byte[] featuresAttrBytes, 
IgniteFeatures feature) {
+        if (featuresAttrBytes == null)
+            return false;
+
         int featureId = feature.getFeatureId();
 
         // Same as "BitSet.valueOf(features).get(featureId)"
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d3beafc..1260b60 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -33,6 +33,8 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -150,7 +152,6 @@ import 
org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.IgniteCollectors;
 import org.apache.ignite.internal.util.InitializationProtector;
-import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -192,7 +193,6 @@ import static java.lang.String.format;
 import static java.util.Arrays.asList;
 import static java.util.Objects.isNull;
 import static java.util.Objects.nonNull;
-import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -306,10 +306,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     private final CacheRecoveryLifecycle recovery = new 
CacheRecoveryLifecycle();
 
     /** Cache configuration splitter. */
-    private CacheConfigurationSplitter splitter;
+    private final CacheConfigurationSplitter splitter;
 
     /** Cache configuration enricher. */
-    private CacheConfigurationEnricher enricher;
+    private final CacheConfigurationEnricher enricher;
+
+    /** Pool to use while restoring partition states. */
+    private final ForkJoinPool restorePartitionsPool;
 
     /**
      * @param ctx Kernal context.
@@ -324,6 +327,19 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
         splitter = new CacheConfigurationSplitterImpl(ctx, marsh);
         enricher = new CacheConfigurationEnricher(ctx, marsh, 
U.resolveClassLoader(ctx.config()));
+
+        ForkJoinPool.ForkJoinWorkerThreadFactory factory = new 
ForkJoinPool.ForkJoinWorkerThreadFactory() {
+            @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) 
{
+                ForkJoinWorkerThread worker = 
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+
+                worker.setName("restore-partition-states-" + 
worker.getPoolIndex());
+
+                return worker;
+            }
+        };
+
+        int stripesCnt = ctx.getStripedExecutorService().stripesCount();
+        restorePartitionsPool = new ForkJoinPool(stripesCnt, factory, null, 
false);
     }
 
     /**
@@ -744,6 +760,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         sharedCtx.cleanup();
 
+        restorePartitionsPool.shutdownNow();
+
         if (log.isDebugEnabled())
             log.debug("Stopped cache processor.");
     }
@@ -5283,6 +5301,20 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Returns {@code ForkJoinPool} instance to be used in partition states 
restoration.<br/>
+     * It's more convenient than regular pools because it can be used to 
parallel by cache groups and by partitions
+     * without sacrificing code simplicity (cache group tasks won't 
exclusively occupy their threads and won't block
+     * partition tasks as a result).<br/>
+     * <br/>
+     * There's a chance that this pool will later be replaced with a more 
common one, like system pool, for example.
+     *
+     * @return Pool instance.
+     */
+    public ForkJoinPool restorePartitionsPool() {
+        return restorePartitionsPool;
+    }
+
+    /**
      * Pages list view supplier.
      *
      * @param filter Filter.
@@ -5464,12 +5496,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
-            StripedExecutor stripedExec = ctx.getStripedExecutorService();
-
-            int roundRobin = 0;
+            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
 
             for (CacheGroupContext grp : forGroups) {
-                stripedExec.execute(roundRobin % stripedExec.stripesCount(), 
() -> {
+                restorePartitionsPool.submit(() -> {
                     try {
                         long processed = 
grp.offheap().restorePartitionStates(partitionStates);
 
@@ -5486,14 +5516,15 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                                 : new IgniteCheckedException(e)
                         );
                     }
+                    finally {
+                        completionLatch.countDown();
+                    }
                 });
-
-                roundRobin++;
             }
 
             try {
                 // Await completion restore state tasks in all stripes.
-                stripedExec.awaitComplete();
+                completionLatch.await();
             }
             catch (InterruptedException e) {
                 throw new IgniteInterruptedException(e);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 50224f3..d363f35 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1142,7 +1142,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         if (fut.localJoinExchange() || fut.activateCluster()
             || (fut.exchangeActions() != null && 
!F.isEmpty(fut.exchangeActions().cacheGroupsToStart()))) {
             U.doInParallel(
-                cctx.kernalContext().getSystemExecutorService(),
+                cctx.cache().restorePartitionsPool(),
                 cctx.cache().cacheGroups(),
                 cacheGroup -> {
                     if (cacheGroup.isLocal())
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 1fba65d..faed634 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -31,7 +31,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntConsumer;
 import java.util.function.ToLongFunction;
+import java.util.stream.IntStream;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -576,114 +579,128 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         if (partitionStatesRestored)
             return 0;
 
-        long processed = 0;
+        AtomicLong processed = new AtomicLong();
+        AtomicReference<IgniteCheckedException> err = new AtomicReference<>();
 
         PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
 
-        for (int p = 0; p < grp.affinity().partitions(); p++) {
+        IntConsumer partConsumer = p -> {
             Integer recoverState = partitionRecoveryStates.get(new 
GroupPartitionId(grp.groupId(), p));
 
             long startTime = U.currentTimeMillis();
 
-            if (ctx.pageStore().exists(grp.groupId(), p)) {
-                ctx.pageStore().ensure(grp.groupId(), p);
+            try {
+                if (ctx.pageStore().exists(grp.groupId(), p)) {
+                    ctx.pageStore().ensure(grp.groupId(), p);
+
+                    if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping partition on recovery (pages 
less than 1) " +
+                                "[grp=" + grp.cacheOrGroupName() + ", p=" + p 
+ ']');
+
+                        return;
+                    }
 
-                if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
                     if (log.isDebugEnabled())
-                        log.debug("Skipping partition on recovery (pages less 
than 1) " +
+                        log.debug("Creating partition on recovery (exists in 
page store) " +
                             "[grp=" + grp.cacheOrGroupName() + ", p=" + p + 
']');
 
-                    continue;
-                }
+                    processed.incrementAndGet();
 
-                if (log.isDebugEnabled())
-                    log.debug("Creating partition on recovery (exists in page 
store) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+                    GridDhtLocalPartition part = 
grp.topology().forceCreatePartition(p);
 
-                processed++;
+                    // Triggers initialization of existing(having datafile) 
partition before acquiring cp read lock.
+                    part.dataStore().init();
 
-                GridDhtLocalPartition part = 
grp.topology().forceCreatePartition(p);
+                    ctx.database().checkpointReadLock();
 
-                // Triggers initialization of existing(having datafile) 
partition before acquiring cp read lock.
-                part.dataStore().init();
+                    try {
+                        long partMetaId = 
pageMem.partitionMetaPageId(grp.groupId(), p);
+                        long partMetaPage = pageMem.acquirePage(grp.groupId(), 
partMetaId);
 
-                ctx.database().checkpointReadLock();
+                        try {
+                            long pageAddr = pageMem.writeLock(grp.groupId(), 
partMetaId, partMetaPage);
 
-                try {
-                    long partMetaId = 
pageMem.partitionMetaPageId(grp.groupId(), p);
-                    long partMetaPage = pageMem.acquirePage(grp.groupId(), 
partMetaId);
+                            boolean changed = false;
 
-                    try {
-                        long pageAddr = pageMem.writeLock(grp.groupId(), 
partMetaId, partMetaPage);
+                            try {
+                                PagePartitionMetaIO io = 
PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
 
-                        boolean changed = false;
+                                if (recoverState != null) {
+                                    changed = io.setPartitionState(pageAddr, 
(byte)recoverState.intValue());
 
-                        try {
-                            PagePartitionMetaIO io = 
PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+                                    updateState(part, recoverState);
 
-                            if (recoverState != null) {
-                                changed = io.setPartitionState(pageAddr, 
(byte)recoverState.intValue());
+                                    if (log.isDebugEnabled())
+                                        log.debug("Restored partition state 
(from WAL) " +
+                                            "[grp=" + grp.cacheOrGroupName() + 
", p=" + p + ", state=" + part.state() +
+                                            ", updCntr=" + 
part.initialUpdateCounter() +
+                                            ", size=" + part.fullSize() + ']');
+                                }
+                                else {
+                                    int stateId = 
io.getPartitionState(pageAddr);
 
-                                updateState(part, recoverState);
+                                    updateState(part, stateId);
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Restored partition state (from 
WAL) " +
-                                        "[grp=" + grp.cacheOrGroupName() + ", 
p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + 
part.initialUpdateCounter() +
-                                        ", size=" + part.fullSize() + ']');
+                                    if (log.isDebugEnabled())
+                                        log.debug("Restored partition state 
(from page memory) " +
+                                            "[grp=" + grp.cacheOrGroupName() + 
", p=" + p + ", state=" + part.state() +
+                                            ", updCntr=" + 
part.initialUpdateCounter() + ", stateId=" + stateId +
+                                            ", size=" + part.fullSize() + ']');
+                                }
                             }
-                            else {
-                                int stateId = io.getPartitionState(pageAddr);
-
-                                updateState(part, stateId);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Restored partition state (from 
page memory) " +
-                                        "[grp=" + grp.cacheOrGroupName() + ", 
p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + 
part.initialUpdateCounter() + ", stateId=" + stateId +
-                                        ", size=" + part.fullSize() + ']');
+                            finally {
+                                pageMem.writeUnlock(grp.groupId(), partMetaId, 
partMetaPage, null, changed);
                             }
                         }
                         finally {
-                            pageMem.writeUnlock(grp.groupId(), partMetaId, 
partMetaPage, null, changed);
+                            pageMem.releasePage(grp.groupId(), partMetaId, 
partMetaPage);
                         }
                     }
                     finally {
-                        pageMem.releasePage(grp.groupId(), partMetaId, 
partMetaPage);
+                        ctx.database().checkpointReadUnlock();
                     }
                 }
-                finally {
-                    ctx.database().checkpointReadUnlock();
-                }
-            }
-            else if (recoverState != null) { // Pre-create partition if having 
valid state.
-                GridDhtLocalPartition part = 
grp.topology().forceCreatePartition(p);
+                else if (recoverState != null) { // Pre-create partition if 
having valid state.
+                    GridDhtLocalPartition part = 
grp.topology().forceCreatePartition(p);
 
-                updateState(part, recoverState);
+                    updateState(part, recoverState);
 
-                processed++;
+                    processed.incrementAndGet();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Restored partition state (from WAL) " +
+                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", 
state=" + part.state() +
+                            ", updCntr=" + part.initialUpdateCounter() +
+                            ", size=" + part.fullSize() + ']');
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition on recovery (no page 
store OR wal state) " +
+                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + 
']');
+                }
 
                 if (log.isDebugEnabled())
-                    log.debug("Restored partition state (from WAL) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", 
state=" + part.state() +
-                        ", updCntr=" + part.initialUpdateCounter() +
-                        ", size=" + part.fullSize() + ']');
+                    log.debug("Finished restoring partition state " +
+                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
+                        ", time=" + (U.currentTimeMillis() - startTime) + " 
ms]");
             }
-            else {
-                if (log.isDebugEnabled())
-                    log.debug("Skipping partition on recovery (no page store 
OR wal state) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+            catch (IgniteCheckedException e) {
+                if (!err.compareAndSet(null, e))
+                    err.get().addSuppressed(e);
             }
+        };
 
-            if (log.isDebugEnabled())
-                log.debug("Finished restoring partition state " +
-                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
-                    ", time=" + (U.currentTimeMillis() - startTime) + " ms]");
-        }
+        ctx.cache().restorePartitionsPool().submit(
+            () -> IntStream.range(0, 
grp.affinity().partitions()).parallel().forEach(partConsumer)
+        ).join();
+
+        if (err.get() != null)
+            throw err.get();
 
         partitionStatesRestored = true;
 
-        return processed;
+        return processed.get();
     }
 
     /**

Reply via email to