ignite-1139: - fixed race in GridDhtPartitionsExchangeFuture - fixed NPE in TcpCommunicationSpi when this SPI was not fully initialized
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/84f8b956 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/84f8b956 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/84f8b956 Branch: refs/heads/ignite-1139 Commit: 84f8b956e40ae88d11e0ef125442203a497b8c4b Parents: 89da409 Author: dmagda <[email protected]> Authored: Fri Jul 24 13:35:32 2015 +0300 Committer: dmagda <[email protected]> Committed: Fri Jul 24 13:35:32 2015 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 20 +++++----- .../communication/tcp/TcpCommunicationSpi.java | 39 ++++++++++++++++++-- 2 files changed, 46 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84f8b956/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 3664220..cbf6b40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -583,7 +583,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT onDone(exchId.topologyVersion()); } else - sendPartitions(); + sendPartitions(oldest); } else { rmtIds = Collections.emptyList(); @@ -816,9 +816,11 @@ 
public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (log.isDebugEnabled()) log.debug("Initialized future: " + this); + ClusterNode oldest = oldestNode.get(); + // If this node is not oldest. - if (!oldestNode.get().id().equals(cctx.localNodeId())) - sendPartitions(); + if (!oldest.id().equals(cctx.localNodeId())) + sendPartitions(oldest); else { boolean allReceived = allReceived(); @@ -948,11 +950,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * + * @param oldestNode Oldest node. */ - private void sendPartitions() { - ClusterNode oldestNode = this.oldestNode.get(); - + private void sendPartitions(ClusterNode oldestNode) { try { sendLocalPartitions(oldestNode, exchId); } @@ -1402,8 +1402,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * */ private void recheck() { + ClusterNode oldest = oldestNode.get(); + // If this is the oldest node. - if (oldestNode.get().id().equals(cctx.localNodeId())) { + if (oldest.id().equals(cctx.localNodeId())) { Collection<UUID> remaining = remaining(); if (!remaining.isEmpty()) { @@ -1423,7 +1425,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } else - sendPartitions(); + sendPartitions(oldest); // Schedule another send. 
scheduleRecheck(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84f8b956/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index e9fd696..5159e18 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1717,7 +1717,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isTraceEnabled()) log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); - if (node.id().equals(getLocalNode().id())) + ClusterNode localNode = getLocalNode(); + + if (localNode == null) + throw new IgniteSpiException("Local node is not started or fully initialized [isStopping=" + + getSpiContext().isStopping() + ']'); + + if (node.id().equals(localNode.id())) notifyListener(node.id(), msg, NOOP); else { GridCommunicationClient client = null; @@ -2263,8 +2269,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); + ClusterNode localNode = getLocalNode(); + + if (localNode == null) + throw new IgniteCheckedException("Local node is not started or fully initialized [isStopping=" + + getSpiContext().isStopping() + ']'); + if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(), + HandshakeMessage msg = new HandshakeMessage(localNode.id(), recovery.incrementConnectCount(), recovery.receivedCount()); @@ -2415,7 +2427,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Node ID message. 
*/ private NodeIdMessage nodeIdMessage() { - return new NodeIdMessage(getLocalNode().id()); + ClusterNode localNode = getLocalNode(); + + UUID id; + + if (localNode == null) { + log.warning("Local node is not started or fully initialized [isStopping=" + + getSpiContext().isStopping() + ']'); + + id = new UUID(0, 0); + } + else + id = localNode.id(); + + return new NodeIdMessage(id); } /** {@inheritDoc} */ @@ -2931,7 +2956,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - UUID id = getLocalNode().id(); + ClusterNode localNode = getLocalNode(); + + if (localNode == null) + throw new IgniteSpiException("Local node is not started or fully initialized [isStopping=" + + getSpiContext().isStopping() + ']'); + + UUID id = localNode.id(); NodeIdMessage msg = new NodeIdMessage(id);
