Repository: ignite Updated Branches: refs/heads/master dea8df2ce -> 7965d12c1
IGNITE-8751 Failure handler accordingly to segmentation policy should be invoked on node segmentation instead of configured failure handler Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7965d12c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7965d12c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7965d12c Branch: refs/heads/master Commit: 7965d12c1950d35ca108af9792fe51507bb23f66 Parents: dea8df2 Author: Andrey Gura <ag...@apache.org> Authored: Sat Jun 9 16:37:49 2018 +0300 Committer: Andrey Gura <ag...@apache.org> Committed: Thu Jun 14 18:59:59 2018 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 11 ++- .../tcp/TcpDiscoverySegmentationPolicyTest.java | 91 ++++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 3 + 3 files changed, 101 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7965d12c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 35e9452..079058b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1687,7 +1687,10 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void brakeConnection() { - throw new UnsupportedOperationException(); + Socket sock = msgWorker.sock; + + if (sock != null) + U.closeQuiet(sock); } /** {@inheritDoc} */ @@ -2621,7 +2624,7 @@ class ServerImpl extends TcpDiscoveryImpl { super.body(); } catch (InterruptedException e) { - if (!spi.isNodeStopping0()) + if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) err = e; throw e; @@ -2657,7 +2660,7 @@ class ServerImpl extends TcpDiscoveryImpl { throw e; } finally { - if (err == null && !spi.isNodeStopping0()) + if (err == null && !spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly."); FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure(); @@ -5682,7 +5685,7 @@ class ServerImpl extends TcpDiscoveryImpl { throw t; } finally { - if (err == null && !spi.isNodeStopping0()) + if (err == null && !spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly."); FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7965d12c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java new file mode 100644 index 0000000..df76afc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.util.Arrays; +import java.util.Collection; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests for segmentation policy and failure handling in {@link TcpDiscoverySpi}. + */ +public class TcpDiscoverySegmentationPolicyTest extends GridCommonAbstractTest { + /** Nodes count. */ + private static final int NODES_CNT = 3; + + /** Default failure handler invoked. */ + private static volatile boolean dfltFailureHndInvoked; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (igniteInstanceName.endsWith("2")) + cfg.setFailureHandler(new TestFailureHandler()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testStopOnSegmentation() throws Exception { + startGrids(NODES_CNT); + + IgniteEx ignite1 = grid(1); + IgniteEx ignite2 = grid(2); + + ((TcpDiscoverySpi)ignite1.configuration().getDiscoverySpi()).brakeConnection(); + ((TcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).brakeConnection(); + + waitForTopology(2); + + assertFalse(dfltFailureHndInvoked); + + Collection<ClusterNode> nodes = ignite1.cluster().forServers().nodes(); + + assertEquals(2, nodes.size()); + assertTrue(nodes.containsAll(Arrays.asList(((IgniteKernal)ignite(0)).localNode(), ((IgniteKernal)ignite(1)).localNode()))); + + System.out.println(); + } + + /** + * Test failure handler. + */ + private static class TestFailureHandler implements FailureHandler { + /** {@inheritDoc} */ + @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + dfltFailureHndInvoked = true; + + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7965d12c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 1f82316..ef582a5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnRec import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySegmentationPolicyTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySnapshotHistoryTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiConfigSelfTest; @@ -96,6 +97,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpDiscoveryRestartTest.class)); suite.addTest(new TestSuite(TcpDiscoveryMultiThreadedTest.class)); + suite.addTest(new TestSuite(TcpDiscoverySegmentationPolicyTest.class)); + suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class)); suite.addTest(new TestSuite(AuthenticationRestartTest.class));