Repository: ignite
Updated Branches:
  refs/heads/ignite-2.7 b83fd0290 -> 30dc8e4df


IGNITE-9612 Improve checkpoint mark phase speed - Fixes #4813.

Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>

(cherry picked from commit 5e220f90d05fcc92eb655329a96f909f444ac75f)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30dc8e4d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30dc8e4d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30dc8e4d

Branch: refs/heads/ignite-2.7
Commit: 30dc8e4df09d898c2e0706c64d8b9bcea14e98b9
Parents: b83fd02
Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com>
Authored: Thu Sep 27 14:09:48 2018 +0300
Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Committed: Thu Sep 27 14:11:20 2018 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContext.java      |   5 +
 .../ignite/internal/GridKernalContextImpl.java  |  13 +-
 .../apache/ignite/internal/IgniteKernal.java    |   7 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   3 +-
 .../processors/cache/mvcc/txlog/TxLog.java      |  17 +-
 .../cache/persistence/DbCheckpointListener.java |   8 +
 .../GridCacheDatabaseSharedManager.java         |  95 +++++++---
 .../persistence/GridCacheOffheapManager.java    |  68 ++++---
 .../persistence/metastorage/MetaStorage.java    |  29 ++-
 .../wal/reader/StandaloneGridKernalContext.java |   5 +
 .../IgniteTaskTrackingThreadPoolExecutor.java   | 180 +++++++++++++++++++
 ...nitePersistenceSequentialCheckpointTest.java |   6 +-
 .../junits/GridTestKernalContext.java           |   1 +
 .../testsuites/IgniteUtilSelfTestSuite.java     |   4 +
 ...gniteTaskTrackingThreadPoolExecutorTest.java | 140 +++++++++++++++
 15 files changed, 528 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 0690565..4cb68da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -685,4 +685,9 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
      * @return subscription processor to manage internal-only (strict 
node-local) subscriptions between components.
      */
     public GridInternalSubscriptionProcessor internalSubscriptionProcessor();
+
+    /**
+     * @return Default uncaught exception handler used by thread pools.
+     */
+    public Thread.UncaughtExceptionHandler uncaughtExceptionHandler();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 3b7b430..a0e3f93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -372,6 +372,9 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     private WorkersRegistry workersRegistry;
 
     /** */
+    private Thread.UncaughtExceptionHandler hnd;
+
+    /** */
     private IgniteEx grid;
 
     /** */
@@ -438,6 +441,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
      * @param customExecSvcs Custom named executors.
      * @param plugins Plugin providers.
      * @param workerRegistry Worker registry.
+     * @param hnd Default uncaught exception handler used by thread pools.
      */
     @SuppressWarnings("TypeMayBeWeakened")
     protected GridKernalContextImpl(
@@ -463,7 +467,8 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         @Nullable Map<String, ? extends ExecutorService> customExecSvcs,
         List<PluginProvider> plugins,
         IgnitePredicate<String> clsFilter,
-        WorkersRegistry workerRegistry
+        WorkersRegistry workerRegistry,
+        Thread.UncaughtExceptionHandler hnd
     ) {
         assert grid != null;
         assert cfg != null;
@@ -489,6 +494,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         this.schemaExecSvc = schemaExecSvc;
         this.customExecSvcs = customExecSvcs;
         this.workersRegistry = workerRegistry;
+        this.hnd = hnd;
 
         marshCtx = new MarshallerContextImpl(plugins, clsFilter);
 
@@ -1158,6 +1164,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
+        return hnd;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridKernalContextImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 7b9895a..8a4eff1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -765,6 +765,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
      * @param customExecSvcs Custom named executors.
      * @param errHnd Error handler to use for notification about startup 
problems.
      * @param workerRegistry Worker registry.
+     * @param hnd Default uncaught exception handler used by thread pools.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @SuppressWarnings({"CatchGenericClass", "unchecked"})
@@ -787,7 +788,8 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         ExecutorService schemaExecSvc,
         @Nullable final Map<String, ? extends ExecutorService> customExecSvcs,
         GridAbsClosure errHnd,
-        WorkersRegistry workerRegistry
+        WorkersRegistry workerRegistry,
+        Thread.UncaughtExceptionHandler hnd
     )
         throws IgniteCheckedException {
         gw.compareAndSet(null, new 
GridKernalGatewayImpl(cfg.getIgniteInstanceName()));
@@ -909,7 +911,8 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                 customExecSvcs,
                 plugins,
                 
MarshallerUtils.classNameFilter(this.getClass().getClassLoader()),
-                workerRegistry
+                workerRegistry,
+                hnd
             );
 
             cfg.getMarshaller().setContext(ctx.marshallerContext());

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 8bef477..ed0fbe9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -2053,7 +2053,8 @@ public class IgnitionEx {
                             startLatch.countDown();
                         }
                     },
