Repository: ignite
Updated Branches:
  refs/heads/master 29588c515 -> 9cec13857


IGNITE-10225 Fix striped pool starvation check, add MXBean method - Fixes #5606.

Signed-off-by: Ilya Kasnacheev <ilya.kasnach...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9cec1385
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9cec1385
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9cec1385

Branch: refs/heads/master
Commit: 9cec138570b75c498cf68f0349aeed7acd81e177
Parents: 29588c5
Author: Vladimir Pligin <vova199...@yandex.ru>
Authored: Thu Dec 20 13:19:03 2018 +0300
Committer: Ilya Kasnacheev <ilya.kasnach...@gmail.com>
Committed: Thu Dec 20 13:19:03 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  2 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |  6 ++-
 .../internal/StripedExecutorMXBeanAdapter.java  |  8 +++-
 .../ignite/internal/util/StripedExecutor.java   | 45 +++++++++++---------
 .../ignite/mxbean/StripedExecutorMXBean.java    | 10 +++++
 .../internal/util/StripedExecutorTest.java      | 23 +++++++++-
 6 files changed, 68 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9cec1385/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index cc0363c..95bf49c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1215,7 +1215,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                     }
 
                     if (stripedExecSvc != null)
-                        stripedExecSvc.checkStarvation();
+                        stripedExecSvc.detectStarvation();
                 }
 
                 /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9cec1385/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 559b3bb..5ddf7cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1849,7 +1849,8 @@ public class IgnitionEx {
                             grid.context().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, t));
                     }
                 },
-                workerRegistry);
+                workerRegistry,
+                cfg.getFailureDetectionTimeout());
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -1898,7 +1899,8 @@ public class IgnitionEx {
                     }
                 },
                 true,
-                workerRegistry);
+                workerRegistry,
+                cfg.getFailureDetectionTimeout());
 
             // Note that we do not pre-start threads here as igfs pool may not be needed.
             validateThreadPoolSize(cfg.getIgfsThreadPoolSize(), "IGFS");

http://git-wip-us.apache.org/repos/asf/ignite/blob/9cec1385/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
index e6811b7..0659492 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
@@ -39,8 +39,14 @@ public class StripedExecutorMXBeanAdapter implements StripedExecutorMXBean {
     }
 
     /** {@inheritDoc} */
