This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 8e99cc5 IGNITE-12252 Unchecked exceptions during rebalancing should be handled (#6965) 8e99cc5 is described below commit 8e99cc5b5baf21fe8b2a1fdfca25e2c523889ed4 Author: Nikolai Kulagin <zzzadr...@gmail.com> AuthorDate: Thu Apr 30 18:39:39 2020 +0300 IGNITE-12252 Unchecked exceptions during rebalancing should be handled (#6965) --- .../org/apache/ignite/internal/IgnitionEx.java | 11 ++++-- .../failure/FailureHandlerTriggeredTest.java | 40 ++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 5ea5baa..ff6ddfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1752,6 +1752,13 @@ public class IgnitionEx { } }; + UncaughtExceptionHandler excHnd = new UncaughtExceptionHandler() { + @Override public void uncaughtException(Thread t, Throwable e) { + if (grid != null) + grid.context().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } + }; + execSvc = new IgniteThreadPoolExecutor( "pub", cfg.getIgniteInstanceName(), @@ -2007,7 +2014,7 @@ public class IgnitionEx { DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<>(), GridIoPolicy.UNDEFINED, - oomeHnd); + excHnd); rebalanceExecSvc.allowCoreThreadTimeOut(true); @@ -2015,7 +2022,7 @@ public class IgnitionEx { cfg.getRebalanceThreadPoolSize(), cfg.getIgniteInstanceName(), "rebalance-striped", - oomeHnd, + excHnd, true, DFLT_THREAD_KEEP_ALIVE_TIME); diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java index 67ec311..2abdd58 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java @@ -19,6 +19,9 @@ package org.apache.ignite.failure; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; @@ -31,6 +34,8 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import org.junit.Test; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; + /** * Test of triggering of failure handler. */ @@ -68,6 +73,41 @@ public class FailureHandlerTriggeredTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + @Test + public void testFailureHandlerTriggeredOnUncheckedErrorOnRebalancing() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + TestFailureHandler hnd = new TestFailureHandler(false, latch); + + IgniteEx grid0 = startGrid(getConfiguration(testNodeName(0)) + .setIncludeEventTypes(EVT_CACHE_REBALANCE_OBJECT_LOADED) + .setFailureHandler(hnd)); + + grid0.getOrCreateCache(new CacheConfiguration<>() + .setName(DEFAULT_CACHE_NAME) + .setCacheMode(CacheMode.REPLICATED)) + .put(1,1); + + grid0.cluster().baselineAutoAdjustEnabled(false); + + IgniteEx grid1 = startGrid(getConfiguration(testNodeName(1)) + .setIncludeEventTypes(EVT_CACHE_REBALANCE_OBJECT_LOADED) + .setFailureHandler(hnd)); + + grid1.events().localListen(e -> { throw new Error(); }, EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED); + + grid1.cluster().setBaselineTopology(grid1.cluster().topologyVersion()); + + assertTrue(latch.await(3, TimeUnit.SECONDS)); + + assertNotNull(hnd.failureCtx); + + assertEquals(hnd.failureCtx.type(), FailureType.CRITICAL_ERROR); + } + + /** * Custom exchange worker task implementation for delaying exchange worker processing. */ static class ExchangeWorkerFailureTask extends SchemaExchangeWorkerTask implements CachePartitionExchangeWorkerTask {