Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-499 [created] a89dcff14


# IGNITE-499 Minor refactoring.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a89dcff1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a89dcff1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a89dcff1

Branch: refs/heads/ignite-499
Commit: a89dcff14ed5f09b2cee650f4e30e6311a9cda30
Parents: d471a0b
Author: sevdokimov <[email protected]>
Authored: Thu Apr 9 20:07:57 2015 +0300
Committer: sevdokimov <[email protected]>
Committed: Thu Apr 9 20:07:57 2015 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   2 +-
 .../spi/communication/tcp/NodeIdMessage.java    | 110 +++
 .../communication/tcp/TcpCommunicationSpi.java  | 744 +++++++++----------
 3 files changed, 450 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a89dcff1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b82147b..93eb0c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -71,7 +71,7 @@ public class GridIoMessageFactory implements MessageFactory {
 
         switch (type) {
             case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
-                msg = new TcpCommunicationSpi.NodeIdMessage();
+                msg = new NodeIdMessage();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a89dcff1/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
new file mode 100644
index 0000000..3beb26a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Message with node ID.
+ */
+public class NodeIdMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private byte[] nodeIdBytes;
+
+    /** */
+    private byte[] nodeIdBytesWithType;
+
+    /** */
+    public NodeIdMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeId Node ID.
+     */
+    public NodeIdMessage(UUID nodeId) {
+        nodeIdBytes = U.uuidToBytes(nodeId);
+
+        nodeIdBytesWithType = new byte[nodeIdBytes.length + 1];
+
+        nodeIdBytesWithType[0] = TcpCommunicationSpi.NODE_ID_MSG_TYPE;
+
+        System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, 
nodeIdBytes.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        assert nodeIdBytes.length == 16;
+
+        if (buf.remaining() < 17)
+            return false;
+
+        buf.put(TcpCommunicationSpi.NODE_ID_MSG_TYPE);
+        buf.put(nodeIdBytes);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (buf.remaining() < 16)
+            return false;
+
+        nodeIdBytes = new byte[16];
+
+        buf.get(nodeIdBytes);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return TcpCommunicationSpi.NODE_ID_MSG_TYPE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 0;
+    }
+
+    /**
+     * @return Node id bytes with type.
+     */
+    public byte[] nodeIdBytesWithType() {
+        return nodeIdBytesWithType;
+    }
+
+    /**
+     * @return Node id bytes.
+     */
+    public byte[] nodeIdBytes() {
+        return nodeIdBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(NodeIdMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a89dcff1/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ff84e5b..9731060 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -41,7 +41,6 @@ import java.io.*;
 import java.net.*;
 import java.nio.*;
 import java.nio.channels.*;
-import java.nio.channels.spi.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -223,357 +222,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final byte HANDSHAKE_MSG_TYPE = -3;
 
     /** Server listener. */
-    private final GridNioServerListener<Message> srvLsnr =
-        new GridNioServerListenerAdapter<Message>() {
-            @Override public void onSessionWriteTimeout(GridNioSession ses) {
-                LT.warn(log, null, "Communication SPI Session write timed out 
(consider increasing " +
-                    "'socketWriteTimeout' " + "configuration property) 
[remoteAddr=" + ses.remoteAddress() +
-                    ", writeTimeout=" + sockWriteTimeout + ']');
-
-                if (log.isDebugEnabled())
-                    log.debug("Closing communication SPI session on write 
timeout [remoteAddr=" + ses.remoteAddress() +
-                        ", writeTimeout=" + sockWriteTimeout + ']');
-
-                ses.close();
-            }
-
-            @Override public void onConnected(GridNioSession ses) {
-                if (ses.accepted()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Sending local node ID to newly accepted 
session: " + ses);
-
-                    ses.send(nodeIdMsg);
-                }
-            }
-
-            @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
-                UUID id = ses.meta(NODE_ID_META);
-
-                if (id != null) {
-                    GridCommunicationClient rmv = clients.get(id);
-
-                    if (rmv instanceof GridTcpNioCommunicationClient &&
-                        ((GridTcpNioCommunicationClient)rmv).session() == ses 
&&
-                        clients.remove(id, rmv)) {
-                        rmv.forceClose();
-
-                        if (!isNodeStopping()) {
-                            GridNioRecoveryDescriptor recoveryData = 
ses.recoveryDescriptor();
-
-                            if (recoveryData != null) {
-                                if 
(recoveryData.nodeAlive(getSpiContext().node(id))) {
-                                    if 
(!recoveryData.messagesFutures().isEmpty()) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Session was closed but 
there are unacknowledged messages, " +
-                                                "will try to reconnect 
[rmtNode=" + recoveryData.node().id() + ']');
-
-                                        
recoveryWorker.addReconnectRequest(recoveryData);
-                                    }
-                                }
-                                else
-                                    recoveryData.onNodeLeft();
-                            }
-                        }
-                    }
-
-                    CommunicationListener<Message> lsnr0 = lsnr;
-
-                    if (lsnr0 != null)
-                        lsnr0.onDisconnected(id);
-                }
-            }
-
-            @Override public void onMessage(GridNioSession ses, Message msg) {
-                UUID sndId = ses.meta(NODE_ID_META);
-
-                if (sndId == null) {
-                    assert ses.accepted();
-
-                    if (msg instanceof NodeIdMessage)
-                        sndId = U.bytesToUuid(((NodeIdMessage) 
msg).nodeIdBytes, 0);
-                    else {
-                        assert msg instanceof HandshakeMessage : msg;
-
-                        sndId = ((HandshakeMessage)msg).nodeId();
-                    }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Remote node ID received: " + sndId);
-
-                    final UUID old = ses.addMeta(NODE_ID_META, sndId);
-
-                    assert old == null;
-
-                    final ClusterNode rmtNode = getSpiContext().node(sndId);
-
-                    if (rmtNode == null) {
-                        ses.close();
-
-                        return;
-                    }
-
-                    ClusterNode locNode = getSpiContext().localNode();
-
-                    if (ses.remoteAddress() == null)
-                        return;
-
-                    GridCommunicationClient oldClient = clients.get(sndId);
-
-                    if (oldClient != null) {
-                        if (oldClient instanceof 
GridTcpNioCommunicationClient) {
-                            if (log.isDebugEnabled())
-                                log.debug("Received incoming connection when 
already connected " +
-                                    "to this node, rejecting [locNode=" + 
locNode.id() +
-                                    ", rmtNode=" + sndId + ']');
-
-                            ses.send(new RecoveryLastReceivedMessage(-1));
-
-                            return;
-                        }
-                    }
-
-                    GridFutureAdapter<GridCommunicationClient> fut = new 
GridFutureAdapter<>();
-
-                    GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(sndId, fut);
-
-                    assert msg instanceof HandshakeMessage : msg;
-
-                    HandshakeMessage msg0 = (HandshakeMessage)msg;
-
-                    final GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(rmtNode);
-
-                    if (oldFut == null) {
-                        oldClient = clients.get(sndId);
-
-                        if (oldClient != null) {
-                            if (oldClient instanceof 
GridTcpNioCommunicationClient) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Received incoming connection 
when already connected " +
-                                        "to this node, rejecting [locNode=" + 
locNode.id() +
-                                        ", rmtNode=" + sndId + ']');
-
-                                ses.send(new RecoveryLastReceivedMessage(-1));
-
-                                return;
-                            }
-                        }
-
-                        boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                            new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, fut));
-
-                        if (log.isDebugEnabled())
-                            log.debug("Received incoming connection from 
remote node " +
-                                "[rmtNode=" + rmtNode.id() + ", reserved=" + 
reserved + ']');
-
-                        if (reserved) {
-                            try {
-                                GridTcpNioCommunicationClient client =
-                                    connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
-
-                                fut.onDone(client);
-                            }
-                            finally {
-                                clientFuts.remove(rmtNode.id(), fut);
-                            }
-                        }
-                    }
-                    else {
-                        if (oldFut instanceof ConnectFuture && locNode.order() 
< rmtNode.order()) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Received incoming connection from 
remote node while " +
-                                    "connecting to this node, rejecting 
[locNode=" + locNode.id() +
-                                    ", locNodeOrder=" + locNode.order() + ", 
rmtNode=" + rmtNode.id() +
-                                    ", rmtNodeOrder=" + rmtNode.order() + ']');
-                            }
-
-                            ses.send(new RecoveryLastReceivedMessage(-1));
-                        }
-                        else {
-                            boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                                new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, fut));
-
-                            if (reserved) {
-                                GridTcpNioCommunicationClient client =
-                                    connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
-
-                                fut.onDone(client);
-                            }
-                        }
-                    }
-                }
-                else {
-                    rcvdMsgsCnt.increment();
-
-                    GridNioRecoveryDescriptor recovery = 
ses.recoveryDescriptor();
-
-                    if (recovery != null) {
-                        if (msg instanceof RecoveryLastReceivedMessage) {
-                            RecoveryLastReceivedMessage msg0 = 
(RecoveryLastReceivedMessage)msg;
-
-                            if (log.isDebugEnabled())
-                                log.debug("Received recovery acknowledgement 
[rmtNode=" + sndId +
-                                    ", rcvCnt=" + msg0.received() + ']');
-
-                            recovery.ackReceived(msg0.received());
-
-                            return;
-                        }
-                        else {
-                            long rcvCnt = recovery.onReceived();
-
-                            if (rcvCnt % ackSndThreshold == 0) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Send recovery acknowledgement 
[rmtNode=" + sndId +
-                                        ", rcvCnt=" + rcvCnt + ']');
-
-                                nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(rcvCnt));
-
-                                recovery.lastAcknowledged(rcvCnt);
-                            }
-                        }
-                    }
-
-                    IgniteRunnable c;
-
-                    if (msgQueueLimit > 0) {
-                        GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-
-                        if (tracker == null) {
-                            GridNioMessageTracker old = 
ses.addMeta(TRACKER_META, tracker =
-                                new GridNioMessageTracker(ses, msgQueueLimit));
-
-                            assert old == null;
-                        }
-
-                        tracker.onMessageReceived();
-
-                        c = tracker;
-                    }
-                    else
-                        c = NOOP;
-
-                    notifyListener(sndId, msg, c);
-                }
-            }
-
-            /**
-             * @param recovery Recovery descriptor.
-             * @param ses Session.
-             * @param node Node.
-             * @param rcvCnt Number of received messages..
-             * @param sndRes If {@code true} sends response for recovery 
handshake.
-             * @return Client.
-             */
-            private GridTcpNioCommunicationClient connected(
-                GridNioRecoveryDescriptor recovery,
-                GridNioSession ses,
-                ClusterNode node,
-                long rcvCnt,
-                boolean sndRes) {
-                recovery.onHandshake(rcvCnt);
-
-                ses.recoveryDescriptor(recovery);
-
-                nioSrvr.resend(ses);
-
-                if (sndRes)
-                    nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recovery.receivedCount()));
-
-                recovery.connected();
-
-                GridTcpNioCommunicationClient client = new 
GridTcpNioCommunicationClient(ses, log);
-
-                GridCommunicationClient oldClient = 
clients.putIfAbsent(node.id(), client);
-
-                assert oldClient == null : "Client already created [node=" + 
node + ", client=" + client +
-                        ", oldClient=" + oldClient + ", recoveryDesc=" + 
recovery + ']';
-
-                return client;
-            }
-
-            /**
-             *
-             */
-            @SuppressWarnings("PackageVisibleInnerClass")
-            class ConnectClosure implements IgniteInClosure<Boolean> {
-                /** */
-                private static final long serialVersionUID = 0L;
-
-                /** */
-                private final GridNioSession ses;
-
-                /** */
-                private final GridNioRecoveryDescriptor recoveryDesc;
-
-                /** */
-                private final ClusterNode rmtNode;
-
-                /** */
-                private final HandshakeMessage msg;
-
-                /** */
-                private final GridFutureAdapter<GridCommunicationClient> fut;
-
-                /**
-                 * @param ses Incoming session.
-                 * @param recoveryDesc Recovery descriptor.
-                 * @param rmtNode Remote node.
-                 * @param msg Handshake message.
-                 * @param fut Connect future.
-                 */
-                ConnectClosure(GridNioSession ses,
-                    GridNioRecoveryDescriptor recoveryDesc,
-                    ClusterNode rmtNode,
-                    HandshakeMessage msg,
-                    GridFutureAdapter<GridCommunicationClient> fut) {
-                    this.ses = ses;
-                    this.recoveryDesc = recoveryDesc;
-                    this.rmtNode = rmtNode;
-                    this.msg = msg;
-                    this.fut = fut;
-                }
-
-                /** {@inheritDoc} */
-                @Override public void apply(Boolean success) {
-                    if (success) {
-                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new 
IgniteInClosure<IgniteInternalFuture<?>>() {
-                            @Override public void 
apply(IgniteInternalFuture<?> msgFut) {
-                                try {
-                                    msgFut.get();
-
-                                    GridTcpNioCommunicationClient client =
-                                        connected(recoveryDesc, ses, rmtNode, 
msg.received(), false);
-
-                                    fut.onDone(client);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to send recovery 
handshake " +
-                                            "[rmtNode=" + rmtNode.id() + ", 
err=" + e + ']');
-
-                                    recoveryDesc.release();
-
-                                    fut.onDone();
-                                }
-                                finally {
-                                    clientFuts.remove(rmtNode.id(), fut);
-                                }
-                            }
-                        };
-
-                        nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr);
-                    }
-                    else {
-                        try {
-                            fut.onDone();
-                        }
-                        finally {
-                            clientFuts.remove(rmtNode.id(), fut);
-                        }
-                    }
-                }
-            }
-        };
+    private final GridNioServerListener<Message> srvLsnr = new 
ServerListener();
 
     /** Logger. */
     @LoggerResource
