Repository: ignite
Updated Branches:
  refs/heads/master c59791196 -> a6d0bd4b5

IGNITE-9744 Fix SYSTEM_WORKER_TERMINATION detection in general case - Fixes #4876.

Signed-off-by: Ivan Rakov <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6d0bd4b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6d0bd4b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6d0bd4b

Branch: refs/heads/master
Commit: a6d0bd4b5551231b0516eaf0ebd8112b45bba86d
Parents: c597911
Author: Andrey Kuznetsov <[email protected]>
Authored: Thu Oct 4 19:45:07 2018 +0300
Committer: Ivan Rakov <[email protected]>
Committed: Thu Oct 4 19:45:07 2018 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  3 +
 .../ignite/internal/util/nio/GridNioServer.java |  6 ++
 .../ignite/internal/util/worker/GridWorker.java |  4 +-
 .../ignite/internal/worker/WorkersRegistry.java |  3 +
 .../failure/SystemWorkersTerminationTest.java   | 72 ++++----------------
 .../ignite/failure/TestFailureHandler.java      | 10 +--
 6 files changed, 35 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4c254b0..cc1fd33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2548,6 +2548,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); else if (err != null) cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + else + // In case of reconnectNeeded == true, prevent general-case termination handling. + cancel(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index e4c96b4..f9fda8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -1830,6 +1830,9 @@ public class GridNioServer<T> { } else if (err != null) lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err); + else + // In case of closed == true, prevent general-case termination handling. + cancel(); } } @@ -2906,6 +2909,9 @@ public class GridNioServer<T> { lsnr.onFailure(CRITICAL_ERROR, err); else if (err != null) lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err); + else + // In case of closed == true, prevent general-case termination handling. 
+ cancel(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java index 3d9163d..3f779da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java @@ -99,12 +99,12 @@ public abstract class GridWorker implements Runnable { /** {@inheritDoc} */ @Override public final void run() { + updateHeartbeat(); + // Runner thread must be recorded first as other operations // may depend on it being present. runner = Thread.currentThread(); - updateHeartbeat(); - if (log.isDebugEnabled()) log.debug("Grid runnable started: " + name); http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java index 55740a4..848bb59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java @@ -143,6 +143,9 @@ public class WorkersRegistry implements GridWorkerListener { /** {@inheritDoc} */ @Override public void onStopped(GridWorker w) { + if (!w.isCancelled()) + workerFailedHnd.apply(w, SYSTEM_WORKER_TERMINATION); + unregister(w.runner().getName()); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java index 638e6f1..1cebe22 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java @@ -17,19 +17,15 @@ package org.apache.ignite.failure; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.worker.WorkersRegistry; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.thread.IgniteThread; @@ -37,11 +33,8 @@ import org.apache.ignite.thread.IgniteThread; * Tests system critical workers termination. */ public class SystemWorkersTerminationTest extends GridCommonAbstractTest { - /** Handler latch. 
*/ - private static volatile CountDownLatch hndLatch; - /** */ - private static final long FAILURE_DETECTION_TIMEOUT = 5_000; + private static volatile String failureHndThreadName; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -58,8 +51,6 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest { cfg.setDataStorageConfiguration(dsCfg); - cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT); - return cfg; } @@ -84,62 +75,28 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testTermination() throws Exception { - Ignite ignite = ignite(0); - - ignite.cluster().active(true); - - WorkersRegistry registry = ((IgniteKernal)ignite).context().workersRegistry(); - - Collection<String> threadNames = new ArrayList<>(registry.names()); - - int cnt = 0; - - for (String threadName : threadNames) { - log.info("Worker termination: " + threadName); - - hndLatch = new CountDownLatch(1); - - GridWorker w = registry.worker(threadName); - - Thread t = w.runner(); - - t.interrupt(); - - assertTrue(hndLatch.await(3, TimeUnit.SECONDS)); - - log.info("Worker is terminated: " + threadName); - - cnt++; - } - - assertEquals(threadNames.size(), cnt); - } - - /** - * @throws Exception If failed. 
- */ public void testSyntheticWorkerTermination() throws Exception { - hndLatch = new CountDownLatch(1); - IgniteEx ignite = grid(0); - GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) { + WorkersRegistry registry = ignite.context().workersRegistry(); + + long fdTimeout = ignite.configuration().getFailureDetectionTimeout(); + + GridWorker worker = new GridWorker(ignite.name(), "test-worker", log, registry) { @Override protected void body() throws InterruptedException { - Thread.sleep(ignite.configuration().getFailureDetectionTimeout() / 2); + Thread.sleep(fdTimeout / 2); } }; - new IgniteThread(worker).start(); + IgniteThread thread = new IgniteThread(worker); - while (worker.runner() == null) - Thread.sleep(10); + failureHndThreadName = null; - ignite.context().workersRegistry().register(worker); + thread.start(); - worker.runner().join(); + thread.join(); - assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 2, TimeUnit.MILLISECONDS)); + assertTrue(GridTestUtils.waitForCondition(() -> thread.getName().equals(failureHndThreadName), fdTimeout * 2)); } /** @@ -157,7 +114,8 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest { private class TestFailureHandler extends AbstractFailureHandler { /** {@inheritDoc} */ @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { - hndLatch.countDown(); + if (failureCtx.type() == FailureType.SYSTEM_WORKER_TERMINATION) + failureHndThreadName = Thread.currentThread().getName(); return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java index 09dce9b..5ac75d1 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java @@ -52,12 +52,14 @@ public class TestFailureHandler extends AbstractFailureHandler { /** {@inheritDoc} */ @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { - this.failureCtx = failureCtx; + if (this.failureCtx == null) { + this.failureCtx = failureCtx; - if (latch != null) - latch.countDown(); + if (latch != null) + latch.countDown(); - ignite.log().warning("Handled ignite failure: " + failureCtx); + ignite.log().warning("Handled ignite failure: " + failureCtx); + } return invalidate; }
