This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e99cc5  IGNITE-12252 Unchecked exceptions during rebalancing should be handled (#6965)
8e99cc5 is described below

commit 8e99cc5b5baf21fe8b2a1fdfca25e2c523889ed4
Author: Nikolai Kulagin <zzzadr...@gmail.com>
AuthorDate: Thu Apr 30 18:39:39 2020 +0300

    IGNITE-12252 Unchecked exceptions during rebalancing should be handled (#6965)
---
 .../org/apache/ignite/internal/IgnitionEx.java     | 11 ++++--
 .../failure/FailureHandlerTriggeredTest.java       | 40 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 5ea5baa..ff6ddfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1752,6 +1752,13 @@ public class IgnitionEx {
                 }
             };
 
+            UncaughtExceptionHandler excHnd = new UncaughtExceptionHandler() {
+                @Override public void uncaughtException(Thread t, Throwable e) 
{
+                    if (grid != null)
+                        grid.context().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+                }
+            };
+
             execSvc = new IgniteThreadPoolExecutor(
                 "pub",
                 cfg.getIgniteInstanceName(),
@@ -2007,7 +2014,7 @@ public class IgnitionEx {
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<>(),
                 GridIoPolicy.UNDEFINED,
-                oomeHnd);
+                excHnd);
 
             rebalanceExecSvc.allowCoreThreadTimeOut(true);
 
@@ -2015,7 +2022,7 @@ public class IgnitionEx {
                 cfg.getRebalanceThreadPoolSize(),
                 cfg.getIgniteInstanceName(),
                 "rebalance-striped",
-                oomeHnd,
+                excHnd,
                 true,
                 DFLT_THREAD_KEEP_ALIVE_TIME);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
index 67ec311..2abdd58 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
@@ -19,6 +19,9 @@ package org.apache.ignite.failure;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
@@ -31,6 +34,8 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
+import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
+
 /**
  * Test of triggering of failure handler.
  */
@@ -68,6 +73,41 @@ public class FailureHandlerTriggeredTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFailureHandlerTriggeredOnUncheckedErrorOnRebalancing() 
throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        TestFailureHandler hnd = new TestFailureHandler(false, latch);
+
+        IgniteEx grid0 = startGrid(getConfiguration(testNodeName(0))
+            .setIncludeEventTypes(EVT_CACHE_REBALANCE_OBJECT_LOADED)
+            .setFailureHandler(hnd));
+
+        grid0.getOrCreateCache(new CacheConfiguration<>()
+                .setName(DEFAULT_CACHE_NAME)
+                .setCacheMode(CacheMode.REPLICATED))
+            .put(1,1);
+
+        grid0.cluster().baselineAutoAdjustEnabled(false);
+
+        IgniteEx grid1 = startGrid(getConfiguration(testNodeName(1))
+            .setIncludeEventTypes(EVT_CACHE_REBALANCE_OBJECT_LOADED)
+            .setFailureHandler(hnd));
+
+        grid1.events().localListen(e -> { throw new Error(); }, 
EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED);
+
+        grid1.cluster().setBaselineTopology(grid1.cluster().topologyVersion());
+
+        assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+        assertNotNull(hnd.failureCtx);
+
+        assertEquals(hnd.failureCtx.type(), FailureType.CRITICAL_ERROR);
+    }
+
+    /**
      * Custom exchange worker task implementation for delaying exchange worker 
processing.
      */
     static class ExchangeWorkerFailureTask extends SchemaExchangeWorkerTask 
implements CachePartitionExchangeWorkerTask {

Reply via email to