This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 88b5f47 IGNITE-13575 Fix invalid blocking thread reporting waiting on selector.select. Fix infinite loop while only one thread is registered in WorkersRegistry. (#8354) 88b5f47 is described below commit 88b5f4798a4a2ff3f5c1e7c981b7927a8a06854b Author: Ivan Daschinskiy <ivanda...@gmail.com> AuthorDate: Wed Oct 14 10:25:38 2020 +0300 IGNITE-13575 Fix invalid blocking thread reporting waiting on selector.select. Fix infinite loop while only one thread is registered in WorkersRegistry. (#8354) --- .../ignite/internal/util/nio/GridNioServer.java | 21 +++- .../ignite/internal/worker/WorkersRegistry.java | 2 +- .../ignite/failure/SystemWorkersBlockingTest.java | 106 ++++++++++++++++----- 3 files changed, 97 insertions(+), 32 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 41574ee..d52da34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -2233,15 +2233,21 @@ public class GridNioServer<T> { if (!changeReqs.isEmpty()) continue; - updateHeartbeat(); + blockingSectionBegin(); // Wake up every 2 seconds to check if closed. - if (selector.select(2000) > 0) { + int numKeys = selector.select(2000); + + blockingSectionEnd(); + + if (numKeys > 0) { // Walk through the ready keys collection and process network events. if (selectedKeys == null) processSelectedKeys(selector.selectedKeys()); else processSelectedKeysOptimized(selectedKeys.flip()); + + updateHeartbeat(); } // select() call above doesn't throw on interruption; checking it here to propagate timely. @@ -3037,14 +3043,19 @@ public class GridNioServer<T> { private void accept() throws IgniteCheckedException { try { while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) { - updateHeartbeat(); + blockingSectionBegin(); // Wake up every 2 seconds to check if closed. - if (selector.select(2000) > 0) + int numKeys = selector.select(2000); + + blockingSectionEnd(); + + if (numKeys > 0) { // Walk through the ready keys collection and process date requests. processSelectedKeys(selector.selectedKeys()); - else + updateHeartbeat(); + } if (balancer != null) balancer.run(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java index 3cf1d03..5829b3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java @@ -178,7 +178,7 @@ public class WorkersRegistry implements GridWorkerListener { Thread prevCheckerThread = lastChecker.get(); - if (prevCheckerThread == null || + if (prevCheckerThread == null || registeredWorkers.size() < 2 || U.currentTimeMillis() - lastCheckTs <= checkInterval || !lastChecker.compareAndSet(prevCheckerThread, null)) return; diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java index 8a84af8..8455f87 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java @@ -21,10 +21,15 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.apache.ignite.Ignite; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.worker.WorkersRegistry; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.thread.IgniteThread; import org.junit.Test; @@ -33,23 +38,29 @@ import org.junit.Test; * Tests the handling of long blocking operations in system-critical workers. */ public class SystemWorkersBlockingTest extends GridCommonAbstractTest { + /** */ + private static final long SYSTEM_WORKER_BLOCKED_TIMEOUT = 1_000L; + /** Handler latch. */ - private static volatile CountDownLatch hndLatch; + private final CountDownLatch hndLatch = new CountDownLatch(1); - /** */ - private static final long FAILURE_DETECTION_TIMEOUT = 5_000; + /** Reference to failure error. */ + private final AtomicReference<Throwable> failureError = new AtomicReference<>(); /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); // Set small value for the test. - cfg.setSystemWorkerBlockedTimeout(1_000); + cfg.setSystemWorkerBlockedTimeout(SYSTEM_WORKER_BLOCKED_TIMEOUT); AbstractFailureHandler failureHnd = new AbstractFailureHandler() { @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { - if (failureCtx.type() == FailureType.SYSTEM_WORKER_BLOCKED) + if (failureCtx.type() == FailureType.SYSTEM_WORKER_BLOCKED) { + failureError.set(failureCtx.error()); + hndLatch.countDown(); + } return false; } @@ -63,21 +74,10 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest { cfg.setFailureHandler(failureHnd); - cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT); - return cfg; } /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - hndLatch = new CountDownLatch(1); - - startGrid(0); - } - - /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -89,28 +89,82 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest { */ @Test public void testBlockingWorker() throws Exception { - IgniteEx ignite = grid(0); + IgniteEx ignite = startGrid(0); + + CountDownLatch blockLatch = new CountDownLatch(1); GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) { @Override protected void body() throws InterruptedException { - Thread.sleep(Long.MAX_VALUE); + blockLatch.await(); } }; - new IgniteThread(worker).start(); + IgniteThread runner = null; + try { + runner = runWorker(worker); + + ignite.context().workersRegistry().register(worker); + + assertTrue(hndLatch.await(SYSTEM_WORKER_BLOCKED_TIMEOUT * 2, TimeUnit.MILLISECONDS)); + + Throwable err = failureError.get(); + + assertNotNull(err); + assertTrue(err.getMessage() != null && err.getMessage().contains("test-worker")); + } + finally { + if (runner != null) { + blockLatch.countDown(); + + runner.join(SYSTEM_WORKER_BLOCKED_TIMEOUT); + } + } + } + + /** + * Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single registered {@link GridWorker} + * doesn't lead to infinite loop. + * + * @throws Exception If failed. + */ + @Test + public void testSingleWorker_NotInInfiniteLoop() throws Exception { + WorkersRegistry registry = new WorkersRegistry((w, e) -> {}, SYSTEM_WORKER_BLOCKED_TIMEOUT, log()); + + CountDownLatch finishLatch = new CountDownLatch(1); - while (worker.runner() == null) - Thread.sleep(10); + GridWorker worker = new GridWorker("test", "test-worker", log, registry) { + @Override protected void body() { + while (!Thread.currentThread().isInterrupted()) { + onIdle(); - ignite.context().workersRegistry().register(worker); + LockSupport.parkNanos(1000); + } - assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 2, TimeUnit.MILLISECONDS)); + finishLatch.countDown(); + } + }; - Thread runner = worker.runner(); + IgniteThread runner = runWorker(worker); + + Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT); runner.interrupt(); - runner.join(1000); - assertFalse(runner.isAlive()); + assertTrue(finishLatch.await(SYSTEM_WORKER_BLOCKED_TIMEOUT, TimeUnit.MILLISECONDS)); + } + + /** + * @param worker Grid worker to run. + * @return Thread, running worker. + */ + private IgniteThread runWorker(GridWorker worker) throws IgniteInterruptedCheckedException { + IgniteThread runner = new IgniteThread(worker); + + runner.start(); + + GridTestUtils.waitForCondition(() -> worker.runner() != null, 100); + + return runner; } }