Repository: ignite Updated Branches: refs/heads/master a1cb021c0 -> 10c2b10e6
IGNITE-9898 fix checkpoint thread hangs on await async task completion - Fixes #5002. Signed-off-by: Dmitriy Govorukhin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10c2b10e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10c2b10e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10c2b10e Branch: refs/heads/master Commit: 10c2b10e605d372e832b65da52d67dc9656b53c1 Parents: a1cb021 Author: Dmitriy Govorukhin <[email protected]> Authored: Wed Oct 17 15:40:40 2018 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Wed Oct 17 15:40:40 2018 +0300 ---------------------------------------------------------------------- .../GridCacheDatabaseSharedManager.java | 47 ++--- .../ignite/internal/util/IgniteUtils.java | 22 +++ .../IgniteTaskTrackingThreadPoolExecutor.java | 180 ------------------- .../testsuites/IgniteUtilSelfTestSuite.java | 7 +- ...gniteTaskTrackingThreadPoolExecutorTest.java | 140 --------------- 5 files changed, 49 insertions(+), 347 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index d69b83c..e74954f 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -140,6 +140,7 @@ import 
org.apache.ignite.internal.util.GridMultiCollectionWrapper; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.CountDownFuture; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -159,7 +160,7 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.mxbean.DataStorageMetricsMXBean; import org.apache.ignite.thread.IgniteThread; -import org.apache.ignite.thread.IgniteTaskTrackingThreadPoolExecutor; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedHashMap; @@ -290,7 +291,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private boolean stopping; /** Checkpoint runner thread pool. If null tasks are to be run in single thread */ - @Nullable private IgniteTaskTrackingThreadPoolExecutor asyncRunner; + @Nullable private IgniteThreadPoolExecutor asyncRunner; /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. 
*/ private ThreadLocal<ByteBuffer> threadBuf; @@ -727,15 +728,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ private void initDataBase() { if (persistenceCfg.getCheckpointThreads() > 1) - asyncRunner = new IgniteTaskTrackingThreadPoolExecutor( + asyncRunner = new IgniteThreadPoolExecutor( CHECKPOINT_RUNNER_THREAD_PREFIX, cctx.igniteInstanceName(), persistenceCfg.getCheckpointThreads(), persistenceCfg.getCheckpointThreads(), - 30_000, // A value is ignored if corePoolSize equals to maxPoolSize - new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.UNDEFINED, - cctx.kernalContext().uncaughtExceptionHandler() + 30_000, + new LinkedBlockingQueue<Runnable>() ); } @@ -3585,8 +3584,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final PartitionAllocationMap map = new PartitionAllocationMap(); - if (asyncRunner != null) - asyncRunner.reset(); + GridCompoundFuture asyncLsnrFut = asyncRunner == null ? null : new GridCompoundFuture(); DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() { @Override public boolean nextSnapshot() { @@ -3607,10 +3605,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan @Override public Executor executor() { return asyncRunner == null ? 
null : cmd -> { try { - asyncRunner.execute(cmd); + GridFutureAdapter<?> res = new GridFutureAdapter<>(); + + asyncRunner.execute(U.wrapIgniteFuture(cmd, res)); + + asyncLsnrFut.add(res); } catch (RejectedExecutionException e) { - assert false: "A task should never be rejected by async runner"; + assert false : "A task should never be rejected by async runner"; } }; } @@ -3620,17 +3622,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (DbCheckpointListener lsnr : lsnrs) lsnr.onCheckpointBegin(ctx0); - if (asyncRunner != null) { - asyncRunner.markInitialized(); + if (asyncLsnrFut != null) { + asyncLsnrFut.markInitialized(); - asyncRunner.awaitDone(); + asyncLsnrFut.get(); } if (curr.nextSnapshot) snapFut = snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map); - if (asyncRunner != null) - asyncRunner.reset(); + GridCompoundFuture grpHandleFut = asyncRunner == null ? null : new GridCompoundFuture(); for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal() || !grp.walEnabled()) @@ -3662,17 +3663,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan r.run(); else try { - asyncRunner.execute(r); + GridFutureAdapter<?> res = new GridFutureAdapter<>(); + + asyncRunner.execute(U.wrapIgniteFuture(r, res)); + + grpHandleFut.add(res); } catch (RejectedExecutionException e) { - assert false: "Task should never be rejected by async runner"; + assert false : "Task should never be rejected by async runner"; } } - if (asyncRunner != null) { - asyncRunner.markInitialized(); + if (grpHandleFut != null) { + grpHandleFut.markInitialized(); - asyncRunner.awaitDone(); + grpHandleFut.get(); } cpPagesTuple = beginAllCheckpoints(); http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 8dbea17..edb9871 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -207,6 +207,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.io.GridFilenameUtils; import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader; @@ -10723,6 +10724,27 @@ public abstract class IgniteUtils { /** * + * @param r Runnable. + * @param fut Grid future adapter. + * @return Runnable with wrapped future. + */ + public static Runnable wrapIgniteFuture(Runnable r, GridFutureAdapter<?> fut) { + return () -> { + try { + r.run(); + + fut.onDone(); + } + catch (Throwable e) { + fut.onDone(e); + + throw e; + } + }; + } + + /** + * */ public static class ReentrantReadWriteLockTracer implements ReadWriteLock { /** Read lock. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java deleted file mode 100644 index 6cae57e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.thread; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.LongAdder; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; - -/** - * An {@link ExecutorService} that executes submitted tasks using pooled grid threads. - * - * In addition to what it allows to track all enqueued tasks completion or failure during execution. - */ -public class IgniteTaskTrackingThreadPoolExecutor extends IgniteThreadPoolExecutor { - /** */ - private final LongAdder pendingTaskCnt = new LongAdder(); - - /** */ - private final LongAdder completedTaskCnt = new LongAdder(); - - /** */ - private volatile boolean initialized; - - /** */ - private volatile AtomicReference<Throwable> err = new AtomicReference<>(); - - /** - * Creates a new service with the given initial parameters. - * - * @param threadNamePrefix Will be added at the beginning of all created threads. - * @param igniteInstanceName Must be the name of the grid. - * @param corePoolSize The number of threads to keep in the pool, even if they are idle. - * @param maxPoolSize The maximum number of threads to allow in the pool. - * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time - * that excess idle threads will wait for new tasks before terminating. - * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only - * runnable tasks submitted by the {@link #execute(Runnable)} method. 
- */ - public IgniteTaskTrackingThreadPoolExecutor(String threadNamePrefix, String igniteInstanceName, int corePoolSize, - int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ) { - super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, keepAliveTime, workQ); - } - - /** - * Creates a new service with the given initial parameters. - * - * @param threadNamePrefix Will be added at the beginning of all created threads. - * @param igniteInstanceName Must be the name of the grid. - * @param corePoolSize The number of threads to keep in the pool, even if they are idle. - * @param maxPoolSize The maximum number of threads to allow in the pool. - * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time - * that excess idle threads will wait for new tasks before terminating. - * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only - * runnable tasks submitted by the {@link #execute(Runnable)} method. - * @param plc {@link GridIoPolicy} for thread pool. - * @param eHnd Uncaught exception handler for thread pool. - */ - public IgniteTaskTrackingThreadPoolExecutor(String threadNamePrefix, String igniteInstanceName, int corePoolSize, - int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ, byte plc, - UncaughtExceptionHandler eHnd) { - super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, keepAliveTime, workQ, plc, eHnd); - } - - /** - * Creates a new service with the given initial parameters. - * - * @param corePoolSize The number of threads to keep in the pool, even if they are idle. - * @param maxPoolSize The maximum number of threads to allow in the pool. - * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time - * that excess idle threads will wait for new tasks before terminating. - * @param workQ The queue to use for holding tasks before they are executed. 
This queue will hold only the - * runnable tasks submitted by the {@link #execute(Runnable)} method. - * @param threadFactory Thread factory. - */ - public IgniteTaskTrackingThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, - BlockingQueue<Runnable> workQ, ThreadFactory threadFactory) { - super(corePoolSize, maxPoolSize, keepAliveTime, workQ, threadFactory); - } - - /** {@inheritDoc} */ - @Override public void execute(Runnable cmd) { - pendingTaskCnt.add(1); - - super.execute(cmd); - } - - /** {@inheritDoc} */ - @Override protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - - completedTaskCnt.add(1); - - if (t != null && err.compareAndSet(null, t) || isDone()) { - synchronized (this) { - notifyAll(); - } - } - } - - /** - * Mark this executor as initialized. - * This method should be called when all required tasks are enqueued for execution. - */ - public final void markInitialized() { - initialized = true; - } - - /** - * Check error status. - * - * @return {@code True} if any task execution resulted in error. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public final boolean isError() { - return err.get() != null; - } - - /** - * Check done status. - * - * @return {@code True} when all enqueued task are completed. - */ - public final boolean isDone() { - return initialized && completedTaskCnt.sum() == pendingTaskCnt.sum(); - } - - /** - * Wait synchronously until all tasks are completed or error has occurred. - * - * @throws IgniteCheckedException if task execution resulted in error. - */ - public final synchronized void awaitDone() throws IgniteCheckedException { - // There are no guarantee what all enqueued tasks will be finished if an error has occurred. 
- while(!isError() && !isDone()) { - try { - wait(); - } - catch (InterruptedException e) { - err.set(e); - - Thread.currentThread().interrupt(); - } - } - - if (isError()) - throw new IgniteCheckedException("Task execution resulted in error", err.get()); - } - - /** - * Reset tasks tracking context. - * The method should be called before adding new tasks to the executor. - */ - public final void reset() { - initialized = false; - completedTaskCnt.reset(); - pendingTaskCnt.reset(); - err.set(null); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java index 673269b..a281662 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java @@ -17,6 +17,7 @@ package org.apache.ignite.testsuites; +import java.util.Set; import junit.framework.TestSuite; import org.apache.ignite.internal.commandline.CommandHandlerParsingTest; import org.apache.ignite.internal.pagemem.impl.PageIdUtilsSelfTest; @@ -57,14 +58,11 @@ import org.apache.ignite.util.GridSpinReadWriteLockSelfTest; import org.apache.ignite.util.GridStringBuilderFactorySelfTest; import org.apache.ignite.util.GridTopologyHeapSizeSelfTest; import org.apache.ignite.util.GridTransientTest; -import org.apache.ignite.util.IgniteTaskTrackingThreadPoolExecutorTest; import org.apache.ignite.util.mbeans.GridMBeanDisableSelfTest; import org.apache.ignite.util.mbeans.GridMBeanExoticNamesSelfTest; import org.apache.ignite.util.mbeans.GridMBeanSelfTest; import org.apache.ignite.util.mbeans.WorkersControlMXBeanTest; -import java.util.Set; - /** * Test suite for 
Ignite utility classes. */ @@ -141,9 +139,6 @@ public class IgniteUtilSelfTestSuite extends TestSuite { // control.sh suite.addTestSuite(CommandHandlerParsingTest.class); - // Thread pool. - suite.addTestSuite(IgniteTaskTrackingThreadPoolExecutorTest.class); - return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java deleted file mode 100644 index 3db02b0..0000000 --- a/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.util; - -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.LongAdder; -import junit.framework.TestCase; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.thread.IgniteTaskTrackingThreadPoolExecutor; -import org.jetbrains.annotations.Nullable; - -/** - * Tests for tracking thread pool executor. - */ -public class IgniteTaskTrackingThreadPoolExecutorTest extends TestCase { - /** */ - private IgniteTaskTrackingThreadPoolExecutor executor; - - /** {@inheritDoc} */ - @Override protected void setUp() throws Exception { - int procs = Runtime.getRuntime().availableProcessors(); - - executor = new IgniteTaskTrackingThreadPoolExecutor("test", "default", - procs * 2, procs * 2, 30_000, new LinkedBlockingQueue<>(), GridIoPolicy.UNDEFINED, (t, e) -> { - // No-op. 
- }); - } - - /** {@inheritDoc} */ - @Override protected void tearDown() throws Exception { - List<Runnable> runnables = executor.shutdownNow(); - - assertEquals("Some tasks are not completed", 0, runnables.size()); - } - - /** */ - public void testSimple() throws IgniteCheckedException { - doTest(null); - } - - /** */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testWithException() throws IgniteCheckedException { - int fail = 5555; - - try { - doTest(fail); - - fail(); - } - catch (Throwable t) { - TestException cause = (TestException)X.getCause(t); - - assertEquals(fail, cause.idx); - } - - AtomicReference<Throwable> err = U.field(executor, "err"); - err.set(null); - - executor.awaitDone(); - } - - /** */ - public void testReuse() throws IgniteCheckedException { - long avg = 0; - - long warmUp = 30; - - int iters = 150; - - for (int i = 0; i < iters; i++) { - long t1 = System.nanoTime(); - - doTest(null); - - if (i >= warmUp) - avg += System.nanoTime() - t1; - - executor.reset(); - } - - X.print("Average time per iteration: " + (avg / (iters - warmUp)) / 1000 / 1000. + " ms"); - } - - /** */ - private void doTest(@Nullable Integer fail) throws IgniteCheckedException { - LongAdder cnt = new LongAdder(); - - int exp = 100_000; - - for (int i = 0; i < exp; i++) { - final int finalI = i; - executor.execute(() -> { - if (fail != null && fail == finalI) - throw new TestException(finalI); - else - cnt.add(1); - }); - } - - executor.markInitialized(); - - executor.awaitDone(); - - assertEquals("Counter is not as expected", exp, cnt.sum()); - } - - /** */ - private static class TestException extends RuntimeException { - /** */ - final int idx; - - /** - * @param idx Index. - */ - public TestException(int idx) { - this.idx = idx; - } - } -}
