Repository: incubator-ignite Updated Branches: refs/heads/ignite-499 [created] a89dcff14
# IGNITE-499 Minor refactoring. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a89dcff1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a89dcff1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a89dcff1 Branch: refs/heads/ignite-499 Commit: a89dcff14ed5f09b2cee650f4e30e6311a9cda30 Parents: d471a0b Author: sevdokimov <[email protected]> Authored: Thu Apr 9 20:07:57 2015 +0300 Committer: sevdokimov <[email protected]> Committed: Thu Apr 9 20:07:57 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 2 +- .../spi/communication/tcp/NodeIdMessage.java | 110 +++ .../communication/tcp/TcpCommunicationSpi.java | 744 +++++++++---------- 3 files changed, 450 insertions(+), 406 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a89dcff1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b82147b..93eb0c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -71,7 +71,7 @@ public class GridIoMessageFactory implements MessageFactory { switch (type) { case TcpCommunicationSpi.NODE_ID_MSG_TYPE: - msg = new TcpCommunicationSpi.NodeIdMessage(); + msg = new NodeIdMessage(); break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a89dcff1/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java new file mode 100644 index 0000000..3beb26a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; +import java.util.*; + +/** + * Message with node ID. + */ +public class NodeIdMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private byte[] nodeIdBytes; + + /** */ + private byte[] nodeIdBytesWithType; + + /** */ + public NodeIdMessage() { + // No-op. + } + + /** + * @param nodeId Node ID. + */ + public NodeIdMessage(UUID nodeId) { + nodeIdBytes = U.uuidToBytes(nodeId); + + nodeIdBytesWithType = new byte[nodeIdBytes.length + 1]; + + nodeIdBytesWithType[0] = TcpCommunicationSpi.NODE_ID_MSG_TYPE; + + System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, nodeIdBytes.length); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + assert nodeIdBytes.length == 16; + + if (buf.remaining() < 17) + return false; + + buf.put(TcpCommunicationSpi.NODE_ID_MSG_TYPE); + buf.put(nodeIdBytes); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (buf.remaining() < 16) + return false; + + nodeIdBytes = new byte[16]; + + buf.get(nodeIdBytes); + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return TcpCommunicationSpi.NODE_ID_MSG_TYPE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 0; + } + + /** + * @return Node id bytes with type. + */ + public byte[] nodeIdBytesWithType() { + return nodeIdBytesWithType; + } + + /** + * @return Node id bytes. + */ + public byte[] nodeIdBytes() { + return nodeIdBytes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NodeIdMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a89dcff1/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index ff84e5b..9731060 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -41,7 +41,6 @@ import java.io.*; import java.net.*; import java.nio.*; import java.nio.channels.*; -import java.nio.channels.spi.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -223,357 +222,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final byte HANDSHAKE_MSG_TYPE = -3; /** Server listener. */ - private final GridNioServerListener<Message> srvLsnr = - new GridNioServerListenerAdapter<Message>() { - @Override public void onSessionWriteTimeout(GridNioSession ses) { - LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " + - "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() + - ", writeTimeout=" + sockWriteTimeout + ']'); - - if (log.isDebugEnabled()) - log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + - ", writeTimeout=" + sockWriteTimeout + ']'); - - ses.close(); - } - - @Override public void onConnected(GridNioSession ses) { - if (ses.accepted()) { - if (log.isDebugEnabled()) - log.debug("Sending local node ID to newly accepted session: " + ses); - - ses.send(nodeIdMsg); - } - } - - @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - UUID id = ses.meta(NODE_ID_META); - - if (id != null) { - GridCommunicationClient rmv = clients.get(id); - - if (rmv instanceof GridTcpNioCommunicationClient && - ((GridTcpNioCommunicationClient)rmv).session() == ses && - clients.remove(id, rmv)) { - rmv.forceClose(); - - if (!isNodeStopping()) { - GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); - - if (recoveryData != null) { - if (recoveryData.nodeAlive(getSpiContext().node(id))) { - if (!recoveryData.messagesFutures().isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Session was closed but there are unacknowledged messages, " + - "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); - - recoveryWorker.addReconnectRequest(recoveryData); - } - } - else - recoveryData.onNodeLeft(); - } - } - } - - CommunicationListener<Message> lsnr0 = lsnr; - - if (lsnr0 != null) - lsnr0.onDisconnected(id); - } - } - - @Override public void onMessage(GridNioSession ses, Message msg) { - UUID sndId = ses.meta(NODE_ID_META); - - if (sndId == null) { - assert ses.accepted(); - - if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); - else { - assert msg instanceof HandshakeMessage : msg; - - sndId = ((HandshakeMessage)msg).nodeId(); - } - - if (log.isDebugEnabled()) - log.debug("Remote node ID received: " + sndId); - - final UUID old = ses.addMeta(NODE_ID_META, sndId); - - assert old == null; - - final ClusterNode rmtNode = getSpiContext().node(sndId); - - if (rmtNode == null) { - ses.close(); - - return; - } - - ClusterNode locNode = getSpiContext().localNode(); - - if (ses.remoteAddress() == null) - return; - - GridCommunicationClient oldClient = clients.get(sndId); - - if (oldClient != null) { - if (oldClient instanceof GridTcpNioCommunicationClient) { - if (log.isDebugEnabled()) - log.debug("Received incoming connection when already connected " + - "to this node, rejecting [locNode=" + locNode.id() + - ", rmtNode=" + sndId + ']'); - - ses.send(new RecoveryLastReceivedMessage(-1)); - - return; - } - } - - GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); - - GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut); - - assert msg instanceof HandshakeMessage : msg; - - HandshakeMessage msg0 = (HandshakeMessage)msg; - - final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); - - if (oldFut == null) { - oldClient = clients.get(sndId); - - if (oldClient != null) { - if (oldClient instanceof GridTcpNioCommunicationClient) { - if (log.isDebugEnabled()) - log.debug("Received incoming connection when already connected " + - "to this node, rejecting [locNode=" + locNode.id() + - ", rmtNode=" + sndId + ']'); - - ses.send(new RecoveryLastReceivedMessage(-1)); - - return; - } - } - - boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); - - if (log.isDebugEnabled()) - log.debug("Received incoming connection from remote node " + - "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); - - if (reserved) { - try { - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true); - - fut.onDone(client); - } - finally { - clientFuts.remove(rmtNode.id(), fut); - } - } - } - else { - if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { - if (log.isDebugEnabled()) { - log.debug("Received incoming connection from remote node while " + - "connecting to this node, rejecting [locNode=" + locNode.id() + - ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + - ", rmtNodeOrder=" + rmtNode.order() + ']'); - } - - ses.send(new RecoveryLastReceivedMessage(-1)); - } - else { - boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); - - if (reserved) { - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true); - - fut.onDone(client); - } - } - } - } - else { - rcvdMsgsCnt.increment(); - - GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); - - if (recovery != null) { - if (msg instanceof RecoveryLastReceivedMessage) { - RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; - - if (log.isDebugEnabled()) - log.debug("Received recovery acknowledgement [rmtNode=" + sndId + - ", rcvCnt=" + msg0.received() + ']'); - - recovery.ackReceived(msg0.received()); - - return; - } - else { - long rcvCnt = recovery.onReceived(); - - if (rcvCnt % ackSndThreshold == 0) { - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement [rmtNode=" + sndId + - ", rcvCnt=" + rcvCnt + ']'); - - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt)); - - recovery.lastAcknowledged(rcvCnt); - } - } - } - - IgniteRunnable c; - - if (msgQueueLimit > 0) { - GridNioMessageTracker tracker = ses.meta(TRACKER_META); - - if (tracker == null) { - GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker = - new GridNioMessageTracker(ses, msgQueueLimit)); - - assert old == null; - } - - tracker.onMessageReceived(); - - c = tracker; - } - else - c = NOOP; - - notifyListener(sndId, msg, c); - } - } - - /** - * @param recovery Recovery descriptor. - * @param ses Session. - * @param node Node. - * @param rcvCnt Number of received messages.. - * @param sndRes If {@code true} sends response for recovery handshake. - * @return Client. - */ - private GridTcpNioCommunicationClient connected( - GridNioRecoveryDescriptor recovery, - GridNioSession ses, - ClusterNode node, - long rcvCnt, - boolean sndRes) { - recovery.onHandshake(rcvCnt); - - ses.recoveryDescriptor(recovery); - - nioSrvr.resend(ses); - - if (sndRes) - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount())); - - recovery.connected(); - - GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log); - - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); - - assert oldClient == null : "Client already created [node=" + node + ", client=" + client + - ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; - - return client; - } - - /** - * - */ - @SuppressWarnings("PackageVisibleInnerClass") - class ConnectClosure implements IgniteInClosure<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final GridNioSession ses; - - /** */ - private final GridNioRecoveryDescriptor recoveryDesc; - - /** */ - private final ClusterNode rmtNode; - - /** */ - private final HandshakeMessage msg; - - /** */ - private final GridFutureAdapter<GridCommunicationClient> fut; - - /** - * @param ses Incoming session. - * @param recoveryDesc Recovery descriptor. - * @param rmtNode Remote node. - * @param msg Handshake message. - * @param fut Connect future. - */ - ConnectClosure(GridNioSession ses, - GridNioRecoveryDescriptor recoveryDesc, - ClusterNode rmtNode, - HandshakeMessage msg, - GridFutureAdapter<GridCommunicationClient> fut) { - this.ses = ses; - this.recoveryDesc = recoveryDesc; - this.rmtNode = rmtNode; - this.msg = msg; - this.fut = fut; - } - - /** {@inheritDoc} */ - @Override public void apply(Boolean success) { - if (success) { - IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> msgFut) { - try { - msgFut.get(); - - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false); - - fut.onDone(client); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); - - recoveryDesc.release(); - - fut.onDone(); - } - finally { - clientFuts.remove(rmtNode.id(), fut); - } - } - }; - - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr); - } - else { - try { - fut.onDone(); - } - finally { - clientFuts.remove(rmtNode.id(), fut); - } - } - } - } - }; + private final GridNioServerListener<Message> srvLsnr = new ServerListener(); /** Logger. */ @LoggerResource @@ -1362,7 +1011,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // If configured TCP port is busy, find first available in range. for (int port = locPort; port < locPort + locPortRange; port++) { try { - MessageFactory messageFactory = new MessageFactory() { + MessageFactory msgFactory = new MessageFactory() { private MessageFactory impl; @Nullable @Override public Message create(byte type) { @@ -1375,7 +1024,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - MessageFormatter messageFormatter = new MessageFormatter() { + MessageFormatter msgFormatter = new MessageFormatter() { private MessageFormatter impl; @Override public MessageWriter writer() { @@ -1397,7 +1046,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter); + GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter); IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() { @Override public boolean apply(Message msg) { @@ -1424,7 +1073,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .writeTimeout(sockWriteTimeout) .filters(new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log)) - .messageFormatter(messageFormatter) + .messageFormatter(msgFormatter) .skipRecoveryPredicate(skipRecoveryPred) .build(); @@ -1955,7 +1604,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(), + Message msg = new HandshakeMessage(getLocalNodeId(), recovery.incrementConnectCount(), recovery.receivedCount()); @@ -1975,7 +1624,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ch.write(buf); } else - ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)); + ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType())); if (recovery != null) { if (log.isDebugEnabled()) @@ -2540,7 +2189,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (obj instanceof GridCommunicationClient) ((GridCommunicationClient)obj).forceClose(); else - U.closeQuiet((AbstractInterruptibleChannel)obj); + U.closeQuiet((AutoCloseable)obj); return true; } @@ -2623,7 +2272,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { out.write(U.IGNITE_HEADER); out.write(NODE_ID_MSG_TYPE); - out.write(nodeIdMsg.nodeIdBytes); + out.write(nodeIdMsg.nodeIdBytes()); out.flush(); @@ -2821,75 +2470,360 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** - * Node ID message. + * */ - @SuppressWarnings("PublicInnerClass") - public static class NodeIdMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; + private class ServerListener extends GridNioServerListenerAdapter<Message> { + /** {@inheritDoc} */ + @Override public void onSessionWriteTimeout(GridNioSession ses) { + LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " + + "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() + + ", writeTimeout=" + sockWriteTimeout + ']'); - /** */ - private byte[] nodeIdBytes; + if (log.isDebugEnabled()) + log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + + ", writeTimeout=" + sockWriteTimeout + ']'); - /** */ - private byte[] nodeIdBytesWithType; + ses.close(); + } - /** */ - public NodeIdMessage() { - // No-op. + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + if (ses.accepted()) { + if (log.isDebugEnabled()) + log.debug("Sending local node ID to newly accepted session: " + ses); + + ses.send(nodeIdMsg); + } } - /** - * @param nodeId Node ID. - */ - private NodeIdMessage(UUID nodeId) { - nodeIdBytes = U.uuidToBytes(nodeId); + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + UUID id = ses.meta(NODE_ID_META); - nodeIdBytesWithType = new byte[nodeIdBytes.length + 1]; + if (id != null) { + GridCommunicationClient rmv = clients.get(id); - nodeIdBytesWithType[0] = NODE_ID_MSG_TYPE; + if (rmv instanceof GridTcpNioCommunicationClient && + ((GridTcpNioCommunicationClient)rmv).session() == ses && + clients.remove(id, rmv)) { + rmv.forceClose(); - System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, nodeIdBytes.length); - } + if (!isNodeStopping()) { + GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - assert nodeIdBytes.length == 16; + if (recoveryData != null) { + if (recoveryData.nodeAlive(getSpiContext().node(id))) { + if (!recoveryData.messagesFutures().isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Session was closed but there are unacknowledged messages, " + + "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); - if (buf.remaining() < 17) - return false; + recoveryWorker.addReconnectRequest(recoveryData); + } + } + else + recoveryData.onNodeLeft(); + } + } + } - buf.put(NODE_ID_MSG_TYPE); - buf.put(nodeIdBytes); + CommunicationListener<Message> lsnr0 = lsnr; - return true; + if (lsnr0 != null) + lsnr0.onDisconnected(id); + } } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (buf.remaining() < 16) - return false; + @Override public void onMessage(GridNioSession ses, Message msg) { + UUID sndId = ses.meta(NODE_ID_META); - nodeIdBytes = new byte[16]; + if (sndId == null) { + assert ses.accepted(); - buf.get(nodeIdBytes); + if (msg instanceof NodeIdMessage) + sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes(), 0); + else { + assert msg instanceof HandshakeMessage : msg; - return true; - } + sndId = ((HandshakeMessage)msg).nodeId(); + } - /** {@inheritDoc} */ - @Override public byte directType() { - return NODE_ID_MSG_TYPE; + if (log.isDebugEnabled()) + log.debug("Remote node ID received: " + sndId); + + final UUID old = ses.addMeta(NODE_ID_META, sndId); + + assert old == null; + + final ClusterNode rmtNode = getSpiContext().node(sndId); + + if (rmtNode == null) { + ses.close(); + + return; + } + + ClusterNode locNode = getSpiContext().localNode(); + + if (ses.remoteAddress() == null) + return; + + GridCommunicationClient oldClient = clients.get(sndId); + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); + + ses.send(new RecoveryLastReceivedMessage(-1)); + + return; + } + } + + GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); + + GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut); + + assert msg instanceof HandshakeMessage : msg; + + HandshakeMessage msg0 = (HandshakeMessage)msg; + + final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); + + if (oldFut == null) { + oldClient = clients.get(sndId); + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); + + ses.send(new RecoveryLastReceivedMessage(-1)); + + return; + } + } + + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); + + if (log.isDebugEnabled()) + log.debug("Received incoming connection from remote node " + + "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); + + if (reserved) { + try { + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg0.received(), true); + + fut.onDone(client); + } + finally { + clientFuts.remove(rmtNode.id(), fut); + } + } + } + else { + if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { + if (log.isDebugEnabled()) { + log.debug("Received incoming connection from remote node while " + + "connecting to this node, rejecting [locNode=" + locNode.id() + + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + + ", rmtNodeOrder=" + rmtNode.order() + ']'); + } + + ses.send(new RecoveryLastReceivedMessage(-1)); + } + else { + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); + + if (reserved) { + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg0.received(), true); + + fut.onDone(client); + } + } + } + } + else { + rcvdMsgsCnt.increment(); + + GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); + + if (recovery != null) { + if (msg instanceof RecoveryLastReceivedMessage) { + RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; + + if (log.isDebugEnabled()) + log.debug("Received recovery acknowledgement [rmtNode=" + sndId + + ", rcvCnt=" + msg0.received() + ']'); + + recovery.ackReceived(msg0.received()); + + return; + } + else { + long rcvCnt = recovery.onReceived(); + + if (rcvCnt % ackSndThreshold == 0) { + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement [rmtNode=" + sndId + + ", rcvCnt=" + rcvCnt + ']'); + + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt)); + + recovery.lastAcknowledged(rcvCnt); + } + } + } + + IgniteRunnable c; + + if (msgQueueLimit > 0) { + GridNioMessageTracker tracker = ses.meta(TRACKER_META); + + if (tracker == null) { + GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker = + new GridNioMessageTracker(ses, msgQueueLimit)); + + assert old == null; + } + + tracker.onMessageReceived(); + + c = tracker; + } + else + c = NOOP; + + notifyListener(sndId, msg, c); + } } - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 0; + /** + * @param recovery Recovery descriptor. + * @param ses Session. + * @param node Node. + * @param rcvCnt Number of received messages.. + * @param sndRes If {@code true} sends response for recovery handshake. + * @return Client. + */ + private GridTcpNioCommunicationClient connected( + GridNioRecoveryDescriptor recovery, + GridNioSession ses, + ClusterNode node, + long rcvCnt, + boolean sndRes) { + recovery.onHandshake(rcvCnt); + + ses.recoveryDescriptor(recovery); + + nioSrvr.resend(ses); + + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount())); + + recovery.connected(); + + GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log); + + GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + + assert oldClient == null : "Client already created [node=" + node + ", client=" + client + + ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; + + return client; } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(NodeIdMessage.class, this); + /** + * + */ + @SuppressWarnings("PackageVisibleInnerClass") + class ConnectClosure implements IgniteInClosure<Boolean> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridNioSession ses; + + /** */ + private final GridNioRecoveryDescriptor recoveryDesc; + + /** */ + private final ClusterNode rmtNode; + + /** */ + private final HandshakeMessage msg; + + /** */ + private final GridFutureAdapter<GridCommunicationClient> fut; + + /** + * @param ses Incoming session. + * @param recoveryDesc Recovery descriptor. + * @param rmtNode Remote node. + * @param msg Handshake message. + * @param fut Connect future. + */ + ConnectClosure(GridNioSession ses, + GridNioRecoveryDescriptor recoveryDesc, + ClusterNode rmtNode, + HandshakeMessage msg, + GridFutureAdapter<GridCommunicationClient> fut) { + this.ses = ses; + this.recoveryDesc = recoveryDesc; + this.rmtNode = rmtNode; + this.msg = msg; + this.fut = fut; + } + + /** {@inheritDoc} */ + @Override public void apply(Boolean success) { + if (success) { + IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> msgFut) { + try { + msgFut.get(); + + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg.received(), false); + + fut.onDone(client); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + + recoveryDesc.release(); + + fut.onDone(); + } + finally { + clientFuts.remove(rmtNode.id(), fut); + } + } + }; + + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr); + } + else { + try { + fut.onDone(); + } + finally { + clientFuts.remove(rmtNode.id(), fut); + } + } + } } } }
