IGNITE-5943 Communication. Server node may reject client connection during massive clients join. This closes #2423
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15393153 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15393153 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15393153 Branch: refs/heads/ignite-5578 Commit: 1539315342d09150a4af942fce2f8147e390f85e Parents: 45708b9 Author: EdShangGG <[email protected]> Authored: Fri Aug 18 18:52:59 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Aug 18 19:13:58 2017 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 32 ++-- .../communication/tcp/TcpCommunicationSpi.java | 60 ++++++- .../ignite/spi/discovery/tcp/ServerImpl.java | 16 ++ .../spi/discovery/tcp/TcpDiscoverySpi.java | 10 ++ .../discovery/tcp/IgniteClientConnectTest.java | 163 +++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 6 + 6 files changed, 268 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index be4aace..0b7a243 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -609,21 +609,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean skipStore = opCtx != null && opCtx.skipStore(); - if (asyncOp) {return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { - @Override public IgniteInternalFuture<Map<K, V>> apply() { - return getAllAsync0(ctx.cacheKeysView(keys), - forcePrimary, - subjId0, - taskName, - deserializeBinary, - recovery, - expiryPlc, - skipVals, - skipStore, - canRemap, - needVer); - } - });} + if (asyncOp) { + return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { + @Override public IgniteInternalFuture<Map<K, V>> apply() { + return getAllAsync0(ctx.cacheKeysView(keys), + forcePrimary, + subjId0, + taskName, + deserializeBinary, + recovery, + expiryPlc, + skipVals, + skipStore, + canRemap, + needVer); + } + }); + } else { return getAllAsync0(ctx.cacheKeysView(keys), forcePrimary, http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1b00b5d..bab9cfa 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -109,6 +109,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; @@ -132,6 +133,8 @@ import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -141,6 +144,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.ALREADY_CONNECTED; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NEED_WAIT; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NODE_STOPPING; /** @@ -296,6 +300,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati */ public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2); + /** + * Version when client is ready to wait to connect to server (could be needed when client tries to open connection + * before it starts being visible for server) + */ + private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4"); + /** Connection index meta for session. */ private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); @@ -442,7 +452,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param ses Session. * @param msg Message. */ - private void onFirstMessage(GridNioSession ses, Message msg) { + private void onFirstMessage(final GridNioSession ses, Message msg) { UUID sndId; ConnectionKey connKey; @@ -466,10 +476,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati final ClusterNode rmtNode = getSpiContext().node(sndId); if (rmtNode == null) { - U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + - ", ses=" + ses + ']'); + DiscoverySpi discoverySpi = ignite().configuration().getDiscoverySpi(); + + assert discoverySpi instanceof TcpDiscoverySpi; + + TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi; + + ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId); + + boolean unknownNode = true; - ses.close(); + if (node0 != null) { + assert node0.isClient() : node0; + + if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) + unknownNode = false; + } + + if (unknownNode) { + U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']'); + + ses.close(); + } + else { + ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + ses.close(); + } + }); + } return; } @@ -3031,6 +3066,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, !node.isClient()); + int lastWaitingTimeout = 1; + while (!conn) { // Reconnection on handshake timeout. try { SocketChannel ch = SocketChannel.open(); @@ -3101,6 +3138,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id()); } + else if (rcvCnt == NEED_WAIT) { + recoveryDesc.release(); + + U.closeQuiet(ch); + + if (lastWaitingTimeout < 60000) + lastWaitingTimeout *= 2; + + U.sleep(lastWaitingTimeout); + + continue; + } } finally { if (recoveryDesc != null && rcvCnt == null) @@ -4559,6 +4608,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** */ static final long NODE_STOPPING = -2; + /** Need wait. */ + static final long NEED_WAIT = -3; + /** Message body size in bytes. */ private static final int MESSAGE_SIZE = 8; http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index a6de34b..ca7dd4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1805,6 +1805,22 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Trying get node in any state (visible or not) + * @param nodeId Node id. + */ + ClusterNode getNode0(UUID nodeId) { + assert nodeId != null; + + UUID locNodeId0 = getLocalNodeId(); + + if (locNodeId0 != null && locNodeId0.equals(nodeId)) + // Return local node directly. + return locNode; + + return ring.node(nodeId); + } + + /** * Thread that cleans IP finder and keeps it in the correct state, unregistering * addresses of the nodes that has left the topology. * <p> http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index c988d7e..e6eaa8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -433,6 +433,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { return impl.getNode(nodeId); } + /** + * @param id Id. + */ + public ClusterNode getNode0(UUID id) { + if (impl instanceof ServerImpl) + return ((ServerImpl)impl).getNode0(id); + + return getNode(id); + } + /** {@inheritDoc} */ @Override public boolean pingNode(UUID nodeId) { return impl.pingNode(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java new file mode 100644 index 0000000..1a89987 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + + +/** + * We emulate that client receive message about joining to topology earlier than some server nodes in topology. + * And make this client connect to such servers. + * To emulate this we connect client to second node in topology and pause sending message about joining finishing to + * third node. + */ +public class IgniteClientConnectTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Latch to stop message sending. */ + private final CountDownLatch latch = new CountDownLatch(1); + + /** Start client flag. */ + private final AtomicBoolean clientJustStarted = new AtomicBoolean(false); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi(); + + if (igniteInstanceName.equals("client")) { + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); + + ipFinder.registerAddresses(Collections.singleton(new InetSocketAddress(InetAddress.getLoopbackAddress(), 47501))); + + disco.setIpFinder(ipFinder); + } + else + disco.setIpFinder(ipFinder); + + disco.setJoinTimeout(2 * 60_000); + disco.setSocketTimeout(1000); + disco.setNetworkTimeout(2000); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration cacheConfiguration = new CacheConfiguration() + .setName(DEFAULT_CACHE_NAME) + .setCacheMode(CacheMode.PARTITIONED) + .setAffinity(new RendezvousAffinityFunction(false, 8)) + .setBackups(0); + + cfg.setCacheConfiguration(cacheConfiguration); + + return cfg; + } + + /** + * + * @throws Exception If failed. + */ + public void testClientConnectToBigTopology() throws Exception { + Ignite ignite = startGrids(3); + + IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME); + + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < 80; i++) { + cache.put(i, i); + + keys.add(i); + } + + TcpDiscoveryImpl discovery = ((TestTcpDiscoverySpi) ignite.configuration().getDiscoverySpi()).discovery(); + + assertTrue(discovery instanceof ServerImpl); + + IgniteConfiguration clientCfg = getConfiguration("client"); + + clientCfg.setClientMode(true); + + clientJustStarted.set(true); + + IgniteEx client = startGrid(clientCfg); + + latch.countDown(); + + System.err.println("GET ALL"); + client.cache(DEFAULT_CACHE_NAME).getAll(keys); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * + */ + class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + IgniteCheckedException { + if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + if (msg.senderNodeId() != null && clientJustStarted.get()) + try { + latch.await(); + + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + super.writeToSocket(sock, out, msg, timeout); + } + else + super.writeToSocket(sock, out, msg, timeout); + } + + /** + * + */ + TcpDiscoveryImpl discovery() { + return impl; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 1287149..c506ca7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -20,6 +20,8 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest; import org.apache.ignite.spi.discovery.AuthenticationRestartTest; +import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest; +import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest; import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest; @@ -90,6 +92,10 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class)); suite.addTest(new TestSuite(AuthenticationRestartTest.class)); + //Client connect + suite.addTest(new TestSuite(IgniteClientConnectTest.class)); + suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class)); + // SSL. suite.addTest(new TestSuite(TcpDiscoverySslSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoverySslSecuredUnsecuredTest.class));
