Minor: moved inner TcpCommunicationSpi messages to top-level classes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c576d5a0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c576d5a0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c576d5a0 Branch: refs/heads/ignite-zk-ce Commit: c576d5a0ab1bbaab4a1b23a2c03a21ef2a7b610c Parents: 061ec6a Author: sboikov <[email protected]> Authored: Fri Dec 15 11:28:18 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 15 11:28:18 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 14 +- .../spi/communication/tcp/HandshakeMessage.java | 156 ++++++++ .../communication/tcp/HandshakeMessage2.java | 95 +++++ .../spi/communication/tcp/NodeIdMessage.java | 115 ++++++ .../tcp/RecoveryLastReceivedMessage.java | 113 ++++++ .../communication/tcp/TcpCommunicationSpi.java | 391 +------------------ 6 files changed, 492 insertions(+), 392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 97e06bf..791dd91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -137,11 +137,11 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse; import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage; import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage; import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage; -import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; +import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest; import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse; import org.apache.ignite.internal.util.GridByteArrayList; @@ -153,6 +153,10 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest; +import org.apache.ignite.spi.communication.tcp.HandshakeMessage; +import org.apache.ignite.spi.communication.tcp.HandshakeMessage2; +import org.apache.ignite.spi.communication.tcp.NodeIdMessage; +import org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.jsr166.ConcurrentHashMap8; @@ -221,7 +225,7 @@ public class GridIoMessageFactory implements MessageFactory { break; case -44: - msg = new TcpCommunicationSpi.HandshakeMessage2(); + msg = new HandshakeMessage2(); break; @@ -291,17 +295,17 @@ public class GridIoMessageFactory implements MessageFactory { break; case TcpCommunicationSpi.NODE_ID_MSG_TYPE: - msg = new TcpCommunicationSpi.NodeIdMessage(); + msg = new NodeIdMessage(); break; case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE: - msg = new TcpCommunicationSpi.RecoveryLastReceivedMessage(); + msg = new RecoveryLastReceivedMessage(); break; case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE: - msg = new TcpCommunicationSpi.HandshakeMessage(); + msg = new HandshakeMessage(); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java new file mode 100644 index 0000000..00e8e46 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Handshake message. + */ +public class HandshakeMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Message body size in bytes. */ + private static final int MESSAGE_SIZE = 32; + + /** Full message size (with message type) in bytes. */ + public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; + + /** */ + private UUID nodeId; + + /** */ + private long rcvCnt; + + /** */ + private long connectCnt; + + /** + * Default constructor required by {@link Message}. + */ + public HandshakeMessage() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param connectCnt Connect count. + * @param rcvCnt Number of received messages. + */ + public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { + assert nodeId != null; + assert rcvCnt >= 0 : rcvCnt; + + this.nodeId = nodeId; + this.connectCnt = connectCnt; + this.rcvCnt = rcvCnt; + } + + /** + * @return Connection index. + */ + public int connectionIndex() { + return 0; + } + + /** + * @return Connect count. + */ + public long connectCount() { + return connectCnt; + } + + /** + * @return Number of received messages. + */ + public long received() { + return rcvCnt; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + if (buf.remaining() < MESSAGE_FULL_SIZE) + return false; + + TcpCommunicationSpi.writeMessageType(buf, directType()); + + byte[] bytes = U.uuidToBytes(nodeId); + + assert bytes.length == 16 : bytes.length; + + buf.put(bytes); + + buf.putLong(rcvCnt); + + buf.putLong(connectCnt); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (buf.remaining() < MESSAGE_SIZE) + return false; + + byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE]; + + buf.get(nodeIdBytes); + + nodeId = U.bytesToUuid(nodeIdBytes, 0); + + rcvCnt = buf.getLong(); + + connectCnt = buf.getLong(); + + return true; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HandshakeMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java new file mode 100644 index 0000000..1e8fdd9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Updated handshake message. + */ +public class HandshakeMessage2 extends HandshakeMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private int connIdx; + + /** + * + */ + public HandshakeMessage2() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param connectCnt Connect count. + * @param rcvCnt Number of received messages. + * @param connIdx Connection index. + */ + HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) { + super(nodeId, connectCnt, rcvCnt); + + this.connIdx = connIdx; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -44; + } + + /** {@inheritDoc} */ + @Override public int connectionIndex() { + return connIdx; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + if (!super.writeTo(buf, writer)) + return false; + + if (buf.remaining() < 4) + return false; + + buf.putInt(connIdx); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (!super.readFrom(buf, reader)) + return false; + + if (buf.remaining() < 4) + return false; + + connIdx = buf.getInt(); + + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HandshakeMessage2.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java new file mode 100644 index 0000000..d05b7ff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Node ID message. + */ +public class NodeIdMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Message body size (with message type) in bytes. */ + static final int MESSAGE_SIZE = 16; + + /** Full message size (with message type) in bytes. */ + public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; + + /** */ + byte[] nodeIdBytes; + + /** */ + byte[] nodeIdBytesWithType; + + /** */ + public NodeIdMessage() { + // No-op. + } + + /** + * @param nodeId Node ID. + */ + NodeIdMessage(UUID nodeId) { + assert nodeId != null; + + nodeIdBytes = U.uuidToBytes(nodeId); + + assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE; + + nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE]; + + nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF); + nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF); + + System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length); + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + assert nodeIdBytes.length == MESSAGE_SIZE; + + if (buf.remaining() < MESSAGE_FULL_SIZE) + return false; + + TcpCommunicationSpi.writeMessageType(buf, directType()); + + buf.put(nodeIdBytes); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (buf.remaining() < MESSAGE_SIZE) + return false; + + nodeIdBytes = new byte[MESSAGE_SIZE]; + + buf.get(nodeIdBytes); + + return true; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TcpCommunicationSpi.NODE_ID_MSG_TYPE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NodeIdMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java new file mode 100644 index 0000000..5460084 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Recovery acknowledgment message. + */ +public class RecoveryLastReceivedMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + static final long ALREADY_CONNECTED = -1; + + /** */ + static final long NODE_STOPPING = -2; + + /** Need wait. */ + static final long NEED_WAIT = -3; + + /** Message body size in bytes. */ + private static final int MESSAGE_SIZE = 8; + + /** Full message size (with message type) in bytes. */ + public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; + + /** */ + private long rcvCnt; + + /** + * Default constructor required by {@link Message}. + */ + public RecoveryLastReceivedMessage() { + // No-op. + } + + /** + * @param rcvCnt Number of received messages. + */ + public RecoveryLastReceivedMessage(long rcvCnt) { + this.rcvCnt = rcvCnt; + } + + /** + * @return Number of received messages. + */ + public long received() { + return rcvCnt; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + if (buf.remaining() < MESSAGE_FULL_SIZE) + return false; + + TcpCommunicationSpi.writeMessageType(buf, directType()); + + buf.putLong(rcvCnt); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (buf.remaining() < MESSAGE_SIZE) + return false; + + rcvCnt = buf.getLong(); + + return true; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RecoveryLastReceivedMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 9290f24..1f0061f 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -141,9 +141,9 @@ import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.ALREADY_CONNECTED; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NEED_WAIT; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NODE_STOPPING; +import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.ALREADY_CONNECTED; +import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NEED_WAIT; +import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NODE_STOPPING; /** * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses @@ -3923,7 +3923,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param buf Byte buffer. * @param type Message type. */ - private static void writeMessageType(ByteBuffer buf, short type) { + static void writeMessageType(ByteBuffer buf, short type) { buf.put((byte)(type & 0xFF)); buf.put((byte)((type >> 8) & 0xFF)); } @@ -4440,389 +4440,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** - * Handshake message. - */ - @SuppressWarnings("PublicInnerClass") - public static class HandshakeMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Message body size in bytes. */ - private static final int MESSAGE_SIZE = 32; - - /** Full message size (with message type) in bytes. */ - public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; - - /** */ - private UUID nodeId; - - /** */ - private long rcvCnt; - - /** */ - private long connectCnt; - - /** - * Default constructor required by {@link Message}. - */ - public HandshakeMessage() { - // No-op. - } - - /** - * @param nodeId Node ID. - * @param connectCnt Connect count. - * @param rcvCnt Number of received messages. - */ - public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { - assert nodeId != null; - assert rcvCnt >= 0 : rcvCnt; - - this.nodeId = nodeId; - this.connectCnt = connectCnt; - this.rcvCnt = rcvCnt; - } - - /** - * @return Connection index. - */ - public int connectionIndex() { - return 0; - } - - /** - * @return Connect count. - */ - public long connectCount() { - return connectCnt; - } - - /** - * @return Number of received messages. - */ - public long received() { - return rcvCnt; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (buf.remaining() < MESSAGE_FULL_SIZE) - return false; - - writeMessageType(buf, directType()); - - byte[] bytes = U.uuidToBytes(nodeId); - - assert bytes.length == 16 : bytes.length; - - buf.put(bytes); - - buf.putLong(rcvCnt); - - buf.putLong(connectCnt); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (buf.remaining() < MESSAGE_SIZE) - return false; - - byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE]; - - buf.get(nodeIdBytes); - - nodeId = U.bytesToUuid(nodeIdBytes, 0); - - rcvCnt = buf.getLong(); - - connectCnt = buf.getLong(); - - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return HANDSHAKE_MSG_TYPE; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HandshakeMessage.class, this); - } - } - - /** - * Updated handshake message. - */ - @SuppressWarnings("PublicInnerClass") - public static class HandshakeMessage2 extends HandshakeMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private int connIdx; - - /** - * - */ - public HandshakeMessage2() { - // No-op. - } - - /** - * @param nodeId Node ID. - * @param connectCnt Connect count. - * @param rcvCnt Number of received messages. - * @param connIdx Connection index. - */ - HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) { - super(nodeId, connectCnt, rcvCnt); - - this.connIdx = connIdx; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return -44; - } - - /** {@inheritDoc} */ - @Override public int connectionIndex() { - return connIdx; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (!super.writeTo(buf, writer)) - return false; - - if (buf.remaining() < 4) - return false; - - buf.putInt(connIdx); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (!super.readFrom(buf, reader)) - return false; - - if (buf.remaining() < 4) - return false; - - connIdx = buf.getInt(); - - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HandshakeMessage2.class, this); - } - } - - /** - * Recovery acknowledgment message. - */ - @SuppressWarnings("PublicInnerClass") - public static class RecoveryLastReceivedMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - static final long ALREADY_CONNECTED = -1; - - /** */ - static final long NODE_STOPPING = -2; - - /** Need wait. */ - static final long NEED_WAIT = -3; - - /** Message body size in bytes. */ - private static final int MESSAGE_SIZE = 8; - - /** Full message size (with message type) in bytes. */ - public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; - - /** */ - private long rcvCnt; - - /** - * Default constructor required by {@link Message}. - */ - public RecoveryLastReceivedMessage() { - // No-op. - } - - /** - * @param rcvCnt Number of received messages. - */ - public RecoveryLastReceivedMessage(long rcvCnt) { - this.rcvCnt = rcvCnt; - } - - /** - * @return Number of received messages. - */ - public long received() { - return rcvCnt; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (buf.remaining() < MESSAGE_FULL_SIZE) - return false; - - writeMessageType(buf, directType()); - - buf.putLong(rcvCnt); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (buf.remaining() < MESSAGE_SIZE) - return false; - - rcvCnt = buf.getLong(); - - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return RECOVERY_LAST_ID_MSG_TYPE; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 0; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(RecoveryLastReceivedMessage.class, this); - } - } - - /** - * Node ID message. - */ - @SuppressWarnings("PublicInnerClass") - public static class NodeIdMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Message body size (with message type) in bytes. */ - private static final int MESSAGE_SIZE = 16; - - /** Full message size (with message type) in bytes. */ - public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; - - /** */ - private byte[] nodeIdBytes; - - /** */ - private byte[] nodeIdBytesWithType; - - /** */ - public NodeIdMessage() { - // No-op. - } - - /** - * @param nodeId Node ID. - */ - private NodeIdMessage(UUID nodeId) { - assert nodeId != null; - - nodeIdBytes = U.uuidToBytes(nodeId); - - assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE; - - nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE]; - - nodeIdBytesWithType[0] = (byte)(NODE_ID_MSG_TYPE & 0xFF); - nodeIdBytesWithType[1] = (byte)((NODE_ID_MSG_TYPE >> 8) & 0xFF); - - System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length); - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - assert nodeIdBytes.length == MESSAGE_SIZE; - - if (buf.remaining() < MESSAGE_FULL_SIZE) - return false; - - writeMessageType(buf, directType()); - - buf.put(nodeIdBytes); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (buf.remaining() < MESSAGE_SIZE) - return false; - - nodeIdBytes = new byte[MESSAGE_SIZE]; - - buf.get(nodeIdBytes); - - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return NODE_ID_MSG_TYPE; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 0; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(NodeIdMessage.class, this); - } - } - - /** * */ private class ConnectGateway {