-                    workerRegistry
+                    workerRegistry,
+                    oomeHnd
                 );
 
                 state = STARTED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
index 905bfc4..61d9cc6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
@@ -187,7 +189,20 @@ public class TxLog implements DbCheckpointListener {
 
     /** {@inheritDoc} */
     @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
-        reuseList.saveMetadata();
+        Executor executor = ctx.executor();
+
+        if (executor == null)
+            reuseList.saveMetadata();
+        else {
+            executor.execute(() -> {
+                try {
+                    reuseList.saveMetadata();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            });
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
index 1c438b8..36ab163 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -42,6 +45,11 @@ public interface DbCheckpointListener {
          * @param cacheOrGrpName Cache or group name.
          */
         public boolean needToSnapshot(String cacheOrGrpName);
+
+        /**
+         * @return Context executor.
+         */
+        public @Nullable Executor executor();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 4bdf5ba..50f2782 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -45,6 +45,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -78,6 +79,7 @@ import 
org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.mem.DirectMemoryProvider;
 import org.apache.ignite.internal.mem.DirectMemoryRegion;
@@ -157,7 +159,7 @@ import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
 import org.apache.ignite.thread.IgniteThread;
-import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.IgniteTaskTrackingThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedHashMap;
@@ -248,6 +250,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** Timeout between partition file destroy and checkpoint to handle it. */
     private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 
1000; // 30 Seconds.
 
+    /** */
+    private static final String CHECKPOINT_RUNNER_THREAD_PREFIX = 
"checkpoint-runner";
+
     /** Checkpoint thread. Needs to be volatile because it is created in 
exchange worker. */
     private volatile Checkpointer checkpointer;
 
@@ -285,7 +290,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     private boolean stopping;
 
     /** Checkpoint runner thread pool. If null tasks are to be run in single 
thread */
-    @Nullable private ExecutorService asyncRunner;
+    @Nullable private IgniteTaskTrackingThreadPoolExecutor asyncRunner;
 
     /** Thread local with buffers for the checkpoint threads. Each buffer 
represent one page for durable memory. */
     private ThreadLocal<ByteBuffer> threadBuf;
@@ -722,13 +727,15 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      */
     private void initDataBase() {
         if (persistenceCfg.getCheckpointThreads() > 1)
-            asyncRunner = new IgniteThreadPoolExecutor(
-                "checkpoint-runner",
+            asyncRunner = new IgniteTaskTrackingThreadPoolExecutor(
+                CHECKPOINT_RUNNER_THREAD_PREFIX,
                 cctx.igniteInstanceName(),
                 persistenceCfg.getCheckpointThreads(),
                 persistenceCfg.getCheckpointThreads(),
-                30_000,
-                new LinkedBlockingQueue<Runnable>()
+                30_000, // A value is ignored if corePoolSize equals to 
maxPoolSize
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.UNDEFINED,
+                cctx.kernalContext().uncaughtExceptionHandler()
             );
     }
 
@@ -1538,7 +1545,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     @Override public boolean checkpointLockIsHeldByThread() {
         return !ASSERTION_ENABLED ||
             checkpointLock.isWriteLockedByCurrentThread() ||
-            CHECKPOINT_LOCK_HOLD_COUNT.get() > 0;
+            CHECKPOINT_LOCK_HOLD_COUNT.get() > 0 ||
+            
Thread.currentThread().getName().startsWith(CHECKPOINT_RUNNER_THREAD_PREFIX);
     }
 
     /**
@@ -3171,7 +3179,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                     // In case of checkpoint initialization error node should 
be invalidated and stopped.
                     cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
 
-                    return;
+                    throw new IgniteException(e); // Re-throw as unchecked 
exception to force stopping checkpoint thread.
                 }
 
                 updateHeartbeat();
@@ -3573,6 +3581,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                 final PartitionAllocationMap map = new 
PartitionAllocationMap();
 
+                if (asyncRunner != null)
+                    asyncRunner.reset();
+
                 DbCheckpointListener.Context ctx0 = new 
DbCheckpointListener.Context() {
                     @Override public boolean nextSnapshot() {
                         return curr.nextSnapshot;
@@ -3583,39 +3594,81 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         return map;
                     }
 
+                    /** {@inheritDoc} */
                     @Override public boolean needToSnapshot(String 
cacheOrGrpName) {
                         return 
curr.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName));
                     }
+
+                    /** {@inheritDoc} */
+                    @Override public Executor executor() {
+                        return asyncRunner == null ? null : cmd -> {
+                            try {
+                                asyncRunner.execute(cmd);
+                            }
+                            catch (RejectedExecutionException e) {
+                                assert false: "A task should never be rejected 
by async runner";
+                            }
+                        };
+                    }
                 };
 
                 // Listeners must be invoked before we write checkpoint record 
to WAL.
                 for (DbCheckpointListener lsnr : lsnrs)
                     lsnr.onCheckpointBegin(ctx0);
 
+                if (asyncRunner != null) {
+                    asyncRunner.markInitialized();
+
+                    asyncRunner.awaitDone();
+                }
+
                 if (curr.nextSnapshot)
                     snapFut = 
snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
 
+                if (asyncRunner != null)
+                    asyncRunner.reset();
+
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (grp.isLocal() || !grp.walEnabled())
                         continue;
 
-                    ArrayList<GridDhtLocalPartition> parts = new ArrayList<>();
+                    Runnable r = () -> {
+                        ArrayList<GridDhtLocalPartition> parts = new 
ArrayList<>(grp.topology().localPartitions().size());
 
-                    for (GridDhtLocalPartition part : 
grp.topology().currentLocalPartitions())
-                        parts.add(part);
+                        for (GridDhtLocalPartition part : 
grp.topology().currentLocalPartitions())
+                            parts.add(part);
 
-                    CacheState state = new CacheState(parts.size());
+                        CacheState state = new CacheState(parts.size());
 
-                    for (GridDhtLocalPartition part : parts) {
-                        state.addPartitionState(
-                            part.id(),
-                            part.dataStore().fullSize(),
-                            part.updateCounter(),
-                            (byte)part.state().ordinal()
-                        );
-                    }
+                        for (GridDhtLocalPartition part : parts) {
+                            state.addPartitionState(
+                                part.id(),
+                                part.dataStore().fullSize(),
+                                part.updateCounter(),
+                                (byte)part.state().ordinal()
+                            );
+                        }
+
+                        synchronized (cpRec) {
+                            cpRec.addCacheGroupState(grp.groupId(), state);
+                        }
+                    };
+
+                    if (asyncRunner == null)
+                        r.run();
+                    else
+                        try {
+                            asyncRunner.execute(r);
+                        }
+                        catch (RejectedExecutionException e) {
+                            assert false: "Task should never be rejected by 
async runner";
+                        }
+                }
+
+                if (asyncRunner != null) {
+                    asyncRunner.markInitialized();
 
-                    cpRec.addCacheGroupState(grp.groupId(), state);
+                    asyncRunner.awaitDone();
                 }
 
                 cpPagesTuple = beginAllCheckpoints();

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index dc7112f..c57a790 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -166,30 +167,63 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
         assert grp.dataRegion().pageMemory() instanceof PageMemoryEx;
 
-        reuseList.saveMetadata();
+        Executor execSvc = ctx.executor();
 
-        boolean metaWasUpdated = false;
+        if (execSvc == null) {
+            reuseList.saveMetadata();
 
-        for (CacheDataStore store : partDataStores.values())
-            metaWasUpdated |= saveStoreMetadata(store, ctx, !metaWasUpdated, 
false);
+            if (ctx.nextSnapshot())
+                updateSnapshotTag(ctx);
+
+            for (CacheDataStore store : partDataStores.values())
+                saveStoreMetadata(store, ctx, false);
+        }
+        else {
+            execSvc.execute(() -> {
+                try {
+                    reuseList.saveMetadata();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            });
+
+            if (ctx.nextSnapshot()) {
+                execSvc.execute(() -> {
+                    try {
+                        updateSnapshotTag(ctx);
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                });
+            }
+
+            for (CacheDataStore store : partDataStores.values())
+                execSvc.execute(() -> {
+                    try {
+                        saveStoreMetadata(store, ctx, false);
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                });
+        }
     }
 
     /**
      * @param store Store to save metadata.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean saveStoreMetadata(
+    private void saveStoreMetadata(
         CacheDataStore store,
         Context ctx,
-        boolean saveMeta,
         boolean beforeDestroy
     ) throws IgniteCheckedException {
         RowStore rowStore0 = store.rowStore();
 
         boolean needSnapshot = ctx != null && ctx.nextSnapshot() && 
ctx.needToSnapshot(grp.cacheOrGroupName());
 
-        boolean wasSaveToMeta = false;
-
         if (rowStore0 != null) {
             CacheFreeListImpl freeList = 
(CacheFreeListImpl)rowStore0.freeList();
 
@@ -220,7 +254,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                     // Do not save meta for evicted partitions on next 
checkpoints.
                     if (state == null)
-                        return false;
+                        return;
                 }
 
                 int grpId = grp.groupId();
@@ -232,10 +266,10 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                     if (partMetaPageAddr == 0L) {
                         U.warn(log, "Failed to acquire write lock for meta 
page [metaPage=" + partMetaPage +
-                            ", saveMeta=" + saveMeta + ", beforeDestroy=" + 
beforeDestroy + ", size=" + size +
+                            ", beforeDestroy=" + beforeDestroy + ", size=" + 
size +
                             ", updCntr=" + updCntr + ", state=" + state + ']');
 
-                        return false;
+                        return;
                     }
 
                     boolean changed = false;
@@ -283,12 +317,6 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                             io.setCandidatePageCount(partMetaPageAddr, size == 
0 ? 0 : pageCnt);
 
-                            if (saveMeta) {
-                                saveMeta(ctx);
-
-                                wasSaveToMeta = true;
-                            }
-
                             if (state == OWNING) {
                                 assert part != null;
 
@@ -344,8 +372,6 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
         else if (needSnapshot)
             tryAddEmptyPartitionToSnapshot(store, ctx);
-
-        return wasSaveToMeta;
     }
 
     /**
@@ -494,7 +520,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     /**
      * @param ctx Context.
      */
-    private void saveMeta(Context ctx) throws IgniteCheckedException {
+    private void updateSnapshotTag(Context ctx) throws IgniteCheckedException {
         int grpId = grp.groupId();
         PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
         IgniteWriteAheadLogManager wal = this.ctx.wal();
@@ -590,7 +616,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         ctx.database().checkpointReadLock();
 
         try {
-            saveStoreMetadata(store, null, false, true);
+            saveStoreMetadata(store, null, true);
         }
         finally {
             ctx.database().checkpointReadUnlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index c0fba73..556d997 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -22,8 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
@@ -415,9 +417,32 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
 
     /** {@inheritDoc} */
     @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
-        freeList.saveMetadata();
+        Executor executor = ctx.executor();
 
-        saveStoreMetadata();
+        if (executor == null) {
+            freeList.saveMetadata();
+
+            saveStoreMetadata();
+        }
+        else {
+            executor.execute(() -> {
+                try {
+                    freeList.saveMetadata();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            });
+
+            executor.execute(() -> {
+                try {
+                    saveStoreMetadata();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            });
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index f160549..3f35c5f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -642,6 +642,11 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
     }
 
     /** {@inheritDoc} */
+    @Override public Thread.UncaughtExceptionHandler 
uncaughtExceptionHandler() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public PdsFoldersResolver pdsFolderResolver() {
         return new PdsFoldersResolver() {
             /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java
new file mode 100644
index 0000000..6cae57e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.thread;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+
+/**
+ * An {@link ExecutorService} that executes submitted tasks using pooled grid 
threads.
+ *
+ * In addition to what it allows to track all enqueued tasks completion or 
failure during execution.
+ */
+public class IgniteTaskTrackingThreadPoolExecutor extends 
IgniteThreadPoolExecutor {
+    /** */
+    private final LongAdder pendingTaskCnt = new LongAdder();
+
+    /** */
+    private final LongAdder completedTaskCnt = new LongAdder();
+
+    /** */
+    private volatile boolean initialized;
+
+    /** */
+    private volatile AtomicReference<Throwable> err = new AtomicReference<>();
+
+    /**
+     * Creates a new service with the given initial parameters.
+     *
+     * @param threadNamePrefix Will be added at the beginning of all created 
threads.
+     * @param igniteInstanceName Must be the name of the grid.
+     * @param corePoolSize The number of threads to keep in the pool, even if 
they are idle.
+     * @param maxPoolSize The maximum number of threads to allow in the pool.
+     * @param keepAliveTime When the number of threads is greater than the 
core, this is the maximum time
+     *      that excess idle threads will wait for new tasks before 
terminating.
+     * @param workQ The queue to use for holding tasks before they are 
executed. This queue will hold only
+     *      runnable tasks submitted by the {@link #execute(Runnable)} method.
+     */
+    public IgniteTaskTrackingThreadPoolExecutor(String threadNamePrefix, 
String igniteInstanceName, int corePoolSize,
+        int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ) {
+        super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, 
keepAliveTime, workQ);
+    }
+
+    /**
+     * Creates a new service with the given initial parameters.
+     *
+     * @param threadNamePrefix Will be added at the beginning of all created 
threads.
+     * @param igniteInstanceName Must be the name of the grid.
+     * @param corePoolSize The number of threads to keep in the pool, even if 
they are idle.
+     * @param maxPoolSize The maximum number of threads to allow in the pool.
+     * @param keepAliveTime When the number of threads is greater than the 
core, this is the maximum time
+     *      that excess idle threads will wait for new tasks before 
terminating.
+     * @param workQ The queue to use for holding tasks before they are 
executed. This queue will hold only
+     *      runnable tasks submitted by the {@link #execute(Runnable)} method.
+     * @param plc {@link GridIoPolicy} for thread pool.
+     * @param eHnd Uncaught exception handler for thread pool.
+     */
+    public IgniteTaskTrackingThreadPoolExecutor(String threadNamePrefix, 
String igniteInstanceName, int corePoolSize,
+        int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ, 
byte plc,
+        UncaughtExceptionHandler eHnd) {
+        super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, 
keepAliveTime, workQ, plc, eHnd);
+    }
+
+    /**
+     * Creates a new service with the given initial parameters.
+     *
+     * @param corePoolSize The number of threads to keep in the pool, even if 
they are idle.
+     * @param maxPoolSize The maximum number of threads to allow in the pool.
+     * @param keepAliveTime When the number of threads is greater than the 
core, this is the maximum time
+     *      that excess idle threads will wait for new tasks before 
terminating.
+     * @param workQ The queue to use for holding tasks before they are 
executed. This queue will hold only the
+     *      runnable tasks submitted by the {@link #execute(Runnable)} method.
+     * @param threadFactory Thread factory.
+     */
+    public IgniteTaskTrackingThreadPoolExecutor(int corePoolSize, int 
maxPoolSize, long keepAliveTime,
+        BlockingQueue<Runnable> workQ, ThreadFactory threadFactory) {
+        super(corePoolSize, maxPoolSize, keepAliveTime, workQ, threadFactory);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void execute(Runnable cmd) {
+        pendingTaskCnt.add(1);
+
+        super.execute(cmd);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+
+        completedTaskCnt.add(1);
+
+        if (t != null && err.compareAndSet(null, t) || isDone()) {
+            synchronized (this) {
+                notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Mark this executor as initialized.
+     * This method should be called when all required tasks are enqueued for 
execution.
+     */
+    public final void markInitialized() {
+        initialized = true;
+    }
+
+    /**
+     * Check error status.
+     *
+     * @return {@code True} if any task execution resulted in error.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public final boolean isError() {
+        return err.get() != null;
+    }
+
+    /**
+     * Check done status.
+     *
+     * @return {@code True} when all enqueued task are completed.
+     */
+    public final boolean isDone() {
+        return initialized && completedTaskCnt.sum() == pendingTaskCnt.sum();
+    }
+
+    /**
+     * Wait synchronously until all tasks are completed or error has occurred.
+     *
+     * @throws IgniteCheckedException if task execution resulted in error.
+     */
+    public final synchronized void awaitDone() throws IgniteCheckedException {
+        // There are no guarantee what all enqueued tasks will be finished if 
an error has occurred.
+        while(!isError() && !isDone()) {
+            try {
+                wait();
+            }
+            catch (InterruptedException e) {
+                err.set(e);
+
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        if (isError())
+            throw new IgniteCheckedException("Task execution resulted in 
error", err.get());
+    }
+
+    /**
+     * Reset tasks tracking context.
+     * The method should be called before adding new tasks to the executor.
+     */
+    public final void reset() {
+        initialized = false;
+        completedTaskCnt.reset();
+        pendingTaskCnt.reset();
+        err.set(null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
index 814ee57..230b828 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
@@ -29,10 +29,8 @@ public class IgnitePersistenceSequentialCheckpointTest 
extends IgnitePersistentS
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
-            .setWalMode(WALMode.LOG_ONLY)
-            .setCheckpointThreads(4)
-            .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL));
+        DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+        
dsCfg.setCheckpointThreads(4).setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL);
 
         return cfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index d1de347..1f95b94 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -80,6 +80,7 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
                 null,
                 U.allPluginProviders(),
                 null,
+                null,
                 null
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index bf726d5..673269b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -57,6 +57,7 @@ import org.apache.ignite.util.GridSpinReadWriteLockSelfTest;
 import org.apache.ignite.util.GridStringBuilderFactorySelfTest;
 import org.apache.ignite.util.GridTopologyHeapSizeSelfTest;
 import org.apache.ignite.util.GridTransientTest;
+import org.apache.ignite.util.IgniteTaskTrackingThreadPoolExecutorTest;
 import org.apache.ignite.util.mbeans.GridMBeanDisableSelfTest;
 import org.apache.ignite.util.mbeans.GridMBeanExoticNamesSelfTest;
 import org.apache.ignite.util.mbeans.GridMBeanSelfTest;
@@ -140,6 +141,9 @@ public class IgniteUtilSelfTestSuite extends TestSuite {
         // control.sh
         suite.addTestSuite(CommandHandlerParsingTest.class);
 
+        // Thread pool.
+        suite.addTestSuite(IgniteTaskTrackingThreadPoolExecutorTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30dc8e4d/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java
new file mode 100644
index 0000000..3db02b0
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import junit.framework.TestCase;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteTaskTrackingThreadPoolExecutor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests for tracking thread pool executor.
+ */
+public class IgniteTaskTrackingThreadPoolExecutorTest extends TestCase {
+    /** */
+    private IgniteTaskTrackingThreadPoolExecutor executor;
+
+    /** {@inheritDoc} */
+    @Override protected void setUp() throws Exception {
+        int procs = Runtime.getRuntime().availableProcessors();
+
+        executor = new IgniteTaskTrackingThreadPoolExecutor("test", "default",
+            procs * 2, procs * 2, 30_000, new LinkedBlockingQueue<>(), 
GridIoPolicy.UNDEFINED, (t, e) -> {
+                // No-op.
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void tearDown() throws Exception {
+        List<Runnable> runnables = executor.shutdownNow();
+
+        assertEquals("Some tasks are not completed", 0, runnables.size());
+    }
+
+    /** */
+    public void testSimple() throws IgniteCheckedException {
+        doTest(null);
+    }
+
+    /** */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public void testWithException() throws IgniteCheckedException {
+        int fail = 5555;
+
+        try {
+            doTest(fail);
+
+            fail();
+        }
+        catch (Throwable t) {
+            TestException cause = (TestException)X.getCause(t);
+
+            assertEquals(fail, cause.idx);
+        }
+
+        AtomicReference<Throwable> err = U.field(executor, "err");
+        err.set(null);
+
+        executor.awaitDone();
+    }
+
+    /** */
+    public void testReuse() throws IgniteCheckedException {
+        long avg = 0;
+
+        long warmUp = 30;
+
+        int iters = 150;
+
+        for (int i = 0; i < iters; i++) {
+            long t1 = System.nanoTime();
+
+            doTest(null);
+
+            if (i >= warmUp)
+                avg += System.nanoTime() - t1;
+
+            executor.reset();
+        }
+
+        X.print("Average time per iteration: " + (avg / (iters - warmUp)) / 
1000 / 1000. + " ms");
+    }
+
+    /** */
+    private void doTest(@Nullable Integer fail) throws IgniteCheckedException {
+        LongAdder cnt = new LongAdder();
+
+        int exp = 100_000;
+
+        for (int i = 0; i < exp; i++) {
+            final int finalI = i;
+            executor.execute(() -> {
+                if (fail != null && fail == finalI)
+                    throw new TestException(finalI);
+                else
+                    cnt.add(1);
+            });
+        }
+
+        executor.markInitialized();
+
+        executor.awaitDone();
+
+        assertEquals("Counter is not as expected", exp, cnt.sum());
+    }
+
+    /** */
+    private static class TestException extends RuntimeException {
+        /** */
+        final int idx;
+
+        /**
+         * @param idx Index.
+         */
+        public TestException(int idx) {
+            this.idx = idx;
+        }
+    }
+}

Reply via email to