IGNITE-9612 Improve checkpoint mark phase speed - Fixes #4813. Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e220f90 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e220f90 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e220f90 Branch: refs/heads/ignite-gg-14206 Commit: 5e220f90d05fcc92eb655329a96f909f444ac75f Parents: 8740fd6 Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com> Authored: Thu Sep 27 14:09:48 2018 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Thu Sep 27 14:09:48 2018 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 5 + .../ignite/internal/GridKernalContextImpl.java | 13 +- .../apache/ignite/internal/IgniteKernal.java | 7 +- .../org/apache/ignite/internal/IgnitionEx.java | 3 +- .../processors/cache/mvcc/txlog/TxLog.java | 17 +- .../cache/persistence/DbCheckpointListener.java | 8 + .../GridCacheDatabaseSharedManager.java | 95 +++++++--- .../persistence/GridCacheOffheapManager.java | 68 ++++--- .../persistence/metastorage/MetaStorage.java | 29 ++- .../wal/reader/StandaloneGridKernalContext.java | 5 + .../IgniteTaskTrackingThreadPoolExecutor.java | 180 +++++++++++++++++++ ...nitePersistenceSequentialCheckpointTest.java | 6 +- .../junits/GridTestKernalContext.java | 1 + .../testsuites/IgniteUtilSelfTestSuite.java | 4 + ...gniteTaskTrackingThreadPoolExecutorTest.java | 140 +++++++++++++++ 15 files changed, 528 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 0690565..4cb68da 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -685,4 +685,9 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return subscription processor to manage internal-only (strict node-local) subscriptions between components. */ public GridInternalSubscriptionProcessor internalSubscriptionProcessor(); + + /** + * @return Default uncaught exception handler used by thread pools. + */ + public Thread.UncaughtExceptionHandler uncaughtExceptionHandler(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 3b7b430..a0e3f93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -372,6 +372,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable private WorkersRegistry workersRegistry; /** */ + private Thread.UncaughtExceptionHandler hnd; + + /** */ private IgniteEx grid; /** */ @@ -438,6 +441,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param customExecSvcs Custom named executors. * @param plugins Plugin providers. * @param workerRegistry Worker registry. + * @param hnd Default uncaught exception handler used by thread pools. */ @SuppressWarnings("TypeMayBeWeakened") protected GridKernalContextImpl( @@ -463,7 +467,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @Nullable Map<String, ? 
extends ExecutorService> customExecSvcs, List<PluginProvider> plugins, IgnitePredicate<String> clsFilter, - WorkersRegistry workerRegistry + WorkersRegistry workerRegistry, + Thread.UncaughtExceptionHandler hnd ) { assert grid != null; assert cfg != null; @@ -489,6 +494,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.schemaExecSvc = schemaExecSvc; this.customExecSvcs = customExecSvcs; this.workersRegistry = workerRegistry; + this.hnd = hnd; marshCtx = new MarshallerContextImpl(plugins, clsFilter); @@ -1158,6 +1164,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() { + return hnd; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 245a9f9..6b1c995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -765,6 +765,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param customExecSvcs Custom named executors. * @param errHnd Error handler to use for notification about startup problems. * @param workerRegistry Worker registry. + * @param hnd Default uncaught exception handler used by thread pools. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ @SuppressWarnings({"CatchGenericClass", "unchecked"}) @@ -787,7 +788,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService schemaExecSvc, @Nullable final Map<String, ? extends ExecutorService> customExecSvcs, GridAbsClosure errHnd, - WorkersRegistry workerRegistry + WorkersRegistry workerRegistry, + Thread.UncaughtExceptionHandler hnd ) throws IgniteCheckedException { gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getIgniteInstanceName())); @@ -909,7 +911,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { customExecSvcs, plugins, MarshallerUtils.classNameFilter(this.getClass().getClassLoader()), - workerRegistry + workerRegistry, + hnd ); cfg.getMarshaller().setContext(ctx.marshallerContext()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 8bef477..ed0fbe9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -2053,7 +2053,8 @@ public class IgnitionEx { startLatch.countDown(); } }, - workerRegistry + workerRegistry, + oomeHnd ); state = STARTED; http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java index 905bfc4..61d9cc6 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java @@ -21,8 +21,10 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; @@ -187,7 +189,20 @@ public class TxLog implements DbCheckpointListener { /** {@inheritDoc} */ @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { - reuseList.saveMetadata(); + Executor executor = ctx.executor(); + + if (executor == null) + reuseList.saveMetadata(); + else { + executor.execute(() -> { + try { + reuseList.saveMetadata(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java index 1c438b8..36ab163 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java @@ -17,8 +17,11 @@ package org.apache.ignite.internal.processors.cache.persistence; +import 
java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; +import org.jetbrains.annotations.Nullable; /** * @@ -42,6 +45,11 @@ public interface DbCheckpointListener { * @param cacheOrGrpName Cache or group name. */ public boolean needToSnapshot(String cacheOrGrpName); + + /** + * @return Context executor. + */ + public @Nullable Executor executor(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index fe0103a..158c3b1 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -45,6 +45,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -78,6 +79,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.NodeStoppingException; +import 
org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.DirectMemoryRegion; @@ -157,7 +159,7 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.mxbean.DataStorageMetricsMXBean; import org.apache.ignite.thread.IgniteThread; -import org.apache.ignite.thread.IgniteThreadPoolExecutor; +import org.apache.ignite.thread.IgniteTaskTrackingThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedHashMap; @@ -248,6 +250,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Timeout between partition file destroy and checkpoint to handle it. */ private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds. + /** */ + private static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner"; + /** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */ private volatile Checkpointer checkpointer; @@ -285,7 +290,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private boolean stopping; /** Checkpoint runner thread pool. If null tasks are to be run in single thread */ - @Nullable private ExecutorService asyncRunner; + @Nullable private IgniteTaskTrackingThreadPoolExecutor asyncRunner; /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. 
*/ private ThreadLocal<ByteBuffer> threadBuf; @@ -722,13 +727,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ private void initDataBase() { if (persistenceCfg.getCheckpointThreads() > 1) - asyncRunner = new IgniteThreadPoolExecutor( - "checkpoint-runner", + asyncRunner = new IgniteTaskTrackingThreadPoolExecutor( + CHECKPOINT_RUNNER_THREAD_PREFIX, cctx.igniteInstanceName(), persistenceCfg.getCheckpointThreads(), persistenceCfg.getCheckpointThreads(), - 30_000, - new LinkedBlockingQueue<Runnable>() + 30_000, // A value is ignored if corePoolSize equals to maxPoolSize + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.UNDEFINED, + cctx.kernalContext().uncaughtExceptionHandler() ); } @@ -1538,7 +1545,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan @Override public boolean checkpointLockIsHeldByThread() { return !ASSERTION_ENABLED || checkpointLock.isWriteLockedByCurrentThread() || - CHECKPOINT_LOCK_HOLD_COUNT.get() > 0; + CHECKPOINT_LOCK_HOLD_COUNT.get() > 0 || + Thread.currentThread().getName().startsWith(CHECKPOINT_RUNNER_THREAD_PREFIX); } /** @@ -3171,7 +3179,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // In case of checkpoint initialization error node should be invalidated and stopped. cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); - return; + throw new IgniteException(e); // Re-throw as unchecked exception to force stopping checkpoint thread. 
} updateHeartbeat(); @@ -3573,6 +3581,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final PartitionAllocationMap map = new PartitionAllocationMap(); + if (asyncRunner != null) + asyncRunner.reset(); + DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() { @Override public boolean nextSnapshot() { return curr.nextSnapshot; @@ -3583,39 +3594,81 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan return map; } + /** {@inheritDoc} */ @Override public boolean needToSnapshot(String cacheOrGrpName) { return curr.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName)); } + + /** {@inheritDoc} */ + @Override public Executor executor() { + return asyncRunner == null ? null : cmd -> { + try { + asyncRunner.execute(cmd); + } + catch (RejectedExecutionException e) { + assert false: "A task should never be rejected by async runner"; + } + }; + } }; // Listeners must be invoked before we write checkpoint record to WAL. 
for (DbCheckpointListener lsnr : lsnrs) lsnr.onCheckpointBegin(ctx0); + if (asyncRunner != null) { + asyncRunner.markInitialized(); + + asyncRunner.awaitDone(); + } + if (curr.nextSnapshot) snapFut = snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map); + if (asyncRunner != null) + asyncRunner.reset(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal() || !grp.walEnabled()) continue; - ArrayList<GridDhtLocalPartition> parts = new ArrayList<>(); + Runnable r = () -> { + ArrayList<GridDhtLocalPartition> parts = new ArrayList<>(grp.topology().localPartitions().size()); - for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) - parts.add(part); + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) + parts.add(part); - CacheState state = new CacheState(parts.size()); + CacheState state = new CacheState(parts.size()); - for (GridDhtLocalPartition part : parts) { - state.addPartitionState( - part.id(), - part.dataStore().fullSize(), - part.updateCounter(), - (byte)part.state().ordinal() - ); - } + for (GridDhtLocalPartition part : parts) { + state.addPartitionState( + part.id(), + part.dataStore().fullSize(), + part.updateCounter(), + (byte)part.state().ordinal() + ); + } + + synchronized (cpRec) { + cpRec.addCacheGroupState(grp.groupId(), state); + } + }; + + if (asyncRunner == null) + r.run(); + else + try { + asyncRunner.execute(r); + } + catch (RejectedExecutionException e) { + assert false: "Task should never be rejected by async runner"; + } + } + + if (asyncRunner != null) { + asyncRunner.markInitialized(); - cpRec.addCacheGroupState(grp.groupId(), state); + asyncRunner.awaitDone(); } cpPagesTuple = beginAllCheckpoints(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index dc7112f..c57a790 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -166,30 +167,63 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { assert grp.dataRegion().pageMemory() instanceof PageMemoryEx; - reuseList.saveMetadata(); + Executor execSvc = ctx.executor(); - boolean metaWasUpdated = false; + if (execSvc == null) { + reuseList.saveMetadata(); - for (CacheDataStore store : partDataStores.values()) - metaWasUpdated |= saveStoreMetadata(store, ctx, !metaWasUpdated, false); + if (ctx.nextSnapshot()) + updateSnapshotTag(ctx); + + for (CacheDataStore store : partDataStores.values()) + saveStoreMetadata(store, ctx, false); + } + else { + execSvc.execute(() -> { + try { + reuseList.saveMetadata(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }); + + if (ctx.nextSnapshot()) { + execSvc.execute(() -> { + try { + updateSnapshotTag(ctx); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }); + } + + for (CacheDataStore store : partDataStores.values()) + execSvc.execute(() -> { + try { + saveStoreMetadata(store, ctx, false); + } + catch 
(IgniteCheckedException e) { + throw new IgniteException(e); + } + }); + } } /** * @param store Store to save metadata. * @throws IgniteCheckedException If failed. */ - private boolean saveStoreMetadata( + private void saveStoreMetadata( CacheDataStore store, Context ctx, - boolean saveMeta, boolean beforeDestroy ) throws IgniteCheckedException { RowStore rowStore0 = store.rowStore(); boolean needSnapshot = ctx != null && ctx.nextSnapshot() && ctx.needToSnapshot(grp.cacheOrGroupName()); - boolean wasSaveToMeta = false; - if (rowStore0 != null) { CacheFreeListImpl freeList = (CacheFreeListImpl)rowStore0.freeList(); @@ -220,7 +254,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple // Do not save meta for evicted partitions on next checkpoints. if (state == null) - return false; + return; } int grpId = grp.groupId(); @@ -232,10 +266,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (partMetaPageAddr == 0L) { U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage + - ", saveMeta=" + saveMeta + ", beforeDestroy=" + beforeDestroy + ", size=" + size + + ", beforeDestroy=" + beforeDestroy + ", size=" + size + ", updCntr=" + updCntr + ", state=" + state + ']'); - return false; + return; } boolean changed = false; @@ -283,12 +317,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt); - if (saveMeta) { - saveMeta(ctx); - - wasSaveToMeta = true; - } - if (state == OWNING) { assert part != null; @@ -344,8 +372,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } else if (needSnapshot) tryAddEmptyPartitionToSnapshot(store, ctx); - - return wasSaveToMeta; } /** @@ -494,7 +520,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** * @param ctx Context. 
*/ - private void saveMeta(Context ctx) throws IgniteCheckedException { + private void updateSnapshotTag(Context ctx) throws IgniteCheckedException { int grpId = grp.groupId(); PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); IgniteWriteAheadLogManager wal = this.ctx.wal(); @@ -590,7 +616,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple ctx.database().checkpointReadLock(); try { - saveStoreMetadata(store, null, false, true); + saveStoreMetadata(store, null, true); } finally { ctx.database().checkpointReadUnlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java index c0fba73..556d997 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java @@ -22,8 +22,10 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdUtils; @@ -415,9 +417,32 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R /** {@inheritDoc} */ @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { - 
freeList.saveMetadata(); + Executor executor = ctx.executor(); - saveStoreMetadata(); + if (executor == null) { + freeList.saveMetadata(); + + saveStoreMetadata(); + } + else { + executor.execute(() -> { + try { + freeList.saveMetadata(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }); + + executor.execute(() -> { + try { + saveStoreMetadata(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index f160549..3f35c5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -642,6 +642,11 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ + @Override public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() { + return null; + } + + /** {@inheritDoc} */ @Override public PdsFoldersResolver pdsFolderResolver() { return new PdsFoldersResolver() { /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java new file mode 100644 index 0000000..6cae57e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.thread; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; + +/** + * An {@link ExecutorService} that executes submitted tasks using pooled grid threads. + * + * In addition, it allows tracking the completion or failure of all enqueued tasks during execution.
+ */ +public class IgniteTaskTrackingThreadPoolExecutor extends IgniteThreadPoolExecutor { + /** */ + private final LongAdder pendingTaskCnt = new LongAdder(); + + /** */ + private final LongAdder completedTaskCnt = new LongAdder(); + + /** */ + private volatile boolean initialized; + + /** */ + private volatile AtomicReference<Throwable> err = new AtomicReference<>(); + + /** + * Creates a new service with the given initial parameters. + * + * @param threadNamePrefix Will be added at the beginning of all created threads. + * @param igniteInstanceName Must be the name of the grid. + * @param corePoolSize The number of threads to keep in the pool, even if they are idle. + * @param maxPoolSize The maximum number of threads to allow in the pool. + * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time + * that excess idle threads will wait for new tasks before terminating. + * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only + * runnable tasks submitted by the {@link #execute(Runnable)} method. + */ + public IgniteTaskTrackingThreadPoolExecutor(String threadNamePrefix, String igniteInstanceName, int corePoolSize, + int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ) { + super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, keepAliveTime, workQ); + } + + /** + * Creates a new service with the given initial parameters. + * + * @param threadNamePrefix Will be added at the beginning of all created threads. + * @param igniteInstanceName Must be the name of the grid. + * @param corePoolSize The number of threads to keep in the pool, even if they are idle. + * @param maxPoolSize The maximum number of threads to allow in the pool. + * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time + * that excess idle threads will wait for new tasks before terminating. 
+ * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only + * runnable tasks submitted by the {@link #execute(Runnable)} method. + * @param plc {@link GridIoPolicy} for thread pool. + * @param eHnd Uncaught exception handler for thread pool. + */ + public IgniteTaskTrackingThreadPoolExecutor(String threadNamePrefix, String igniteInstanceName, int corePoolSize, + int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ, byte plc, + UncaughtExceptionHandler eHnd) { + super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, keepAliveTime, workQ, plc, eHnd); + } + + /** + * Creates a new service with the given initial parameters. + * + * @param corePoolSize The number of threads to keep in the pool, even if they are idle. + * @param maxPoolSize The maximum number of threads to allow in the pool. + * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time + * that excess idle threads will wait for new tasks before terminating. + * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only the + * runnable tasks submitted by the {@link #execute(Runnable)} method. + * @param threadFactory Thread factory. + */ + public IgniteTaskTrackingThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, + BlockingQueue<Runnable> workQ, ThreadFactory threadFactory) { + super(corePoolSize, maxPoolSize, keepAliveTime, workQ, threadFactory); + } + + /** {@inheritDoc} */ + @Override public void execute(Runnable cmd) { + pendingTaskCnt.add(1); + + super.execute(cmd); + } + + /** {@inheritDoc} */ + @Override protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + + completedTaskCnt.add(1); + + if (t != null && err.compareAndSet(null, t) || isDone()) { + synchronized (this) { + notifyAll(); + } + } + } + + /** + * Mark this executor as initialized. 
+ * This method should be called when all required tasks are enqueued for execution. + */ + public final void markInitialized() { + initialized = true; + } + + /** + * Check error status. + * + * @return {@code True} if any task execution resulted in an error. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public final boolean isError() { + return err.get() != null; + } + + /** + * Check done status. + * + * @return {@code True} when all enqueued tasks are completed. + */ + public final boolean isDone() { + return initialized && completedTaskCnt.sum() == pendingTaskCnt.sum(); + } + + /** + * Wait synchronously until all tasks are completed or an error has occurred. + * + * @throws IgniteCheckedException If a task execution resulted in an error. + */ + public final synchronized void awaitDone() throws IgniteCheckedException { + // There is no guarantee that all enqueued tasks will be finished if an error has occurred. + while(!isError() && !isDone()) { + try { + wait(); + } + catch (InterruptedException e) { + err.set(e); + + Thread.currentThread().interrupt(); + } + } + + if (isError()) + throw new IgniteCheckedException("Task execution resulted in error", err.get()); + } + + /** + * Reset the task tracking context. + * The method should be called before adding new tasks to the executor.
+ */ + public final void reset() { + initialized = false; + completedTaskCnt.reset(); + pendingTaskCnt.reset(); + err.set(null); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java index 814ee57..230b828 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java @@ -29,10 +29,8 @@ public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentS @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setWalMode(WALMode.LOG_ONLY) - .setCheckpointThreads(4) - .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL)); + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + dsCfg.setCheckpointThreads(4).setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL); return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index d1de347..1f95b94 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -80,6 +80,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, U.allPluginProviders(), null, + null, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java index bf726d5..673269b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java @@ -57,6 +57,7 @@ import org.apache.ignite.util.GridSpinReadWriteLockSelfTest; import org.apache.ignite.util.GridStringBuilderFactorySelfTest; import org.apache.ignite.util.GridTopologyHeapSizeSelfTest; import org.apache.ignite.util.GridTransientTest; +import org.apache.ignite.util.IgniteTaskTrackingThreadPoolExecutorTest; import org.apache.ignite.util.mbeans.GridMBeanDisableSelfTest; import org.apache.ignite.util.mbeans.GridMBeanExoticNamesSelfTest; import org.apache.ignite.util.mbeans.GridMBeanSelfTest; @@ -140,6 +141,9 @@ public class IgniteUtilSelfTestSuite extends TestSuite { // control.sh suite.addTestSuite(CommandHandlerParsingTest.class); + // Thread pool. 
+ suite.addTestSuite(IgniteTaskTrackingThreadPoolExecutorTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e220f90/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java new file mode 100644 index 0000000..3db02b0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.util;
+
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import junit.framework.TestCase;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteTaskTrackingThreadPoolExecutor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests for tracking thread pool executor.
+ */
+public class IgniteTaskTrackingThreadPoolExecutorTest extends TestCase {
+    /** Executor under test, recreated for every test method. */
+    private IgniteTaskTrackingThreadPoolExecutor executor;
+
+    /** {@inheritDoc} */
+    @Override protected void setUp() throws Exception {
+        int procs = Runtime.getRuntime().availableProcessors();
+
+        executor = new IgniteTaskTrackingThreadPoolExecutor("test", "default",
+            procs * 2, procs * 2, 30_000, new LinkedBlockingQueue<>(), GridIoPolicy.UNDEFINED, (t, e) -> {
+                // No-op.
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void tearDown() throws Exception {
+        List<Runnable> runnables = executor.shutdownNow();
+
+        assertEquals("Some tasks are not completed", 0, runnables.size());
+    }
+
+    /** Checks that all tasks of a single batch complete. */
+    public void testSimple() throws IgniteCheckedException {
+        doTest(null);
+    }
+
+    /**
+     * Checks that a failing task makes {@code awaitDone()} throw with that task's exception as the cause.
+     */
+    public void testWithException() throws IgniteCheckedException {
+        int failIdx = 5555;
+
+        // Catch only the expected exception type, so that an assertion failure (including the fail() below)
+        // is not swallowed and misreported as an NPE or ClassCastException.
+        try {
+            doTest(failIdx);
+
+            fail("awaitDone() was expected to report the task failure");
+        }
+        catch (IgniteCheckedException e) {
+            Throwable cause = e.getCause();
+
+            assertTrue("Unexpected cause: " + cause, cause instanceof TestException);
+            assertEquals(failIdx, ((TestException)cause).idx);
+        }
+
+        // Clear the recorded error so that awaitDone() waits for the remaining tasks, because tearDown()
+        // expects the queue to be drained.
+        AtomicReference<Throwable> err = U.field(executor, "err");
+        err.set(null);
+
+        executor.awaitDone();
+    }
+
+    /** Checks that the executor can be reused for many batches after {@code reset()}. */
+    public void testReuse() throws IgniteCheckedException {
+        long avg = 0;
+
+        long warmUp = 30;
+
+        int iters = 150;
+
+        for (int i = 0; i < iters; i++) {
+            long t1 = System.nanoTime();
+
+            doTest(null);
+
+            if (i >= warmUp)
+                avg += System.nanoTime() - t1;
+
+            executor.reset();
+        }
+
+        X.print("Average time per iteration: " + (avg / (iters - warmUp)) / 1000 / 1000. + " ms");
+    }
+
+    /**
+     * Submits a batch of tasks, marks the executor initialized and waits for the batch to finish.
+     *
+     * @param failIdx Index of the task that must throw {@link TestException}, or {@code null} for no failure.
+     * @throws IgniteCheckedException If {@code awaitDone()} reports an error.
+     */
+    private void doTest(@Nullable Integer failIdx) throws IgniteCheckedException {
+        LongAdder cnt = new LongAdder();
+
+        int exp = 100_000;
+
+        for (int i = 0; i < exp; i++) {
+            final int finalI = i;
+
+            executor.execute(() -> {
+                if (failIdx != null && failIdx == finalI)
+                    throw new TestException(finalI);
+                else
+                    cnt.add(1);
+            });
+        }
+
+        executor.markInitialized();
+
+        executor.awaitDone();
+
+        assertEquals("Counter is not as expected", exp, cnt.sum());
+    }
+
+    /** Exception thrown by the task selected to fail. */
+    private static class TestException extends RuntimeException {
+        /** Index of the failed task. */
+        final int idx;
+
+        /**
+         * @param idx Index.
+         */
+        public TestException(int idx) {
+            this.idx = idx;
+        }
+    }
+}