Repository: ignite Updated Branches: refs/heads/master e1f8f46f9 -> d82b21ec5
IGNITE-9738 Client node can suddenly fail on start - Fixes #4968. Signed-off-by: Dmitriy Govorukhin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d82b21ec Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d82b21ec Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d82b21ec Branch: refs/heads/master Commit: d82b21ec56a956fa7cc5374e3f15e279e7c492ac Parents: e1f8f46 Author: vd-pyatkov <[email protected]> Authored: Mon Oct 22 18:19:53 2018 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Mon Oct 22 18:19:53 2018 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 10 +- .../LongClientConnectToClusterTest.java | 173 +++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 2 + 3 files changed, 180 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index d3a8b18..92c197a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -305,6 +305,11 @@ class ClientImpl extends TcpDiscoveryImpl { } }.start(); + timer.schedule( + new MetricsSender(), + spi.metricsUpdateFreq, + spi.metricsUpdateFreq); + try { joinLatch.await(); @@ -317,11 +322,6 @@ class ClientImpl extends TcpDiscoveryImpl { throw new IgniteSpiException("Thread has been interrupted.", e); } - timer.schedule( - new MetricsSender(), - spi.metricsUpdateFreq, - 
spi.metricsUpdateFreq); - spi.printStartInfo(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java new file mode 100644 index 0000000..a079926 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Checks that a client node can join a two-node cluster when the join takes longer than the
+ * {@link org.apache.ignite.configuration.IgniteConfiguration#clientFailureDetectionTimeout}.
+ * <p>
+ * The first server delays the {@link TcpDiscoveryNodeAddFinishedMessage} for the joining client,
+ * while the second server counts the metrics updates it receives from client nodes.
+ */
+public class LongClientConnectToClusterTest extends GridCommonAbstractTest {
+    /** Client instance name. */
+    public static final String CLIENT_INSTANCE_NAME = "client";
+
+    /**
+     * Number of metrics updates received from client nodes. Incremented by the discovery listener
+     * and read by the test thread, hence an atomic counter rather than a {@code volatile int}
+     * (whose {@code ++} is not atomic).
+     */
+    private static final AtomicInteger clientMetricsUpdateCnt = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        boolean client = igniteInstanceName.startsWith(CLIENT_INSTANCE_NAME);
+
+        TcpDiscoverySpi discoSpi;
+
+        // Server 0 delays the join confirmation, server 1 observes metrics updates,
+        // every other node (including the client) uses the stock SPI.
+        if (getTestIgniteInstanceName(0).equals(igniteInstanceName))
+            discoSpi = new DelayedTcpDiscoverySpi();
+        else if (getTestIgniteInstanceName(1).equals(igniteInstanceName))
+            discoSpi = new UpdateMetricsInterceptorTcpDiscoverySpi();
+        else
+            discoSpi = new TcpDiscoverySpi();
+
+        // The client is given a single address (port 47501) while servers get the whole range.
+        String addrs = client ? "127.0.0.1:47501" : "127.0.0.1:47500..47502";
+
+        return super.getConfiguration(igniteInstanceName)
+            .setClientMode(client)
+            .setClientFailureDetectionTimeout(1_000)
+            .setMetricsUpdateFrequency(500)
+            .setDiscoverySpi(discoSpi
+                .setReconnectCount(1)
+                .setLocalAddress("127.0.0.1")
+                .setIpFinder(new TcpDiscoveryVmIpFinder()
+                    .setAddresses(Collections.singletonList(addrs))));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Starts a client against the two-node cluster and checks that it joined, that at least one
+     * client metrics update was observed by then, and that the topology contains three nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientConnectToCluster() throws Exception {
+        clientMetricsUpdateCnt.set(0);
+
+        IgniteEx client = startGrid(CLIENT_INSTANCE_NAME);
+
+        assertTrue(clientMetricsUpdateCnt.get() > 0);
+
+        assertTrue(client.localNode().isClient());
+
+        // Expected value goes first so that a failure message reads correctly.
+        assertEquals(3, client.cluster().nodes().size());
+    }
+
+    /** Discovery SPI which counts metrics update events that originate from client nodes. */
+    private static class UpdateMetricsInterceptorTcpDiscoverySpi extends TcpDiscoverySpi {
+        /**
+         * Listener wrapper which inspects every discovery event and then forwards it to the
+         * wrapped listener, if there is one. Kept as an inner (non-static) class because it uses
+         * the enclosing SPI's {@code log} and {@code locNode}.
+         */
+        private class DiscoverySpiListenerWrapper implements DiscoverySpiListener {
+            /** Wrapped listener, may be {@code null}. */
+            @Nullable private final DiscoverySpiListener delegate;
+
+            /**
+             * @param delegate Delegate.
+             */
+            private DiscoverySpiListenerWrapper(@Nullable DiscoverySpiListener delegate) {
+                this.delegate = delegate;
+            }
+
+            /** {@inheritDoc} */
+            @Override public IgniteFuture<?> onDiscovery(
+                int type,
+                long topVer,
+                ClusterNode node,
+                Collection<ClusterNode> topSnapshot,
+                @Nullable Map<Long, Collection<ClusterNode>> topHist,
+                @Nullable DiscoverySpiCustomMessage spiCustomMsg
+            ) {
+                if (EventType.EVT_NODE_METRICS_UPDATED == type) {
+                    log.info("Metrics update message catched from node " + node);
+
+                    assertFalse(locNode.isClient());
+
+                    if (node.isClient())
+                        clientMetricsUpdateCnt.incrementAndGet();
+                }
+
+                if (delegate != null)
+                    return delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg);
+
+                return new IgniteFinishedFutureImpl<>();
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onLocalNodeInitialized(ClusterNode locNode) {
+                if (delegate != null)
+                    delegate.onLocalNodeInitialized(locNode);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+            super.setListener(new DiscoverySpiListenerWrapper(lsnr));
+        }
+    }
+
+    /** Discovery SPI which delays the TcpDiscoveryNodeAddFinishedMessage of the joining client. */
+    private static class DelayedTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** Delay message period millis; twice the client failure detection timeout configured by the test. */
+        public static final int DELAY_MSG_PERIOD_MILLIS = 2_000;
+
+        /**
+         * Topology version of the message to delay. NOTE(review): presumably the version assigned
+         * to the client join, the two servers started in {@code beforeTest()} taking 1 and 2 - confirm.
+         */
+        private static final long DELAYED_TOP_VER = 3;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(ClusterNode node, Socket sock, OutputStream out,
+            TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryNodeAddFinishedMessage && msg.topologyVersion() == DELAYED_TOP_VER) {
+                log.info("Catched discovery message: " + msg);
+
+                try {
+                    Thread.sleep(DELAY_MSG_PERIOD_MILLIS);
+                }
+                catch (InterruptedException e) {
+                    log.error("Interrupt on DelayedTcpDiscoverySpi.", e);
+
+                    // Restore the interrupt flag and still send the message.
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            super.writeToSocket(node, sock, out, msg, timeout);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 04869f9..80f093d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -23,6 +23,7 @@ import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest; import org.apache.ignite.spi.discovery.AuthenticationRestartTest; import org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest; import org.apache.ignite.spi.discovery.IgniteDiscoveryCacheReuseSelfTest; +import org.apache.ignite.spi.discovery.LongClientConnectToClusterTest; import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest; import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest; import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest; @@ -97,6 +98,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends
TestSuite { suite.addTest(new TestSuite(GridTcpSpiForwardingSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class)); + suite.addTest(new TestSuite(LongClientConnectToClusterTest.class)); suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class)); suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));
