zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/60b625a9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/60b625a9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/60b625a9 Branch: refs/heads/ignite-zk-ce Commit: 60b625a99f956db75f1c2abfdbeb59b1df91166d Parents: 0b22522 Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 15 14:16:36 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 15 14:16:36 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 6 ++ .../ignite/internal/util/nio/GridNioServer.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 69 ++++++------- .../TcpCommunicationConnectionCheckFuture.java | 100 +++++++++++++++++++ .../ZkCommunicationErrorProcessFuture.java | 2 +- .../GridTcpCommunicationSpiAbstractTest.java | 38 ++++++- .../ZookeeperDiscoverySpiBasicTest.java | 6 +- 7 files changed, 177 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 7761020..37a7d8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -155,6 +155,7 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationConnectionCheckMessage; import org.apache.ignite.spi.communication.tcp.HandshakeMessage; import org.apache.ignite.spi.communication.tcp.HandshakeMessage2; import org.apache.ignite.spi.communication.tcp.NodeIdMessage; @@ -186,6 +187,11 @@ public class GridIoMessageFactory implements MessageFactory { switch (type) { // -54 is reserved for SQL. // -46 ... -51 - snapshot messages. + case -62: + msg = new TcpCommunicationConnectionCheckMessage(); + + break; + case -61: msg = new IgniteDiagnosticMessage(); http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 14d55d8..9784549 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -842,7 +842,7 @@ public class GridNioServer<T> { NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta); if (async) { - // assert meta != null; + assert meta != null; req.op = NioOperation.CONNECT; } http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index ca73e7b..d4ccc33 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -135,6 +135,7 @@ import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; @@ -308,11 +309,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4"); /** Connection index meta for session. */ - private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); + public static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Session future. */ + public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey(); + + /** */ + public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1); + /** * Default local port range (value is <tt>100</tt>). * See {@link #setLocalPortRange(int)} for details. @@ -396,9 +403,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } else { - if (log.isInfoEnabled()) - log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() + - ", rmtAddr=" + ses.remoteAddress() + ']'); + ConnectionKey connId = ses.meta(CONN_IDX_META); + + if (connId != CONN_CHECK_DUMMY_KEY) { + if (log.isInfoEnabled()) + log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() + + ", rmtAddr=" + ses.remoteAddress() + ']'); + } } } @@ -676,7 +687,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ConnectionKey connKey = ses.meta(CONN_IDX_META); if (connKey == null) { - assert ses.accepted() : ses; + assert ses.accepted() : msg; if (!connectGate.tryEnter()) { if (log.isDebugEnabled()) @@ -736,6 +747,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati recovery.lastAcknowledged(rcvCnt); } } + else if (connKey == CONN_CHECK_DUMMY_KEY) { + assert msg instanceof NodeIdMessage : msg; + + TcpCommunicationConnectionCheckFuture fut = ses.meta(SES_FUT_META); + + fut.onConnected(U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0)); + + nioSrvr.closeFromWorkerThread(ses); + + return; + } } IgniteRunnable c; @@ -2566,45 +2588,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati sendMessage0(node, msg, null); } - public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) { + public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { ClusterNode node = nodes.get(0); try { - LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node); - - // /172.25.4.90:45012 + Collection<InetSocketAddress> addrs = nodeAddresses(node); - for (InetSocketAddress addr : addrs) { - SocketChannel ch = SocketChannel.open(); - - ch.configureBlocking(false); - - ch.socket().setTcpNoDelay(tcpNoDelay); - ch.socket().setKeepAlive(true); - - boolean connect = ch.connect(addr); - - if (!connect) { - GridNioFuture<GridNioSession> fut = nioSrvr.createSession(ch, null, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { - @Override public void apply(IgniteInternalFuture<GridNioSession> fut) { - try { - GridNioSession ses = fut.get(); - - log.info("Ping connected"); - - nioSrvr.closeFromWorkerThread(ses); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - - fut.get(); - } - else - log.info("Connected"); - } } catch (Exception e) { throw new IgniteSpiException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java new file mode 100644 index 0000000..e08ea13 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.internal; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.GridLeanMap; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +/** + * + */ +public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Boolean> { + /** */ + private final UUID nodeId; + + /** */ + private final GridNioServer nioSrvr; + + /** */ + private Map<Integer, Object> sesMeta; + + /** */ + private SocketChannel ch; + + /** + * @param nodeId Remote note ID. + */ + public TcpCommunicationConnectionCheckFuture(GridNioServer nioSrvr, UUID nodeId) { + this.nioSrvr = nioSrvr; + this.nodeId = nodeId; + } + + /** + * @param addr + * @throws IOException + */ + public void init(InetSocketAddress addr) throws IOException { + ch = SocketChannel.open(); + + ch.configureBlocking(false); + + ch.socket().setTcpNoDelay(true); + ch.socket().setKeepAlive(false); + + boolean connect = ch.connect(addr); + + if (!connect) { + sesMeta = new GridLeanMap<>(2); + + sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, TcpCommunicationSpi.CONN_CHECK_DUMMY_KEY); + sesMeta.put(TcpCommunicationSpi.SES_FUT_META, this); + + nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { + @Override public void apply(IgniteInternalFuture<GridNioSession> fut) { + if (fut.error() != null) + onDone(false); + } + }); + } + } + + /** + * + */ + public void onTimeout() { + if (super.onDone(false)) + nioSrvr.cancelConnect(ch, sesMeta); + } + + /** + * @param rmtNodeId + */ + public void onConnected(UUID rmtNodeId) { + onDone(nodeId.equals(rmtNodeId)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index 6812ab0..d7d4bd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -151,7 +151,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen throws Exception { TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi(); - spi.pingNodes(nodes); + spi.checkConnection(nodes); ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index 54b3a78..caa2d87 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -17,14 +17,21 @@ package org.apache.ignite.spi.communication.tcp; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest; +import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridTestUtils; /** @@ -32,7 +39,7 @@ import org.apache.ignite.testframework.GridTestUtils; */ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> { /** */ - private static final int SPI_COUNT = 3; + private static final int SPI_COUNT = 2; /** */ public static final int IDLE_CONN_TIMEOUT = 2000; @@ -85,6 +92,35 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica } } + public void testConnectionCheck() { + for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) { + UUID id = entry.getKey(); + + TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue(); + + List<ClusterNode> checkNodes = new ArrayList<>(); + + for (ClusterNode node : nodes) { + if (!id.equals(node.id())) + checkNodes.add(node); + } + + spi.checkConnection(checkNodes); + + break; +// for (ClusterNode node : nodes) { +// synchronized (mux) { +// if (!msgDestMap.containsKey(entry.getKey())) +// msgDestMap.put(entry.getKey(), new HashSet<UUID>()); +// +// msgDestMap.get(entry.getKey()).add(node.id()); +// } +// +// entry.getValue().sendMessage(node, new GridTestMessage(entry.getKey(), msgId++, 0)); +// } + } + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 64fcd34..a2e8784 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -1929,7 +1929,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { nodes.add(ignite(2).cluster().localNode()); - spi.pingNodes(nodes); + // spi.pingNodes(nodes); } /** @@ -2628,7 +2628,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) { + @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { CountDownLatch pingLatch = this.pingLatch; try { @@ -2639,7 +2639,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { throw new IgniteException(e); } - return super.pingNodes(nodes); + return super.checkConnection(nodes); } } }