+    @Deprecated
     @Override public void checkStarvation() {
-        exec.checkStarvation();
+        exec.detectStarvation();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean detectStarvation() {
+        return exec.detectStarvation();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9cec1385/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 9853920..318db6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.util;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
@@ -55,8 +54,8 @@ public class StripedExecutor implements ExecutorService {
     /** Stripes. */
     private final Stripe[] stripes;
 
-    /** For starvation checks. */
-    private final long[] completedCntrs;
+    /** Threshold for starvation checks */
+    private final long threshold;
 
     /** */
     private final IgniteLogger log;
@@ -75,9 +74,10 @@ public class StripedExecutor implements ExecutorService {
         String poolName,
         final IgniteLogger log,
         IgniteInClosure<Throwable> errHnd,
-        GridWorkerListener gridWorkerLsnr
+        GridWorkerListener gridWorkerLsnr,
+        long failureDetectionTimeout
     ) {
-        this(cnt, igniteInstanceName, poolName, log, errHnd, false, gridWorkerLsnr);
+        this(cnt, igniteInstanceName, poolName, log, errHnd, false, gridWorkerLsnr, failureDetectionTimeout);
     }
 
     /**
@@ -96,7 +96,8 @@ public class StripedExecutor implements ExecutorService {
         final IgniteLogger log,
         IgniteInClosure<Throwable> errHnd,
         boolean stealTasks,
-        GridWorkerListener gridWorkerLsnr
+        GridWorkerListener gridWorkerLsnr,
+        long failureDetectionTimeout
     ) {
         A.ensure(cnt > 0, "cnt > 0");
 
@@ -104,9 +105,7 @@ public class StripedExecutor implements ExecutorService {
 
         stripes = new Stripe[cnt];
 
-        completedCntrs = new long[cnt];
-
-        Arrays.fill(completedCntrs, -1);
+        threshold = failureDetectionTimeout;
 
         this.log = log;
 
@@ -141,18 +140,19 @@ public class StripedExecutor implements ExecutorService {
     /**
      * Checks starvation in striped pool. Maybe too verbose
      * but this is needed to faster debug possible issues.
+     *
+     * @return Flag representing presence of possible starvation in striped pool.
      */
-    public void checkStarvation() {
-        for (int i = 0; i < stripes.length; i++) {
-            Stripe stripe = stripes[i];
-
-            long completedCnt = stripe.completedCnt;
+    public boolean detectStarvation() {
+        boolean starvationDetected = false;
 
+        for (Stripe stripe : stripes) {
             boolean active = stripe.active;
 
-            if (completedCntrs[i] != -1 &&
-                completedCntrs[i] == completedCnt &&
-                active) {
+            long lastStartedTs = stripe.lastStartedTs;
+
+            if (active && lastStartedTs + threshold < U.currentTimeMillis()) {
+                starvationDetected = true;
                 boolean deadlockPresent = U.deadlockPresent();
 
                 GridStringBuilder sb = new GridStringBuilder();
@@ -161,7 +161,7 @@ public class StripedExecutor implements ExecutorService {
                     .a("    Thread name: ").a(stripe.thread.getName()).a(U.nl())
                     .a("    Queue: ").a(stripe.queueToString()).a(U.nl())
                     .a("    Deadlock: ").a(deadlockPresent).a(U.nl())
-                    .a("    Completed: ").a(completedCnt).a(U.nl());
+                    .a("    Completed: ").a(stripe.completedCnt).a(U.nl());
 
                 U.printStackTrace(
                     stripe.thread.getId(),
@@ -171,10 +171,8 @@ public class StripedExecutor implements ExecutorService {
 
                 U.warn(log, msg);
             }
-
-            if (active || completedCnt > 0)
-                completedCntrs[i] = completedCnt;
         }
+        return starvationDetected;
     }
 
     /**
@@ -436,6 +434,9 @@ public class StripedExecutor implements ExecutorService {
         /** */
         private volatile boolean active;
 
+        /** */
+        private volatile long lastStartedTs;
+
         /** Thread executing the loop. */
         protected Thread thread;
 
@@ -499,6 +500,8 @@ public class StripedExecutor implements ExecutorService {
                     if (cmd != null) {
                         active = true;
 
+                        lastStartedTs = U.currentTimeMillis();
+
                         updateHeartbeat();
 
                         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9cec1385/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
index 7428b19..d7e56a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java
@@ -25,11 +25,21 @@ public interface StripedExecutorMXBean {
     /**
      * Checks for starvation in striped pool, dumps in log information if potential starvation
      * was found.
+     *
+     * @deprecated Will be removed at 3.0.
      */
+    @Deprecated
     @MXBeanDescription("Checks for starvation in striped pool.")
     public void checkStarvation();
 
     /**
+     *
+     * @return {@code True} if possible starvation in striped pool is detected.
+     */
+    @MXBeanDescription("True if possible starvation in striped pool is detected.")
+    public boolean detectStarvation();
+
+    /**
      * @return Stripes count.
      */
     @MXBeanDescription("Stripes count.")

http://git-wip-us.apache.org/repos/asf/ignite/blob/9cec1385/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
index 0b4123a..0e797ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
 
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.logger.java.JavaLogger;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,7 +38,7 @@ public class StripedExecutorTest extends GridCommonAbstractTest {
         stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new JavaLogger(),
             new IgniteInClosure<Throwable>() {
                 @Override public void apply(Throwable throwable) {}
-            }, null);
+            }, null, 2000);
     }
 
     /** {@inheritDoc} */
@@ -141,6 +142,26 @@ public class StripedExecutorTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStarvationDetected() throws Exception {
+        final int stripeIdx = 0;
+
+        stripedExecSvc.execute(stripeIdx, new TestRunnable(true));
+
+        sleepASec();
+
+        assertFalse(GridTestUtils.waitForCondition(() -> stripedExecSvc.activeStripesCount() == 0, 2000));
+
+        stripedExecSvc.execute(stripeIdx, new TestRunnable());
+
+        assertTrue(GridTestUtils.waitForCondition(() -> stripedExecSvc.activeStripesCount() == 1, 10000));
+
+        assertTrue(stripedExecSvc.detectStarvation());
+    }
+
+    /**
      *
      */
     private final class TestRunnable implements Runnable {

Reply via email to