@@ -1362,7 +1011,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         // If configured TCP port is busy, find first available in range.
         for (int port = locPort; port < locPort + locPortRange; port++) {
             try {
-                MessageFactory messageFactory = new MessageFactory() {
+                MessageFactory msgFactory = new MessageFactory() {
                     private MessageFactory impl;
 
                     @Nullable @Override public Message create(byte type) {
@@ -1375,7 +1024,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                MessageFormatter messageFormatter = new MessageFormatter() {
+                MessageFormatter msgFormatter = new MessageFormatter() {
                     private MessageFormatter impl;
 
                     @Override public MessageWriter writer() {
@@ -1397,7 +1046,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                GridDirectParser parser = new GridDirectParser(messageFactory, 
messageFormatter);
+                GridDirectParser parser = new GridDirectParser(msgFactory, 
msgFormatter);
 
                 IgnitePredicate<Message> skipRecoveryPred = new 
IgnitePredicate<Message>() {
                     @Override public boolean apply(Message msg) {
@@ -1424,7 +1073,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .writeTimeout(sockWriteTimeout)
                         .filters(new GridNioCodecFilter(parser, log, true),
                             new GridConnectionBytesVerifyFilter(log))
-                        .messageFormatter(messageFormatter)
+                        .messageFormatter(msgFormatter)
                         .skipRecoveryPredicate(skipRecoveryPred)
                         .build();
 
@@ -1955,7 +1604,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
                     if (recovery != null) {
-                        HandshakeMessage msg = new 
HandshakeMessage(getLocalNodeId(),
+                        Message msg = new HandshakeMessage(getLocalNodeId(),
                             recovery.incrementConnectCount(),
                             recovery.receivedCount());
 
@@ -1975,7 +1624,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         ch.write(buf);
                     }
                     else
-                        
ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
+                        
ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType()));
 
                     if (recovery != null) {
                         if (log.isDebugEnabled())
@@ -2540,7 +2189,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (obj instanceof GridCommunicationClient)
                     ((GridCommunicationClient)obj).forceClose();
                 else
-                    U.closeQuiet((AbstractInterruptibleChannel)obj);
+                    U.closeQuiet((AutoCloseable)obj);
 
                 return true;
             }
@@ -2623,7 +2272,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             try {
                 out.write(U.IGNITE_HEADER);
                 out.write(NODE_ID_MSG_TYPE);
-                out.write(nodeIdMsg.nodeIdBytes);
+                out.write(nodeIdMsg.nodeIdBytes());
 
                 out.flush();
 
@@ -2821,75 +2470,360 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
     }
 
     /**
-     * Node ID message.
+     *
      */
-    @SuppressWarnings("PublicInnerClass")
-    public static class NodeIdMessage implements Message {
-        /** */
-        private static final long serialVersionUID = 0L;
+    private class ServerListener extends GridNioServerListenerAdapter<Message> 
{
+        /** {@inheritDoc} */
+        @Override public void onSessionWriteTimeout(GridNioSession ses) {
+            LT.warn(log, null, "Communication SPI Session write timed out 
(consider increasing " +
+                "'socketWriteTimeout' " + "configuration property) 
[remoteAddr=" + ses.remoteAddress() +
+                ", writeTimeout=" + sockWriteTimeout + ']');
 
-        /** */
-        private byte[] nodeIdBytes;
+            if (log.isDebugEnabled())
+                log.debug("Closing communication SPI session on write timeout 
[remoteAddr=" + ses.remoteAddress() +
+                    ", writeTimeout=" + sockWriteTimeout + ']');
 
-        /** */
-        private byte[] nodeIdBytesWithType;
+            ses.close();
+        }
 
-        /** */
-        public NodeIdMessage() {
-            // No-op.
+        /** {@inheritDoc} */
+        @Override public void onConnected(GridNioSession ses) {
+            if (ses.accepted()) {
+                if (log.isDebugEnabled())
+                    log.debug("Sending local node ID to newly accepted 
session: " + ses);
+
+                ses.send(nodeIdMsg);
+            }
         }
 
-        /**
-         * @param nodeId Node ID.
-         */
-        private NodeIdMessage(UUID nodeId) {
-            nodeIdBytes = U.uuidToBytes(nodeId);
+        /** {@inheritDoc} */
+        @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
+            UUID id = ses.meta(NODE_ID_META);
 
-            nodeIdBytesWithType = new byte[nodeIdBytes.length + 1];
+            if (id != null) {
+                GridCommunicationClient rmv = clients.get(id);
 
-            nodeIdBytesWithType[0] = NODE_ID_MSG_TYPE;
+                if (rmv instanceof GridTcpNioCommunicationClient &&
+                    ((GridTcpNioCommunicationClient)rmv).session() == ses &&
+                    clients.remove(id, rmv)) {
+                    rmv.forceClose();
 
-            System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, 
nodeIdBytes.length);
-        }
+                    if (!isNodeStopping()) {
+                        GridNioRecoveryDescriptor recoveryData = 
ses.recoveryDescriptor();
 
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) 
{
-            assert nodeIdBytes.length == 16;
+                        if (recoveryData != null) {
+                            if 
(recoveryData.nodeAlive(getSpiContext().node(id))) {
+                                if (!recoveryData.messagesFutures().isEmpty()) 
{
+                                    if (log.isDebugEnabled())
+                                        log.debug("Session was closed but 
there are unacknowledged messages, " +
+                                            "will try to reconnect [rmtNode=" 
+ recoveryData.node().id() + ']');
 
-            if (buf.remaining() < 17)
-                return false;
+                                    
recoveryWorker.addReconnectRequest(recoveryData);
+                                }
+                            }
+                            else
+                                recoveryData.onNodeLeft();
+                        }
+                    }
+                }
 
-            buf.put(NODE_ID_MSG_TYPE);
-            buf.put(nodeIdBytes);
+                CommunicationListener<Message> lsnr0 = lsnr;
 
-            return true;
+                if (lsnr0 != null)
+                    lsnr0.onDisconnected(id);
+            }
         }
 
         /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
-            if (buf.remaining() < 16)
-                return false;
+        @Override public void onMessage(GridNioSession ses, Message msg) {
+            UUID sndId = ses.meta(NODE_ID_META);
 
-            nodeIdBytes = new byte[16];
+            if (sndId == null) {
+                assert ses.accepted();
 
-            buf.get(nodeIdBytes);
+                if (msg instanceof NodeIdMessage)
+                    sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes(), 
0);
+                else {
+                    assert msg instanceof HandshakeMessage : msg;
 
-            return true;
-        }
+                    sndId = ((HandshakeMessage)msg).nodeId();
+                }
 
-        /** {@inheritDoc} */
-        @Override public byte directType() {
-            return NODE_ID_MSG_TYPE;
+                if (log.isDebugEnabled())
+                    log.debug("Remote node ID received: " + sndId);
+
+                final UUID old = ses.addMeta(NODE_ID_META, sndId);
+
+                assert old == null;
+
+                final ClusterNode rmtNode = getSpiContext().node(sndId);
+
+                if (rmtNode == null) {
+                    ses.close();
+
+                    return;
+                }
+
+                ClusterNode locNode = getSpiContext().localNode();
+
+                if (ses.remoteAddress() == null)
+                    return;
+
+                GridCommunicationClient oldClient = clients.get(sndId);
+
+                if (oldClient != null) {
+                    if (oldClient instanceof GridTcpNioCommunicationClient) {
+                        if (log.isDebugEnabled())
+                            log.debug("Received incoming connection when 
already connected " +
+                                "to this node, rejecting [locNode=" + 
locNode.id() +
+                                ", rmtNode=" + sndId + ']');
+
+                        ses.send(new RecoveryLastReceivedMessage(-1));
+
+                        return;
+                    }
+                }
+
+                GridFutureAdapter<GridCommunicationClient> fut = new 
GridFutureAdapter<>();
+
+                GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(sndId, fut);
+
+                assert msg instanceof HandshakeMessage : msg;
+
+                HandshakeMessage msg0 = (HandshakeMessage)msg;
+
+                final GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(rmtNode);
+
+                if (oldFut == null) {
+                    oldClient = clients.get(sndId);
+
+                    if (oldClient != null) {
+                        if (oldClient instanceof 
GridTcpNioCommunicationClient) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received incoming connection when 
already connected " +
+                                    "to this node, rejecting [locNode=" + 
locNode.id() +
+                                    ", rmtNode=" + sndId + ']');
+
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+
+                            return;
+                        }
+                    }
+
+                    boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
+                        new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, 
fut));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Received incoming connection from remote 
node " +
+                            "[rmtNode=" + rmtNode.id() + ", reserved=" + 
reserved + ']');
+
+                    if (reserved) {
+                        try {
+                            GridTcpNioCommunicationClient client =
+                                connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
+
+                            fut.onDone(client);
+                        }
+                        finally {
+                            clientFuts.remove(rmtNode.id(), fut);
+                        }
+                    }
+                }
+                else {
+                    if (oldFut instanceof ConnectFuture && locNode.order() < 
rmtNode.order()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Received incoming connection from 
remote node while " +
+                                "connecting to this node, rejecting [locNode=" 
+ locNode.id() +
+                                ", locNodeOrder=" + locNode.order() + ", 
rmtNode=" + rmtNode.id() +
+                                ", rmtNodeOrder=" + rmtNode.order() + ']');
+                        }
+
+                        ses.send(new RecoveryLastReceivedMessage(-1));
+                    }
+                    else {
+                        boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, fut));
+
+                        if (reserved) {
+                            GridTcpNioCommunicationClient client =
+                                connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
+
+                            fut.onDone(client);
+                        }
+                    }
+                }
+            }
+            else {
+                rcvdMsgsCnt.increment();
+
+                GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+
+                if (recovery != null) {
+                    if (msg instanceof RecoveryLastReceivedMessage) {
+                        RecoveryLastReceivedMessage msg0 = 
(RecoveryLastReceivedMessage)msg;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received recovery acknowledgement 
[rmtNode=" + sndId +
+                                ", rcvCnt=" + msg0.received() + ']');
+
+                        recovery.ackReceived(msg0.received());
+
+                        return;
+                    }
+                    else {
+                        long rcvCnt = recovery.onReceived();
+
+                        if (rcvCnt % ackSndThreshold == 0) {
+                            if (log.isDebugEnabled())
+                                log.debug("Send recovery acknowledgement 
[rmtNode=" + sndId +
+                                    ", rcvCnt=" + rcvCnt + ']');
+
+                            nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(rcvCnt));
+
+                            recovery.lastAcknowledged(rcvCnt);
+                        }
+                    }
+                }
+
+                IgniteRunnable c;
+
+                if (msgQueueLimit > 0) {
+                    GridNioMessageTracker tracker = ses.meta(TRACKER_META);
+
+                    if (tracker == null) {
+                        GridNioMessageTracker old = ses.addMeta(TRACKER_META, 
tracker =
+                            new GridNioMessageTracker(ses, msgQueueLimit));
+
+                        assert old == null;
+                    }
+
+                    tracker.onMessageReceived();
+
+                    c = tracker;
+                }
+                else
+                    c = NOOP;
+
+                notifyListener(sndId, msg, c);
+            }
         }
 
-        /** {@inheritDoc} */
-        @Override public byte fieldsCount() {
-            return 0;
+        /**
+         * @param recovery Recovery descriptor.
+         * @param ses Session.
+         * @param node Node.
+         * @param rcvCnt Number of received messages..
+         * @param sndRes If {@code true} sends response for recovery handshake.
+         * @return Client.
+         */
+        private GridTcpNioCommunicationClient connected(
+            GridNioRecoveryDescriptor recovery,
+            GridNioSession ses,
+            ClusterNode node,
+            long rcvCnt,
+            boolean sndRes) {
+            recovery.onHandshake(rcvCnt);
+
+            ses.recoveryDescriptor(recovery);
+
+            nioSrvr.resend(ses);
+
+            if (sndRes)
+                nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recovery.receivedCount()));
+
+            recovery.connected();
+
+            GridTcpNioCommunicationClient client = new 
GridTcpNioCommunicationClient(ses, log);
+
+            GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), 
client);
+
+            assert oldClient == null : "Client already created [node=" + node 
+ ", client=" + client +
+                    ", oldClient=" + oldClient + ", recoveryDesc=" + recovery 
+ ']';
+
+            return client;
         }
 
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(NodeIdMessage.class, this);
+        /**
+         *
+         */
+        @SuppressWarnings("PackageVisibleInnerClass")
+        class ConnectClosure implements IgniteInClosure<Boolean> {
+            /** */
+            private static final long serialVersionUID = 0L;
+
+            /** */
+            private final GridNioSession ses;
+
+            /** */
+            private final GridNioRecoveryDescriptor recoveryDesc;
+
+            /** */
+            private final ClusterNode rmtNode;
+
+            /** */
+            private final HandshakeMessage msg;
+
+            /** */
+            private final GridFutureAdapter<GridCommunicationClient> fut;
+
+            /**
+             * @param ses Incoming session.
+             * @param recoveryDesc Recovery descriptor.
+             * @param rmtNode Remote node.
+             * @param msg Handshake message.
+             * @param fut Connect future.
+             */
+            ConnectClosure(GridNioSession ses,
+                GridNioRecoveryDescriptor recoveryDesc,
+                ClusterNode rmtNode,
+                HandshakeMessage msg,
+                GridFutureAdapter<GridCommunicationClient> fut) {
+                this.ses = ses;
+                this.recoveryDesc = recoveryDesc;
+                this.rmtNode = rmtNode;
+                this.msg = msg;
+                this.fut = fut;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void apply(Boolean success) {
+                if (success) {
+                    IgniteInClosure<IgniteInternalFuture<?>> lsnr = new 
IgniteInClosure<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> 
msgFut) {
+                            try {
+                                msgFut.get();
+
+                                GridTcpNioCommunicationClient client =
+                                    connected(recoveryDesc, ses, rmtNode, 
msg.received(), false);
+
+                                fut.onDone(client);
+                            }
+                            catch (IgniteCheckedException e) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send recovery 
handshake " +
+                                        "[rmtNode=" + rmtNode.id() + ", err=" 
+ e + ']');
+
+                                recoveryDesc.release();
+
+                                fut.onDone();
+                            }
+                            finally {
+                                clientFuts.remove(rmtNode.id(), fut);
+                            }
+                        }
+                    };
+
+                    nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr);
+                }
+                else {
+                    try {
+                        fut.onDone();
+                    }
+                    finally {
+                        clientFuts.remove(rmtNode.id(), fut);
+                    }
+                }
+            }
         }
     }
 }

Reply via email to