IGNITE-3060 Discovery: optimize resource usage for client connections
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83242336 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83242336 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83242336 Branch: refs/heads/ignite-3163 Commit: 83242336f5a3c0f067cfac9ed138af59970b5a92 Parents: 1f8b394 Author: sboikov <[email protected]> Authored: Fri Apr 29 09:57:10 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Apr 29 09:57:10 2016 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 48 +++++++++++--------- .../TcpDiscoveryClientReconnectMessage.java | 16 +++++++ 2 files changed, 43 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/83242336/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 43f4b58..e30dd24 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2168,7 +2168,8 @@ class ServerImpl extends TcpDiscoveryImpl { void addMessage(TcpDiscoveryAbstractMessage msg) { if ((msg instanceof TcpDiscoveryStatusCheckMessage || msg instanceof TcpDiscoveryJoinRequestMessage || - msg instanceof TcpDiscoveryCustomEventMessage) && + msg instanceof TcpDiscoveryCustomEventMessage || + msg instanceof TcpDiscoveryClientReconnectMessage) && queue.contains(msg)) { if (log.isDebugEnabled()) log.debug("Ignoring duplicate message: " + msg); @@ -2290,9 +2291,6 @@ class ServerImpl extends TcpDiscoveryImpl { else if (msg instanceof TcpDiscoveryNodeFailedMessage) processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); - else if (msg instanceof TcpDiscoveryClientHeartbeatMessage) - processClientHeartbeatMessage((TcpDiscoveryClientHeartbeatMessage)msg); - else if (msg instanceof TcpDiscoveryHeartbeatMessage) processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg); @@ -4521,22 +4519,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Processes client heartbeat message. - * - * @param msg Heartbeat message. - */ - private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) { - assert msg.client(); - - ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); - - if (wrk != null) - wrk.metrics(msg.metrics()); - else if (log.isDebugEnabled()) - log.debug("Received heartbeat message from unknown client node: " + msg); - } - - /** * @param nodeId Node ID. * @param metrics Metrics. * @param cacheMetrics Cache metrics. @@ -5436,7 +5418,12 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } - msgWorker.addMessage(msg); + TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null; + + if (msg instanceof TcpDiscoveryClientHeartbeatMessage) + heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg; + else + msgWorker.addMessage(msg); // Send receipt back. if (clientMsgWrk != null) { @@ -5448,6 +5435,9 @@ class ServerImpl extends TcpDiscoveryImpl { } else spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + + if (heartbeatMsg != null) + processClientHeartbeatMessage(heartbeatMsg); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -5515,6 +5505,22 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Processes client heartbeat message. + * + * @param msg Heartbeat message. + */ + private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) { + assert msg.client(); + + ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); + + if (wrk != null) + wrk.metrics(msg.metrics()); + else if (log.isDebugEnabled()) + log.debug("Received heartbeat message from unknown client node: " + msg); + } + + /** * @param msg Join request message. * @param clientMsgWrk Client message worker to start. * @return Whether connection was successful. http://git-wip-us.apache.org/repos/asf/ignite/blob/83242336/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java index 7c0cd5d..7cce78b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.Collection; import java.util.UUID; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -96,6 +97,21 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess } /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + // NOTE! + // Do not call super. As IDs will differ, but we can ignore this. + + if (!(obj instanceof TcpDiscoveryClientReconnectMessage)) + return false; + + TcpDiscoveryClientReconnectMessage other = (TcpDiscoveryClientReconnectMessage)obj; + + return F.eq(creatorNodeId(), other.creatorNodeId()) && + F.eq(routerNodeId, other.routerNodeId) && + F.eq(lastMsgId, other.lastMsgId); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryClientReconnectMessage.class, this, "super", super.toString()); }
