fixed slow exchange on topology startup (cherry picked from commit c88182c)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc71125d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc71125d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc71125d Branch: refs/heads/ignite-2523-1 Commit: dc71125d23c03528f324458f0c6a14f3c34ab3ab Parents: 122b0f4 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Thu Apr 21 16:15:02 2016 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Wed Apr 27 12:41:43 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteEventsImpl.java | 11 +- .../ignite/internal/IgniteMessagingImpl.java | 7 +- .../org/apache/ignite/internal/IgnitionEx.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 1 + .../continuous/GridContinuousProcessor.java | 29 ++- .../ignite/spi/IgniteNodeValidationResult.java | 8 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 194 +++++++++++-------- .../tcp/internal/TcpDiscoveryNode.java | 2 +- .../messages/TcpDiscoveryAbstractMessage.java | 4 +- .../TcpDiscoveryCustomEventMessage.java | 13 +- .../TcpDiscoveryJoinRequestMessage.java | 16 +- .../TcpDiscoveryStatusCheckMessage.java | 18 +- 12 files changed, 208 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java index 505bc9d..3c6218d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java @@ -108,9 +108,16 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen guard(); try { + GridEventConsumeHandler hnd = new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr, + (IgnitePredicate<Event>)rmtFilter, types); + return saveOrGet(ctx.continuous().startRoutine( - new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr, - (IgnitePredicate<Event>)rmtFilter, types), bufSize, interval, autoUnsubscribe, prj.predicate())); + hnd, + false, + bufSize, + interval, + autoUnsubscribe, + prj.predicate())); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index 17c06fc..2800777 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -184,7 +184,12 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> try { GridContinuousHandler hnd = new GridMessageListenHandler(topic, (IgniteBiPredicate<UUID, Object>)p); - return saveOrGet(ctx.continuous().startRoutine(hnd, 1, 0, false, prj.predicate())); + return saveOrGet(ctx.continuous().startRoutine(hnd, + false, + 1, + 0, + false, + prj.predicate())); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index f47b7ff..24249da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1278,7 +1278,7 @@ public class IgnitionEx { } /** - * Gets a name of the grid, which is owner of current thread. An Exception is thrown if + * Gets the grid, which is owner of current thread. An Exception is thrown if * current thread is not an {@link IgniteThread}. * * @return Grid instance related to current thread http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 9efc456..fafb830 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -641,6 +641,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { UUID id = cctx.kernalContext().continuous().startRoutine( hnd, + internal && loc, bufSize, timeInterval, autoUnsubscribe, http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index d7838f3..fd5e446 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -540,11 +540,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param bufSize Buffer size. * @param interval Time interval. * @param autoUnsubscribe Automatic unsubscribe flag. + * @param locOnly Local only flag. * @param prjPred Projection predicate. * @return Future. */ @SuppressWarnings("TooBroadScope") public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd, + boolean locOnly, int bufSize, long interval, boolean autoUnsubscribe, @@ -553,12 +555,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter { assert bufSize > 0; assert interval >= 0; - // Whether local node is included in routine. - boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode()); - // Generate ID. final UUID routineId = UUID.randomUUID(); + // Register routine locally. + locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe)); + + if (locOnly) { + try { + registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); + + hnd.onListenerRegistered(routineId, ctx); + + return new GridFinishedFuture<>(routineId); + } + catch (IgniteCheckedException e) { + unregisterHandler(routineId, hnd, true); + + return new GridFinishedFuture<>(e); + } + } + + // Whether local node is included in routine. + boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode()); + StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval, autoUnsubscribe); try { @@ -613,9 +633,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { }); } - // Register routine locally. - locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe)); - StartFuture fut = new StartFuture(ctx, routineId); startFuts.put(routineId, fut); http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java index 2473a9e..3dd4caf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi; import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; /** * Result of joining node validation. @@ -63,4 +64,9 @@ public class IgniteNodeValidationResult { public String sendMessage() { return sndMsg; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteNodeValidationResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 3283d99..450f628 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2584,6 +2584,9 @@ class ServerImpl extends TcpDiscoveryImpl { } finally { if (!success) { + if (log.isDebugEnabled()) + log.debug("Closing socket to next: " + next); + U.closeQuiet(sock); sock = null; @@ -2747,6 +2750,9 @@ class ServerImpl extends TcpDiscoveryImpl { forceSndPending = false; if (!sent) { + if (log.isDebugEnabled()) + log.debug("Closing socket to next (not sent): " + next); + U.closeQuiet(sock); sock = null; @@ -2897,12 +2903,12 @@ class ServerImpl extends TcpDiscoveryImpl { * * @param msg Join request message. */ - private void processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) { + private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) { assert msg != null; - TcpDiscoveryNode node = msg.node(); + final TcpDiscoveryNode node = msg.node(); - UUID locNodeId = getLocalNodeId(); + final UUID locNodeId = getLocalNodeId(); if (!msg.client()) { boolean rmtHostLoopback = node.socketAddresses().size() == 1 && @@ -3110,92 +3116,107 @@ class ServerImpl extends TcpDiscoveryImpl { } } - IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node); + final IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node); if (err != null) { - boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId()); + if (log.isDebugEnabled()) + log.debug("Node validation failed [res=" + err + ", node=" + node + ']'); - if (!ping) { - if (log.isDebugEnabled()) - log.debug("Conflicting node has already left, need to wait for event. " + - "Will ignore join request for now since it will be recent [req=" + msg + - ", err=" + err.message() + ']'); + utilityPool.submit( + new Runnable() { + @Override public void run() { + boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId()); - // Ignore join request. - return; - } + if (!ping) { + if (log.isDebugEnabled()) + log.debug("Conflicting node has already left, need to wait for event. " + + "Will ignore join request for now since it will be recent [req=" + msg + + ", err=" + err.message() + ']'); - LT.warn(log, null, err.message()); + // Ignore join request. + return; + } - // Always output in debug. - if (log.isDebugEnabled()) - log.debug(err.message()); + LT.warn(log, null, err.message()); - try { - trySendMessageDirectly(node, - new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage())); - } - catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send hash ID resolver validation failed message to node " + - "[node=" + node + ", err=" + e.getMessage() + ']'); + // Always output in debug. + if (log.isDebugEnabled()) + log.debug(err.message()); - onException("Failed to send hash ID resolver validation failed message to node " + - "[node=" + node + ", err=" + e.getMessage() + ']', e); - } + try { + trySendMessageDirectly(node, + new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage())); + } + catch (IgniteSpiException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send hash ID resolver validation failed message to node " + + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send hash ID resolver validation failed message to node " + + "[node=" + node + ", err=" + e.getMessage() + ']', e); + } + } + } + ); // Ignore join request. return; } - String locMarsh = locNode.attribute(ATTR_MARSHALLER); - String rmtMarsh = node.attribute(ATTR_MARSHALLER); + final String locMarsh = locNode.attribute(ATTR_MARSHALLER); + final String rmtMarsh = node.attribute(ATTR_MARSHALLER); if (!F.eq(locMarsh, rmtMarsh)) { - String errMsg = "Local node's marshaller differs from remote node's marshaller " + - "(to make sure all nodes in topology have identical marshaller, " + - "configure marshaller explicitly in configuration) " + - "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh + - ", locNodeAddrs=" + U.addressesAsString(locNode) + - ", rmtNodeAddrs=" + U.addressesAsString(node) + - ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; + utilityPool.submit( + new Runnable() { + @Override public void run() { + String errMsg = "Local node's marshaller differs from remote node's marshaller " + + "(to make sure all nodes in topology have identical marshaller, " + + "configure marshaller explicitly in configuration) " + + "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh + + ", locNodeAddrs=" + U.addressesAsString(locNode) + + ", rmtNodeAddrs=" + U.addressesAsString(node) + + ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; - LT.warn(log, null, errMsg); + LT.warn(log, null, errMsg); - // Always output in debug. - if (log.isDebugEnabled()) - log.debug(errMsg); + // Always output in debug. + if (log.isDebugEnabled()) + log.debug(errMsg); - try { - String sndMsg = "Local node's marshaller differs from remote node's marshaller " + - "(to make sure all nodes in topology have identical marshaller, " + - "configure marshaller explicitly in configuration) " + - "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh + - ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() + - ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() + - ", rmtNodeId=" + locNode.id() + ']'; - - trySendMessageDirectly(node, - new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg)); - } - catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send marshaller check failed message to node " + - "[node=" + node + ", err=" + e.getMessage() + ']'); + try { + String sndMsg = "Local node's marshaller differs from remote node's marshaller " + + "(to make sure all nodes in topology have identical marshaller, " + + "configure marshaller explicitly in configuration) " + + "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh + + ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() + + ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() + + ", rmtNodeId=" + locNode.id() + ']'; + + trySendMessageDirectly(node, + new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg)); + } + catch (IgniteSpiException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send marshaller check failed message to node " + + "[node=" + node + ", err=" + e.getMessage() + ']'); - onException("Failed to send marshaller check failed message to node " + - "[node=" + node + ", err=" + e.getMessage() + ']', e); - } + onException("Failed to send marshaller check failed message to node " + + "[node=" + node + ", err=" + e.getMessage() + ']', e); + } + } + } + ); // Ignore join request. return; } // If node have no value for this attribute then we treat it as true. - Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID); + final Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID); boolean locMarshUseDfltSuidBool = locMarshUseDfltSuid == null ? true : locMarshUseDfltSuid; - Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID); + final Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID); boolean rmtMarshUseDfltSuidBool = rmtMarshUseDfltSuid == null ? true : rmtMarshUseDfltSuid; Boolean locLateAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); @@ -3203,14 +3224,16 @@ class ServerImpl extends TcpDiscoveryImpl { boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false; if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) { - String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID + - " property value differs from remote node's value " + - "(to make sure all nodes in topology have identical marshaller settings, " + - "configure system property explicitly) " + - "[locMarshUseDfltSuid=" + locMarshUseDfltSuid + ", rmtMarshUseDfltSuid=" + rmtMarshUseDfltSuid + - ", locNodeAddrs=" + U.addressesAsString(locNode) + - ", rmtNodeAddrs=" + U.addressesAsString(node) + - ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; + utilityPool.submit(new Runnable() { + @Override public void run() { + String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID + + " property value differs from remote node's value " + + "(to make sure all nodes in topology have identical marshaller settings, " + + "configure system property explicitly) " + + "[locMarshUseDfltSuid=" + locMarshUseDfltSuid + ", rmtMarshUseDfltSuid=" + rmtMarshUseDfltSuid + + ", locNodeAddrs=" + U.addressesAsString(locNode) + + ", rmtNodeAddrs=" + U.addressesAsString(node) + + ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID + " property value differs from remote node's value " + @@ -3230,19 +3253,22 @@ class ServerImpl extends TcpDiscoveryImpl { // Validate compact footer flags. Boolean locMarshCompactFooter = locNode.attribute(ATTR_MARSHALLER_COMPACT_FOOTER); - boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter : false; + final boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter : false; Boolean rmtMarshCompactFooter = node.attribute(ATTR_MARSHALLER_COMPACT_FOOTER); - boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false; + final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false; if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) { - String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " + - "the same property on remote node (make sure all nodes in topology have the same value " + - "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool + - ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool + - ", locNodeAddrs=" + U.addressesAsString(locNode) + - ", rmtNodeAddrs=" + U.addressesAsString(node) + - ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; + utilityPool.submit( + new Runnable() { + @Override public void run() { + String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " + + "the same property on remote node (make sure all nodes in topology have the same value " + + "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool + + ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool + + ", locNodeAddrs=" + U.addressesAsString(locNode) + + ", rmtNodeAddrs=" + U.addressesAsString(node) + + ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; String sndMsg = "Local node's binary marshaller \"compactFooter\" property differs from " + "the same property on remote node (make sure all nodes in topology have the same value " + @@ -5871,6 +5897,16 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to add. */ void addMessage(TcpDiscoveryAbstractMessage msg) { + if ((msg instanceof TcpDiscoveryStatusCheckMessage || + msg instanceof TcpDiscoveryJoinRequestMessage || + msg instanceof TcpDiscoveryCustomEventMessage) && + queue.contains(msg)) { + if (log.isDebugEnabled()) + log.debug("Ignoring duplicate message: " + msg); + + return; + } + if (msg.highPriority()) queue.addFirst(msg); else http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index d1fbecf..307aefe 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -645,4 +645,4 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste @Override public String toString() { return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 9cb47af..24f2a5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -274,7 +274,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { } /** {@inheritDoc} */ - @Override public final boolean equals(Object obj) { + @Override public boolean equals(Object obj) { if (this == obj) return true; else if (obj instanceof TcpDiscoveryAbstractMessage) @@ -292,4 +292,4 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { @Override public String toString() { return S.toString(TcpDiscoveryAbstractMessage.class, this, "isClient", getFlag(CLIENT_FLAG_POS)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 2c759a1..c0e39d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -18,8 +18,8 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.jetbrains.annotations.NotNull; @@ -86,7 +86,16 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage } /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return super.equals(obj) && + obj instanceof TcpDiscoveryCustomEventMessage && + F.eq( + ((TcpDiscoveryCustomEventMessage)obj).verifierNodeId(), + verifierNodeId()); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java index 2586a8b..22ffae8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.Map; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; @@ -79,7 +80,20 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage } /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + // NOTE! + // Do not call super. As IDs will differ, but we can ignore this. + + if (!(obj instanceof TcpDiscoveryJoinRequestMessage)) + return false; + + TcpDiscoveryJoinRequestMessage other = (TcpDiscoveryJoinRequestMessage)obj; + + return F.eqNodes(other.node, node); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryJoinRequestMessage.class, this, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java index 70b0080..fdbeb75 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; @@ -109,7 +110,22 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage } /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + // NOTE! + // Do not call super. As IDs will differ, but we can ignore this. + + if (!(obj instanceof TcpDiscoveryStatusCheckMessage)) + return false; + + TcpDiscoveryStatusCheckMessage other = (TcpDiscoveryStatusCheckMessage)obj; + + return F.eqNodes(other.creatorNode, creatorNode) && + F.eq(other.failedNodeId, failedNodeId) && + status == other.status; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString()); } -} \ No newline at end of file +}