zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ed492a4f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ed492a4f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ed492a4f Branch: refs/heads/ignite-zk-ce Commit: ed492a4f3aca046b6e196420bd986d0f47232aed Parents: 78a994a Author: sboikov <[email protected]> Authored: Fri Dec 15 16:00:49 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 15 17:08:26 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 17 - .../communication/tcp/TcpCommunicationSpi.java | 148 ++----- .../tcp/internal/ConnectionKey.java | 92 ++++ .../TcpCommunicationConnectionCheckFuture.java | 423 +++++++++++++++++-- ...pCommunicationNodeConnectionCheckFuture.java | 30 ++ .../GridTcpCommunicationSpiAbstractTest.java | 83 ++-- 6 files changed, 618 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index a30c439..78cb7a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -117,8 +117,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; -import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage; -import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; @@ -186,11 +184,6 @@ public class GridIoMessageFactory implements MessageFactory { switch (type) { // -54 is reserved for SQL. // -46 ... -51 - snapshot messages. - case -62: - msg = new TcpCommunicationConnectionCheckMessage(); - - break; - case -61: msg = new IgniteDiagnosticMessage(); @@ -886,16 +879,6 @@ public class GridIoMessageFactory implements MessageFactory { break; - case 129: - msg = new ClusterMetricsUpdateMessage(); - - break; - - case 130: - msg = new ContinuousRoutineStartResultMessage(); - - break; - // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 3588a79..afa6953 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import 
org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.ipc.IpcEndpoint; import org.apache.ignite.internal.util.ipc.IpcToNioAdapter; import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException; @@ -82,7 +83,6 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter; import org.apache.ignite.internal.util.nio.GridDirectParser; import org.apache.ignite.internal.util.nio.GridNioCodecFilter; import org.apache.ignite.internal.util.nio.GridNioFilter; -import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory; import org.apache.ignite.internal.util.nio.GridNioMessageTracker; import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory; @@ -135,11 +135,13 @@ import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey; +import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture; +import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; -import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import 
org.apache.ignite.thread.IgniteThread; @@ -149,6 +151,8 @@ import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; +import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.CONN_CHECK_DUMMY_KEY; +import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING; @@ -318,12 +322,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); - /** Session future. */ - public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey(); - - /** */ - public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1); - /** * Default local port range (value is <tt>100</tt>). * See {@link #setLocalPortRange(int)} for details. 
@@ -409,17 +407,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati else { ConnectionKey connId = ses.meta(CONN_IDX_META); - if (connId != CONN_CHECK_DUMMY_KEY) { - if (log.isInfoEnabled()) - log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() + - ", rmtAddr=" + ses.remoteAddress() + ']'); - } + if (log.isInfoEnabled()) + log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() + + ", rmtAddr=" + ses.remoteAddress() + ']'); } } @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { ConnectionKey connId = ses.meta(CONN_IDX_META); + if (connId == CONN_CHECK_DUMMY_KEY) + return; + if (connId != null) { UUID id = connId.nodeId(); @@ -691,7 +690,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ConnectionKey connKey = ses.meta(CONN_IDX_META); if (connKey == null) { - assert ses.accepted() : msg; + assert ses.accepted() : ses; if (!connectGate.tryEnter()) { if (log.isDebugEnabled()) @@ -714,9 +713,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } else { - metricsLsnr.onMessageReceived(msg, connKey.nodeId()); - if (msg instanceof RecoveryLastReceivedMessage) { + metricsLsnr.onMessageReceived(msg, connKey.nodeId()); + GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor(); if (recovery != null) { @@ -729,9 +728,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } recovery.ackReceived(msg0.received()); - - return; } + + return; } else { GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor(); @@ -754,9 +753,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati else if (connKey == CONN_CHECK_DUMMY_KEY) { assert msg instanceof NodeIdMessage : msg; - TcpCommunicationConnectionCheckFuture fut = ses.meta(SES_FUT_META); + TcpCommunicationNodeConnectionCheckFuture fut = ses.meta(SES_FUT_META); + 
+ assert fut != null : msg; - fut.onConnected(U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0)); + fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0)); nioSrvr.closeFromWorkerThread(ses); @@ -764,6 +765,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } + metricsLsnr.onMessageReceived(msg, connKey.nodeId()); + IgniteRunnable c; if (msgQueueLimit > 0) { @@ -2592,18 +2595,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati sendMessage0(node, msg, null); } + /** {@inheritDoc} */ public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { - ClusterNode node = nodes.get(0); - - try { - Collection<InetSocketAddress> addrs = nodeAddresses(node); + TcpCommunicationConnectionCheckFuture fut = new TcpCommunicationConnectionCheckFuture( + this, + log.getLogger(TcpCommunicationConnectionCheckFuture.class), + nioSrvr, + nodes); - } - catch (Exception e) { - throw new IgniteSpiException(e); - } + fut.init(failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : connTimeout); - return null; + return new IgniteFutureImpl<>(fut); } /** @@ -3028,7 +3030,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ConnectionKey id = ses.meta(CONN_IDX_META); if (id != null) { - ClusterNode node = getSpiContext().node(id.nodeId); + ClusterNode node = getSpiContext().node(id.nodeId()); if (node != null && node.isClient()) { String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + @@ -3049,9 +3051,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** * @param node Node. * @return Node addresses. + * @throws IgniteCheckedException If failed. + */ + private Collection<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException { + return nodeAddresses(node, filterReachableAddresses); + } + + /** + * @param node Node. 
+ * @param filterReachableAddresses Filter addresses flag. + * @return Node addresses. * @throws IgniteCheckedException If node does not have addresses. */ - private LinkedHashSet<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException { + public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddresses) + throws IgniteCheckedException { Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); @@ -3132,7 +3145,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @throws IgniteCheckedException If failed. */ protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { - LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node); + Collection<InetSocketAddress> addrs = nodeAddresses(node); GridCommunicationClient client = null; IgniteCheckedException errs = null; @@ -4629,77 +4642,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** * */ - private static class ConnectionKey { - /** */ - private final UUID nodeId; - - /** */ - private final int idx; - - /** */ - private final long connCnt; - - /** - * @param nodeId Node ID. - * @param idx Connection index. - * @param connCnt Connection counter (set only for incoming connections). - */ - ConnectionKey(UUID nodeId, int idx, long connCnt) { - this.nodeId = nodeId; - this.idx = idx; - this.connCnt = connCnt; - } - - /** - * @return Connection counter. - */ - long connectCount() { - return connCnt; - } - - /** - * @return Node ID. - */ - UUID nodeId() { - return nodeId; - } - - /** - * @return Connection index. 
- */ - int connectionIndex() { - return idx; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - ConnectionKey key = (ConnectionKey) o; - - return idx == key.idx && nodeId.equals(key.nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - res = 31 * res + idx; - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ConnectionKey.class, this); - } - } - - /** - * - */ interface ConnectionPolicy { /** * @return Thread connection index. http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java new file mode 100644 index 0000000..6716446 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.internal; + +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class ConnectionKey { + /** */ + private final UUID nodeId; + + /** */ + private final int idx; + + /** */ + private final long connCnt; + + /** + * @param nodeId Node ID. + * @param idx Connection index. + * @param connCnt Connection counter (set only for incoming connections). + */ + public ConnectionKey(UUID nodeId, int idx, long connCnt) { + this.nodeId = nodeId; + this.idx = idx; + this.connCnt = connCnt; + } + + /** + * @return Connection counter. + */ + public long connectCount() { + return connCnt; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Connection index. + */ + public int connectionIndex() { + return idx; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ConnectionKey key = (ConnectionKey) o; + + return idx == key.idx && nodeId.equals(key.nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + res = 31 * res + idx; + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ConnectionKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java 
index e08ea13..99e1eca 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java @@ -17,84 +17,445 @@ package org.apache.ignite.spi.communication.tcp.internal; -import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; +import java.util.BitSet; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jetbrains.annotations.Nullable; /** * */ -public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Boolean> { +public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject { + /** Session future. 
*/ + public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey(); + + /** */ + public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1); + + /** */ + private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater = + AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done"); + + /** */ + private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater = + AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt"); + + /** */ + private final AtomicInteger resCntr = new AtomicInteger(); + /** */ - private final UUID nodeId; + private final List<ClusterNode> nodes; + + /** */ + private volatile ConnectFuture[] futs; /** */ private final GridNioServer nioSrvr; /** */ - private Map<Integer, Object> sesMeta; + private final TcpCommunicationSpi spi; + + /** */ + private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid(); + + /** */ + private final BitSet resBitSet; + + /** */ + private long endTime; /** */ - private SocketChannel ch; + private final IgniteLogger log; /** - * @param nodeId Remote note ID. + * @param spi SPI instance. + * @param log Logger. + * @param nioSrvr NIO server. + * @param nodes Nodes to check. */ - public TcpCommunicationConnectionCheckFuture(GridNioServer nioSrvr, UUID nodeId) { + public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi, + IgniteLogger log, + GridNioServer nioSrvr, + List<ClusterNode> nodes) + { + this.spi = spi; + this.log = log; this.nioSrvr = nioSrvr; - this.nodeId = nodeId; + this.nodes = nodes; + + resBitSet = new BitSet(nodes.size()); } /** - * @param addr - * @throws IOException + * @param timeout Connect timeout. 
+ */ - public void init(InetSocketAddress addr) throws IOException { - ch = SocketChannel.open(); + public void init(long timeout) { + ConnectFuture[] futs = new ConnectFuture[nodes.size()]; + + UUID locId = spi.getSpiContext().localNode().id(); + + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + if (!node.id().equals(locId)) { + if (spi.getSpiContext().node(node.id()) == null) { + receivedConnectionStatus(i, false); + + continue; + } + + Collection<InetSocketAddress> addrs; + + try { + addrs = spi.nodeAddresses(node, false); + } + catch (Exception e) { + U.error(log, "Failed to get node addresses: " + node, e); - ch.configureBlocking(false); + receivedConnectionStatus(i, false); - ch.socket().setTcpNoDelay(true); - ch.socket().setKeepAlive(false); + continue; + } - boolean connect = ch.connect(addr); + if (addrs.size() == 1) { + SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i); - if (!connect) { - sesMeta = new GridLeanMap<>(2); + fut.init(addrs.iterator().next()); - sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, TcpCommunicationSpi.CONN_CHECK_DUMMY_KEY); - sesMeta.put(TcpCommunicationSpi.SES_FUT_META, this); + futs[i] = fut; + } + else { + MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i); - nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { - @Override public void apply(IgniteInternalFuture<GridNioSession> fut) { - if (fut.error() != null) - onDone(false); + fut.init(addrs); } - }); + } + else + receivedConnectionStatus(i, true); + } + + this.futs = futs; + + if (!isDone()) { + endTime = System.currentTimeMillis() + timeout; + + spi.getSpiContext().addTimeoutObject(this); } } /** + * @param idx Node index. + * @param res Success flag.
+ */ + private void receivedConnectionStatus(int idx, boolean res) { + assert resCntr.get() < nodes.size(); + + synchronized (resBitSet) { + resBitSet.set(idx, res); + } + + if (resCntr.incrementAndGet() == nodes.size()) + onDone(resBitSet); + } + + /** + * @param nodeIdx Node index. + * @return Node ID. + */ + private UUID nodeId(int nodeIdx) { + return nodes.get(nodeIdx).id(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return timeoutObjId; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (isDone()) + return; + + ConnectFuture[] futs = this.futs; + + for (int i = 0; i < futs.length; i++) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onTimeout(); + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + spi.getSpiContext().removeTimeoutObject(this); + + return true; + } + + return false; + } + + /** * */ - public void onTimeout() { - if (super.onDone(false)) - nioSrvr.cancelConnect(ch, sesMeta); + private interface ConnectFuture { + /** + * + */ + void onTimeout(); } /** - * @param rmtNodeId + * */ - public void onConnected(UUID rmtNodeId) { - onDone(nodeId.equals(rmtNodeId)); + private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture { + /** */ + final int nodeIdx; + + /** */ + volatile int done; + + /** */ + Map<Integer, Object> sesMeta; + + /** */ + private SocketChannel ch; + + /** + * @param nodeIdx Node index. + */ + SingleAddressConnectFuture(int nodeIdx) { + this.nodeIdx = nodeIdx; + } + + /** + * @param addr Node address.
+ */ + public void init(InetSocketAddress addr) { + boolean connect; + + try { + ch = SocketChannel.open(); + + ch.configureBlocking(false); + + ch.socket().setTcpNoDelay(true); + ch.socket().setKeepAlive(false); + + connect = ch.connect(addr); + } + catch (Exception e) { + finish(false); + + return; + } + + if (!connect) { + sesMeta = new GridLeanMap<>(3); + + // Set dummy key to identify connection-check outgoing connection. + sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, CONN_CHECK_DUMMY_KEY); + sesMeta.put(SES_FUT_META, this); + + nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { + @Override public void apply(IgniteInternalFuture<GridNioSession> fut) { + if (fut.error() != null) + finish(false); + } + }); + } + } + + /** + * + */ + void cancel() { + if (finish(false)) + nioSrvr.cancelConnect(ch, sesMeta); + } + + /** {@inheritDoc} */ + public void onTimeout() { + cancel(); + } + + /** {@inheritDoc} */ + public void onConnected(UUID rmtNodeId) { + finish(nodeId(nodeIdx).equals(rmtNodeId)); + } + + /** + * @param res Result. + * @return {@code True} if result was set by this call. + */ + boolean finish(boolean res) { + if (connFutDoneUpdater.compareAndSet(this, 0, 1)) { + onStatusReceived(res); + + return true; + } + + return false; + } + + /** + * @param res Result. + */ + void onStatusReceived(boolean res) { + receivedConnectionStatus(nodeIdx, res); + } + } + + /** + * + */ + private class MultipleAddressesConnectFuture implements ConnectFuture { + /** */ + volatile int resCnt; + + /** */ + volatile SingleAddressConnectFuture[] futs; + + /** */ + final int nodeIdx; + + /** + * @param nodeIdx Node index. 
+ */ + MultipleAddressesConnectFuture(int nodeIdx) { + this.nodeIdx = nodeIdx; + + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + SingleAddressConnectFuture[] futs = this.futs; + + for (int i = 0; i < futs.length; i++) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onTimeout(); + } + } + + /** + * @param addrs Node addresses. + */ + void init(Collection<InetSocketAddress> addrs) { + SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()]; + + int idx = 0; + + for (InetSocketAddress addr : addrs) { + SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) { + @Override void onStatusReceived(boolean res) { + receivedAddressStatus(res); + } + }; + + fut.init(addr); + + futs[idx++] = fut; + + if (done()) + return; + } + + this.futs = futs; + + // Close race. + if (done()) + cancelFutures(); + } + + /** + * @return {@code True} + */ + private boolean done() { + int resCnt0 = resCnt; + + return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length; + } + + /** + * + */ + private void cancelFutures() { + SingleAddressConnectFuture[] futs = this.futs; + + if (futs != null) { + for (int i = 0; i < futs.length; i++) { + SingleAddressConnectFuture fut = futs[i]; + + fut.cancel(); + } + } + } + + /** + * @param res Result. + */ + void receivedAddressStatus(boolean res) { + if (res) { + for (;;) { + int resCnt0 = resCnt; + + if (resCnt0 == Integer.MAX_VALUE) + return; + + if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) { + receivedConnectionStatus(nodeIdx, true); + + cancelFutures(); // Cancel others connects if they are still in progress. 
+ + return; + } + } + } + else { + for (;;) { + int resCnt0 = resCnt; + + if (resCnt0 == Integer.MAX_VALUE) + return; + + int resCnt1 = resCnt0 + 1; + + if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) { + if (resCnt1 == futs.length) + receivedConnectionStatus(nodeIdx, false); + + return; + } + } + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java new file mode 100644 index 0000000..c034782 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.communication.tcp.internal; + +import java.util.UUID; + +/** + * + */ +public interface TcpCommunicationNodeConnectionCheckFuture { + /** + * @param nodeId Remote node ID. + */ + public void onConnected(UUID nodeId); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index caa2d87..e89a4c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -18,20 +18,23 @@ package org.apache.ignite.spi.communication.tcp; import java.util.ArrayList; -import java.util.HashSet; +import java.util.BitSet; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest; -import org.apache.ignite.spi.communication.GridTestMessage; import 
org.apache.ignite.testframework.GridTestUtils; /** @@ -39,7 +42,7 @@ import org.apache.ignite.testframework.GridTestUtils; */ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> { /** */ - private static final int SPI_COUNT = 2; + private static final int SPI_COUNT = 3; /** */ public static final int IDLE_CONN_TIMEOUT = 2000; @@ -92,33 +95,65 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica } } - public void testConnectionCheck() { - for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) { - UUID id = entry.getKey(); + /** + * + */ + public void testCheckConnection1() { + for (int i = 0; i < 100; i++) { + for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) { + TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue(); - TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue(); + List<ClusterNode> checkNodes = new ArrayList<>(nodes); - List<ClusterNode> checkNodes = new ArrayList<>(); + assert checkNodes.size() > 1; - for (ClusterNode node : nodes) { - if (!id.equals(node.id())) - checkNodes.add(node); + IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes); + + BitSet res = fut.get(); + + for (int n = 0; n < checkNodes.size(); n++) + assertTrue(res.get(n)); } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testCheckConnection2() throws Exception { + final int THREADS = spis.size(); + + final CyclicBarrier b = new CyclicBarrier(THREADS); + + List<IgniteInternalFuture> futs = new ArrayList<>(); - spi.checkConnection(checkNodes); - - break; -// for (ClusterNode node : nodes) { -// synchronized (mux) { -// if (!msgDestMap.containsKey(entry.getKey())) -// msgDestMap.put(entry.getKey(), new HashSet<UUID>()); -// -// msgDestMap.get(entry.getKey()).add(node.id()); -// } -// -// entry.getValue().sendMessage(node, new GridTestMessage(entry.getKey(), msgId++, 0)); -// } + for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) { + final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue(); + + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + List<ClusterNode> checkNodes = new ArrayList<>(nodes); + + assert checkNodes.size() > 1; + + b.await(); + + for (int i = 0; i < 100; i++) { + IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes); + + BitSet res = fut.get(); + + for (int n = 0; n < checkNodes.size(); n++) + assertTrue(res.get(n)); + } + + return null; + } + })); } + + for (IgniteInternalFuture f : futs) + f.get(); } /** {@inheritDoc} */
