Fixed communication subsystem stop notification.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59f37266 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59f37266 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59f37266 Branch: refs/heads/ignite-1537 Commit: 59f3726696ec47b52f68103aa8250d9f2015b49b Parents: 3a6a463 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Sat Nov 28 18:46:51 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Sat Nov 28 18:46:51 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 88 +------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 4 +- .../cache/transactions/IgniteTxHandler.java | 4 +- .../communication/tcp/TcpCommunicationSpi.java | 17 ++-- .../ignite/testframework/GridTestUtils.java | 61 +++++++++++++- 5 files changed, 76 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index ea82d7f..a8557cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -46,8 +46,6 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteDeploymentCheckedException; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.GridManagerAdapter; @@ -103,9 +101,6 @@ import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIM * Grid communication manager. */ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> { - /** */ - public static volatile boolean TURBO_DEBUG_MODE; - /** Empty array of message factories. */ public static final MessageFactory[] EMPTY = {}; @@ -775,7 +770,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private void processRegularMessage( final UUID nodeId, final GridIoMessage msg, - byte plc, + final byte plc, final IgniteRunnable msgC ) throws IgniteCheckedException { Runnable c = new Runnable() { @@ -956,7 +951,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (msgC == null) { // Message from local node can be processed in sync manner. - assert locNodeId.equals(nodeId) || TURBO_DEBUG_MODE; + assert locNodeId.equals(nodeId); unwindMessageSet(set, lsnr); @@ -1089,85 +1084,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * This method can be used for debugging tricky concurrency issues - * with multi-nodes in single JVM. - * <p> - * This method eliminates network between nodes started in single JVM - * when {@link #TURBO_DEBUG_MODE} is set to {@code true}. - * <p> - * How to use it: - * <ol> - * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean, IgniteInClosure)} - * with this method.</li> - * <li>Start all grids for your test, then set {@link #TURBO_DEBUG_MODE} to {@code true}.</li> - * <li>Perform test operations on the topology. No network will be there.</li> - * <li>DO NOT turn on turbo debug before all grids started. This will cause deadlocks.</li> - * </ol> - * - * @param node Destination node. - * @param topic Topic to send the message to. - * @param topicOrd GridTopic enumeration ordinal. - * @param msg Message to send. - * @param plc Type of processing. - * @param ordered Ordered flag. - * @param timeout Timeout. - * @param skipOnTimeout Whether message can be skipped on timeout. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - private void sendTurboDebug( - ClusterNode node, - Object topic, - int topicOrd, - Message msg, - byte plc, - boolean ordered, - long timeout, - boolean skipOnTimeout - ) throws IgniteCheckedException { - assert node != null; - assert topic != null; - assert msg != null; - - GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); - - IgniteKernal rmt; - - if (locNodeId.equals(node.id())) { - assert plc != P2P_POOL; - - CommunicationListener commLsnr = this.commLsnr; - - if (commLsnr == null) - throw new IgniteCheckedException("Trying to send message when grid is not fully started."); - - if (ordered) - processOrderedMessage(locNodeId, ioMsg, plc, null); - else - processRegularMessage0(ioMsg, locNodeId); - } - else if (TURBO_DEBUG_MODE && (rmt = IgnitionEx.gridxx(locNodeId)) != null) { - if (ioMsg.isOrdered()) - rmt.context().io().processOrderedMessage(locNodeId, ioMsg, ioMsg.policy(), null); - else - rmt.context().io().processRegularMessage0(ioMsg, locNodeId); - } - else { - if (topicOrd < 0) - ioMsg.topicBytes(marsh.marshal(topic)); - - try { - getSpi().sendMessage(node, ioMsg); - } - catch (IgniteSpiException e) { - throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + - "TCP connection cannot be established due to firewall issues) " + - "[node=" + node + ", topic=" + topic + - ", msg=" + msg + ", policy=" + plc + ']', e); - } - } - } - - /** * @param nodeId Id of destination node. * @param topic Topic to send the message to. * @param msg Message to send. http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 34addfa..9f1f8a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -81,8 +81,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; @@ -1208,7 +1206,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter assert req.transactionNodes() != null; try { - cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); } catch (ClusterTopologyCheckedException e) { fut.onNodeLeft(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 61a9bed..91ebfd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -842,7 +842,7 @@ public class IgniteTxHandler { try { // Reply back to sender. - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + ctx.io().send(nodeId, res, req.policy()); } catch (IgniteCheckedException e) { if (e instanceof ClusterTopologyCheckedException) { @@ -1060,7 +1060,7 @@ public class IgniteTxHandler { } try { - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + ctx.io().send(nodeId, res, req.policy()); } catch (Throwable e) { // Double-check. http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 68e2f43..b2f0f65 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -838,6 +838,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Context initialization latch. */ private final CountDownLatch ctxInitLatch = new CountDownLatch(1); + /** Stopping flag (set to {@code true} when SPI gets stopping signal). */ + private volatile boolean stopping; + /** metrics listener. */ private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() { @Override public void onBytesSent(int bytesCnt) { @@ -1794,6 +1797,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override protected void onContextDestroyed0() { + stopping = true; + if (ctxInitLatch.getCount() > 0) // Safety. ctxInitLatch.countDown(); @@ -1976,7 +1981,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient client = clients.get(nodeId); if (client == null) { - if (isNodeStopping()) + if (stopping) throw new IgniteSpiException("Node is stopping."); // Do not allow concurrent connects. @@ -2311,8 +2316,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter U.closeQuiet(ch); - throw new ClusterTopologyCheckedException("Failed to send message, " + - "node left cluster: " + node); + throw new ClusterTopologyCheckedException("Failed to send message " + + "(node left topology): " + node); } long rcvCnt = -1; @@ -2784,18 +2789,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Node ID message. */ private NodeIdMessage nodeIdMessage() { - ClusterNode localNode = getLocalNode(); + ClusterNode locNode = getLocalNode(); UUID id; - if (localNode == null) { + if (locNode == null) { U.warn(log, "Local node is not started or fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); id = new UUID(0, 0); } else - id = localNode.id(); + id = locNode.id(); return new NodeIdMessage(id); } http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index d1c3d9f..7116227 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -42,10 +42,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.CacheException; import javax.cache.configuration.Factory; @@ -81,6 +86,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridAbsClosure; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.T2; @@ -88,6 +94,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi; import org.apache.ignite.ssl.SslContextFactory; import org.apache.ignite.testframework.config.GridTestProperties; @@ -147,6 +154,9 @@ public final class GridTestUtils { /** */ private static final GridBusyLock busyLock = new GridBusyLock(); + /** */ + public static final ConcurrentMap<IgnitePair<UUID>, IgnitePair<Queue<Message>>> msgMap = new ConcurrentHashMap<>(); + /** * Ensure singleton. */ @@ -155,6 +165,55 @@ public final class GridTestUtils { } /** + * @param from From node ID. + * @param to To node ID. + * @param msg Message. + * @param sent Sent or received. + */ + public static void addMessage(UUID from, UUID to, Message msg, boolean sent) { + IgnitePair<UUID> key = F.pair(from, to); + + IgnitePair<Queue<Message>> val = msgMap.get(key); + + if (val == null) { + IgnitePair<Queue<Message>> old = msgMap.putIfAbsent(key, + val = F.<Queue<Message>>pair(new ConcurrentLinkedQueue<Message>(), new ConcurrentLinkedQueue<Message>())); + + if (old != null) + val = old; + } + + (sent ? val.get1() : val.get2()).add(msg); + } + + /** + * Dumps all messages tracked with {@link #addMessage(UUID, UUID, Message, boolean)} to std out. + */ + public static void dumpMessages() { + for (Map.Entry<IgnitePair<UUID>, IgnitePair<Queue<Message>>> entry : msgMap.entrySet()) { + U.debug("\n" + entry.getKey().get1() + " [sent to] " + entry.getKey().get2()); + + for (Message message : entry.getValue().get1()) + U.debug("\t" + message); + + U.debug(entry.getKey().get2() + " [received from] " + entry.getKey().get1()); + + for (Message message : entry.getValue().get2()) + U.debug("\t" + message); + } + } + +// static { +// new Thread(new Runnable() { +// @Override public void run() { +// JOptionPane.showMessageDialog(null, "Close this to dump messages."); +// +// dumpMessages(); +// } +// }).start(); +// } + + /** * Checks whether callable throws expected exception or not. * * @param log Logger (optional). @@ -1728,4 +1787,4 @@ public final class GridTestUtils { /** Evict to offheap with eviction policy + evict from offheap to swap when max offheap memory limit is reached. */ OFFHEAP_EVICT_SWAP, } -} \ No newline at end of file +}