This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 88b5f47  IGNITE-13575 Fix invalid blocking thread reporting waiting on selector.select. Fix infinite loop while only one thread is registered in WorkersRegistry. (#8354)
88b5f47 is described below

commit 88b5f4798a4a2ff3f5c1e7c981b7927a8a06854b
Author: Ivan Daschinskiy <ivanda...@gmail.com>
AuthorDate: Wed Oct 14 10:25:38 2020 +0300

    IGNITE-13575 Fix invalid blocking thread reporting waiting on selector.select. Fix infinite loop while only one thread is registered in WorkersRegistry. (#8354)
---
 .../ignite/internal/util/nio/GridNioServer.java    |  21 +++-
 .../ignite/internal/worker/WorkersRegistry.java    |   2 +-
 .../ignite/failure/SystemWorkersBlockingTest.java  | 106 ++++++++++++++++-----
 3 files changed, 97 insertions(+), 32 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 41574ee..d52da34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2233,15 +2233,21 @@ public class GridNioServer<T> {
                         if (!changeReqs.isEmpty())
                             continue;
 
-                        updateHeartbeat();
+                        blockingSectionBegin();
 
                         // Wake up every 2 seconds to check if closed.
-                        if (selector.select(2000) > 0) {
+                        int numKeys = selector.select(2000);
+
+                        blockingSectionEnd();
+
+                        if (numKeys > 0) {
                             // Walk through the ready keys collection and process network events.
                             if (selectedKeys == null)
                                 processSelectedKeys(selector.selectedKeys());
                             else
                                 processSelectedKeysOptimized(selectedKeys.flip());
+
+                            updateHeartbeat();
                         }
 
                         // select() call above doesn't throw on interruption; checking it here to propagate timely.
@@ -3037,14 +3043,19 @@ public class GridNioServer<T> {
         private void accept() throws IgniteCheckedException {
             try {
                 while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) {
-                    updateHeartbeat();
+                    blockingSectionBegin();
 
                     // Wake up every 2 seconds to check if closed.
-                    if (selector.select(2000) > 0)
+                    int numKeys = selector.select(2000);
+
+                    blockingSectionEnd();
+
+                    if (numKeys > 0) {
                         // Walk through the ready keys collection and process date requests.
                         processSelectedKeys(selector.selectedKeys());
-                    else
+
                         updateHeartbeat();
+                    }
 
                     if (balancer != null)
                         balancer.run();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
index 3cf1d03..5829b3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
@@ -178,7 +178,7 @@ public class WorkersRegistry implements GridWorkerListener {
 
         Thread prevCheckerThread = lastChecker.get();
 
-        if (prevCheckerThread == null ||
+        if (prevCheckerThread == null || registeredWorkers.size() < 2 ||
             U.currentTimeMillis() - lastCheckTs <= checkInterval ||
             !lastChecker.compareAndSet(prevCheckerThread, null))
             return;
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
index 8a84af8..8455f87 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
@@ -21,10 +21,15 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.worker.WorkersRegistry;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.thread.IgniteThread;
 import org.junit.Test;
@@ -33,23 +38,29 @@ import org.junit.Test;
  * Tests the handling of long blocking operations in system-critical workers.
  */
 public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
+    /** */
+    private static final long SYSTEM_WORKER_BLOCKED_TIMEOUT = 1_000L;
+
     /** Handler latch. */
-    private static volatile CountDownLatch hndLatch;
+    private final CountDownLatch hndLatch = new CountDownLatch(1);
 
-    /** */
-    private static final long FAILURE_DETECTION_TIMEOUT = 5_000;
+    /** Reference to failure error. */
+    private final AtomicReference<Throwable> failureError = new AtomicReference<>();
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         // Set small value for the test.
-        cfg.setSystemWorkerBlockedTimeout(1_000);
+        cfg.setSystemWorkerBlockedTimeout(SYSTEM_WORKER_BLOCKED_TIMEOUT);
 
         AbstractFailureHandler failureHnd = new AbstractFailureHandler() {
             @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
-                if (failureCtx.type() == FailureType.SYSTEM_WORKER_BLOCKED)
+                if (failureCtx.type() == FailureType.SYSTEM_WORKER_BLOCKED) {
+                    failureError.set(failureCtx.error());
+
                     hndLatch.countDown();
+                }
 
                 return false;
             }
@@ -63,21 +74,10 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
 
         cfg.setFailureHandler(failureHnd);
 
-        cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
-
         return cfg;
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        hndLatch = new CountDownLatch(1);
-
-        startGrid(0);
-    }
-
-    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
@@ -89,28 +89,82 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
      */
     @Test
     public void testBlockingWorker() throws Exception {
-        IgniteEx ignite = grid(0);
+        IgniteEx ignite = startGrid(0);
+
+        CountDownLatch blockLatch = new CountDownLatch(1);
 
         GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
             @Override protected void body() throws InterruptedException {
-                Thread.sleep(Long.MAX_VALUE);
+                blockLatch.await();
             }
         };
 
-        new IgniteThread(worker).start();
+        IgniteThread runner = null;
+        try {
+            runner = runWorker(worker);
+
+            ignite.context().workersRegistry().register(worker);
+
+            assertTrue(hndLatch.await(SYSTEM_WORKER_BLOCKED_TIMEOUT * 2, TimeUnit.MILLISECONDS));
+
+            Throwable err = failureError.get();
+
+            assertNotNull(err);
+            assertTrue(err.getMessage() != null && err.getMessage().contains("test-worker"));
+        }
+        finally {
+            if (runner != null) {
+                blockLatch.countDown();
+
+                runner.join(SYSTEM_WORKER_BLOCKED_TIMEOUT);
+            }
+        }
+    }
+
+    /**
+     * Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single registered {@link GridWorker}
+     * doesn't lead to infinite loop.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSingleWorker_NotInInfiniteLoop() throws Exception {
+        WorkersRegistry registry = new WorkersRegistry((w, e) -> {}, SYSTEM_WORKER_BLOCKED_TIMEOUT, log());
+
+        CountDownLatch finishLatch = new CountDownLatch(1);
 
-        while (worker.runner() == null)
-            Thread.sleep(10);
+        GridWorker worker = new GridWorker("test", "test-worker", log, registry) {
+            @Override protected void body() {
+                while (!Thread.currentThread().isInterrupted()) {
+                    onIdle();
 
-        ignite.context().workersRegistry().register(worker);
+                    LockSupport.parkNanos(1000);
+                }
 
-        assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 2, TimeUnit.MILLISECONDS));
+                finishLatch.countDown();
+            }
+        };
 
-        Thread runner = worker.runner();
+        IgniteThread runner = runWorker(worker);
+
+        Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT);
 
         runner.interrupt();
-        runner.join(1000);
 
-        assertFalse(runner.isAlive());
+        assertTrue(finishLatch.await(SYSTEM_WORKER_BLOCKED_TIMEOUT, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * @param worker Grid worker to run.
+     * @return Thread, running worker.
+     */
+    private IgniteThread runWorker(GridWorker worker) throws IgniteInterruptedCheckedException {
+        IgniteThread runner = new IgniteThread(worker);
+
+        runner.start();
+
+        GridTestUtils.waitForCondition(() -> worker.runner() != null, 100);
+
+        return runner;
     }
 }

Reply via email to