Merging IGNITE-1171 - fixed problems with custom events in discovery
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f3ef6a8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f3ef6a8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f3ef6a8 Branch: refs/heads/master Commit: 6f3ef6a84ee1c3e77d32ca9930835d1720918e20 Parents: 517d0f5 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Wed Sep 23 16:36:15 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Wed Sep 23 16:36:15 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteAtomicLong.java | 2 +- .../cache/DynamicCacheDescriptor.java | 10 +- .../GridCachePartitionExchangeManager.java | 6 + .../processors/cache/GridCacheProcessor.java | 18 +- .../continuous/CacheContinuousQueryManager.java | 10 +- .../communication/tcp/TcpCommunicationSpi.java | 7 +- .../discovery/DiscoverySpiCustomMessage.java | 12 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 301 ++++++++++++++---- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../tcp/internal/TcpDiscoveryNodesRing.java | 94 ++---- .../messages/TcpDiscoveryDiscardMessage.java | 15 +- .../TcpDiscoveryNodeAddFinishedMessage.java | 2 +- .../messages/TcpDiscoveryNodeAddedMessage.java | 19 +- .../distributed/CacheAffEarlySelfTest.java | 245 --------------- .../distributed/CacheAffinityEarlyTest.java | 168 ++++++++++ ...GridCacheValueConsistencyAtomicSelfTest.java | 2 +- .../tcp/TcpDiscoveryMultiThreadedTest.java | 53 ++-- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 315 ++++++++++++++++++- .../testsuites/IgniteCacheTestSuite4.java | 2 + 20 files changed, 864 insertions(+), 425 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java index 83e2525..bac1a68 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java @@ -160,4 +160,4 @@ public interface IgniteAtomicLong extends Closeable { * @throws IgniteException If operation failed. */ @Override public void close(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index f3c3be9..3cfc34e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -39,9 +39,6 @@ public class DynamicCacheDescriptor { @GridToStringExclude private CacheConfiguration cacheCfg; - /** Cancelled flag. */ - private boolean cancelled; - /** Locally configured flag. */ private boolean locCfg; @@ -156,6 +153,13 @@ public class DynamicCacheDescriptor { } /** + * @return Started flag. + */ + public boolean started() { + return started; + } + + /** * @return Cache configuration. */ public CacheConfiguration cacheConfiguration() { http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 34c571c..eb76233 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1435,6 +1435,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private static final long serialVersionUID = 0L; /** */ + @GridToStringInclude private AffinityTopologyVersion topVer; /** @@ -1455,5 +1456,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return done; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AffinityReadyFuture.class, this, super.toString()); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e92ea57..74124bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1522,10 +1522,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Collection of started cache names. */ public Collection<String> cacheNames() { - return F.viewReadOnly(registeredCaches.keySet(), - new IgniteClosure<String, String>() { - @Override public String apply(String s) { - return unmaskNull(s); + return F.viewReadOnly(registeredCaches.values(), + new IgniteClosure<DynamicCacheDescriptor, String>() { + @Override public String apply(DynamicCacheDescriptor desc) { + return desc.cacheConfiguration().getName(); + } + }, + new IgnitePredicate<DynamicCacheDescriptor>() { + @Override public boolean apply(DynamicCacheDescriptor desc) { + return desc.started(); } }); } @@ -1568,6 +1573,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.deploymentId(), topVer ); + + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + + if (desc != null) + desc.onStart(); } // Start statically configured caches received from remote nodes during exchange. http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index da02b97..c719f1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -448,8 +448,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { taskNameHash, skipPrimaryCheck); - UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, - autoUnsubscribe, grp.predicate()).get(); + UUID id = cctx.kernalContext().continuous().startRoutine( + hnd, + bufSize, + timeInterval, + autoUnsubscribe, + grp.predicate()).get(); if (notifyExisting) { final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator(); @@ -811,4 +815,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2594213..c93d5af 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || timeoutHelper.checkFailureTimeoutReached(e))) { - log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" + - failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']'); + if (log.isDebugEnabled()) + log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" + + failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']'); throw e; } @@ -2700,7 +2701,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * * FOR TEST PURPOSES ONLY!!! */ - void simulateNodeFailure() { + public void simulateNodeFailure() { if (nioSrvr != null) nioSrvr.stop(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java index 373c121..a0f9b75 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,13 +18,15 @@ package org.apache.ignite.spi.discovery; import java.io.Serializable; + +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.jetbrains.annotations.Nullable; /** * Message to send across ring. * - * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent( - * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage) + * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage) */ public interface DiscoverySpiCustomMessage extends Serializable { /** @@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable { * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes. */ public boolean isMutable(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 8a205d2..d8ee953 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -37,10 +37,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Queue; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; @@ -64,6 +67,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -145,7 +149,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe /** * */ -@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") +@SuppressWarnings("All") class ServerImpl extends TcpDiscoveryImpl { /** */ private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, @@ -1368,8 +1372,13 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msgs Messages to include. * @param discardMsgId Discarded message ID. */ - private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId, - @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) { + private void prepareNodeAddedMessage( + TcpDiscoveryAbstractMessage msg, + UUID destNodeId, + @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, + @Nullable IgniteUuid discardMsgId, + @Nullable IgniteUuid discardCustomMsgId + ) { assert destNodeId != null; if (msg instanceof TcpDiscoveryNodeAddedMessage) { @@ -1393,7 +1402,7 @@ class ServerImpl extends TcpDiscoveryImpl { } nodeAddedMsg.topology(topToSnd); - nodeAddedMsg.messages(msgs, discardMsgId); + nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId); Map<Long, Collection<ClusterNode>> hist; @@ -1416,7 +1425,7 @@ class ServerImpl extends TcpDiscoveryImpl { nodeAddedMsg.topology(null); nodeAddedMsg.topologyHistory(null); - nodeAddedMsg.messages(null, null); + nodeAddedMsg.messages(null, null, null); } } @@ -1825,7 +1834,7 @@ class ServerImpl extends TcpDiscoveryImpl { */ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) { if (msg instanceof TcpDiscoveryNodeAddedMessage) - prepareNodeAddedMessage(msg, destNodeId, null, null); + prepareNodeAddedMessage(msg, destNodeId, null, null, null); return msg; } @@ -1834,16 +1843,22 @@ class ServerImpl extends TcpDiscoveryImpl { /** * Pending messages container. */ - private static class PendingMessages { + private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> { /** */ private static final int MAX = 1024; /** Pending messages. */ private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); + /** Processed custom message IDs. */ + private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2); + /** Discarded message ID. */ private IgniteUuid discardId; + /** Discarded message ID. */ + private IgniteUuid customDiscardId; + /** * Adds pending message and shrinks queue if it exceeds limit * (messages that were not discarded yet are never removed). @@ -1869,31 +1884,118 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msgs Message. * @param discardId Discarded message ID. */ - void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) { + void reset( + @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, + @Nullable IgniteUuid discardId, + @Nullable IgniteUuid customDiscardId + ) { this.msgs.clear(); if (msgs != null) this.msgs.addAll(msgs); this.discardId = discardId; + this.customDiscardId = customDiscardId; } /** - * Clears pending messages. + * Discards message with provided ID and all before it. + * + * @param id Discarded message ID. */ - void clear() { - msgs.clear(); + void discard(IgniteUuid id, boolean custom) { + if (custom) + customDiscardId = id; + else + discardId = id; + } - discardId = null; + /** + * Gets iterator for non-discarded messages. + * + * @return Non-discarded messages iterator. + */ + public Iterator<TcpDiscoveryAbstractMessage> iterator() { + return new SkipIterator(); } /** - * Discards message with provided ID and all before it. * - * @param id Discarded message ID. */ - void discard(IgniteUuid id) { - discardId = id; + private class SkipIterator implements Iterator<TcpDiscoveryAbstractMessage> { + /** Skip non-custom messages flag. */ + private boolean skipMsg = discardId != null; + + /** Skip custom messages flag. */ + private boolean skipCustomMsg; + + /** Internal iterator. */ + private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator(); + + /** Next message. */ + private TcpDiscoveryAbstractMessage next; + + { + advance(); + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return next != null; + } + + /** {@inheritDoc} */ + @Override public TcpDiscoveryAbstractMessage next() { + if (next == null) + throw new NoSuchElementException(); + + TcpDiscoveryAbstractMessage next0 = next; + + advance(); + + return next0; + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Advances iterator to the next available item. + */ + private void advance() { + next = null; + + while (msgIt.hasNext()) { + TcpDiscoveryAbstractMessage msg0 = msgIt.next(); + + if (msg0 instanceof TcpDiscoveryCustomEventMessage) { + if (skipCustomMsg) { + assert customDiscardId != null; + + if (F.eq(customDiscardId, msg0.id())) + skipCustomMsg = false; + + continue; + } + } + else { + if (skipMsg) { + assert discardId != null; + + if (F.eq(discardId, msg0.id())) + skipMsg = false; + + continue; + } + } + + next = msg0; + + break; + } + } } } @@ -1941,6 +2043,12 @@ class ServerImpl extends TcpDiscoveryImpl { /** Connection check threshold. */ private long connCheckThreshold; + /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */ + private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>(); + + /** Collection to track joining nodes. */ + private Set<UUID> joiningNodes = new HashSet<>(); + /** */ protected RingMessageWorker() { @@ -2046,6 +2154,8 @@ class ServerImpl extends TcpDiscoveryImpl { sendHeartbeatMessage(); checkHeartbeatsReceiving(); + + checkPendingCustomMessages(); } /** @@ -2323,20 +2433,11 @@ class ServerImpl extends TcpDiscoveryImpl { debugLog("Pending messages will be sent [failure=" + failure + ", forceSndPending=" + forceSndPending + ']'); - boolean skip = pendingMsgs.discardId != null; - - for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) { - if (skip) { - if (pendingMsg.id().equals(pendingMsgs.discardId)) - skip = false; - - continue; - } - + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { long tstamp = U.currentTimeMillis(); prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, - pendingMsgs.discardId); + pendingMsgs.discardId, pendingMsgs.customDiscardId); if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); @@ -2354,13 +2455,13 @@ class ServerImpl extends TcpDiscoveryImpl { int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) - log.debug("Pending message has been sent to next node [msg=" + msg.id() + - ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + + log.debug("Pending message has been sent to next node [msgId=" + msg.id() + + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + ", res=" + res + ']'); if (debugMode) - debugLog("Pending message has been sent to next node [msg=" + msg.id() + - ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + + debugLog("Pending message has been sent to next node [msgId=" + msg.id() + + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + ", res=" + res + ']'); // Resetting timeout control object to create a new one for the next bunch of @@ -2377,7 +2478,8 @@ class ServerImpl extends TcpDiscoveryImpl { msg = new TcpDiscoveryStatusCheckMessage(locNode, null); } else - prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); + prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId, + pendingMsgs.customDiscardId); try { long tstamp = U.currentTimeMillis(); @@ -2478,21 +2580,6 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (msg instanceof TcpDiscoveryStatusCheckMessage) { - TcpDiscoveryStatusCheckMessage msg0 = (TcpDiscoveryStatusCheckMessage)msg; - - if (next.id().equals(msg0.failedNodeId())) { - next = null; - - if (log.isDebugEnabled()) - log.debug("Discarding status check since next node has indeed failed [next=" + next + - ", msg=" + msg + ']'); - - // Discard status check message by exiting loop and handle failure. - break; - } - } - next = null; searchNext = true; @@ -2524,6 +2611,29 @@ class ServerImpl extends TcpDiscoveryImpl { for (TcpDiscoveryNode n : failedNodes) msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder())); + if (!sent) { + if (log.isDebugEnabled()) + log.debug("Pending messages will be resent to local node"); + + if (debugMode) + log.debug("Pending messages will be resent to local node"); + + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { + prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId, + pendingMsgs.customDiscardId); + + msgWorker.addMessage(pendingMsg); + + if (log.isDebugEnabled()) + log.debug("Pending message has been sent to local node [msg=" + msg.id() + + ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']'); + + if (debugMode) + debugLog("Pending message has been sent to local node [msg=" + msg.id() + + ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']'); + } + } + LT.warn(log, null, "Local node has detected failed nodes and started cluster-wide procedure. " + "To speed up failure detection please see 'Failure Detection' section under javadoc" + " for 'TcpDiscoverySpi'"); @@ -3077,7 +3187,7 @@ class ServerImpl extends TcpDiscoveryImpl { processNodeAddFinishedMessage(addFinishMsg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } @@ -3118,6 +3228,8 @@ class ServerImpl extends TcpDiscoveryImpl { return; } + joiningNodes.add(node.id()); + if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) { boolean authFailed = true; @@ -3222,6 +3334,8 @@ class ServerImpl extends TcpDiscoveryImpl { n.visible(true); } + joiningNodes.clear(); + locNode.setAttributes(node.attributes()); locNode.visible(true); @@ -3237,10 +3351,11 @@ class ServerImpl extends TcpDiscoveryImpl { topHist.clear(); topHist.putAll(msg.topologyHistory()); - pendingMsgs.discard(msg.discardedMessageId()); + pendingMsgs.reset(msg.messages(), msg.discardedMessageId(), + msg.discardedCustomMessageId()); // Clear data to minimize message size. - msg.messages(null, null); + msg.messages(null, null, null); msg.topology(null); msg.topologyHistory(null); msg.clearDiscoveryData(); @@ -3307,7 +3422,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { spi.stats.onRingMessageReceived(msg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } @@ -3342,7 +3457,11 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) { + joiningNodes.remove(nodeId); + + TcpDiscoverySpiState state = spiStateCopy(); + + if (msg.verified() && !locNodeId.equals(nodeId) && state != CONNECTING && fireEvt) { spi.stats.onNodeJoined(); // Make sure that node with greater order will never get EVT_NODE_JOINED @@ -3357,7 +3476,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean b = ring.topologyVersion(topVer); assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg + - ", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']'; + ", lastMsg=" + lastMsg + ", spiState=" + state + ']'; if (log.isDebugEnabled()) log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']'); @@ -3365,7 +3484,8 @@ class ServerImpl extends TcpDiscoveryImpl { lastMsg = msg; } - notifyDiscovery(EVT_NODE_JOINED, topVer, node); + if (state == CONNECTED) + notifyDiscovery(EVT_NODE_JOINED, topVer, node); try { if (spi.ipFinder.isShared() && locNodeCoord) @@ -3381,7 +3501,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() == CONNECTING) { + if (msg.verified() && locNodeId.equals(nodeId) && state == CONNECTING) { assert node != null; assert topVer > 0 : "Invalid topology version: " + msg; @@ -3402,6 +3522,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); + + checkPendingCustomMessages(); } /** @@ -3481,7 +3603,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { spi.stats.onRingMessageReceived(msg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } @@ -3553,6 +3675,8 @@ class ServerImpl extends TcpDiscoveryImpl { } } + joiningNodes.remove(leftNode.id()); + spi.stats.onNodeLeft(); notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode); @@ -3580,6 +3704,8 @@ class ServerImpl extends TcpDiscoveryImpl { U.closeQuiet(sock); } + + checkPendingCustomMessages(); } /** @@ -3650,7 +3776,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { spi.stats.onRingMessageReceived(msg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } @@ -3707,6 +3833,8 @@ class ServerImpl extends TcpDiscoveryImpl { ", msg=" + msg.warning() + ']'); } + joiningNodes.remove(node.id()); + notifyDiscovery(EVT_NODE_FAILED, topVer, node); spi.stats.onNodeFailed(); @@ -3720,6 +3848,8 @@ class ServerImpl extends TcpDiscoveryImpl { U.closeQuiet(sock); } + + checkPendingCustomMessages(); } /** @@ -4046,7 +4176,7 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.verified()) - pendingMsgs.discard(msgId); + pendingMsgs.discard(msgId, msg.customMessageDiscard()); if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); @@ -4098,18 +4228,23 @@ class ServerImpl extends TcpDiscoveryImpl { */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { if (isLocalNodeCoordinator()) { - boolean sndNext; + if (!joiningNodes.isEmpty()) { + pendingCustomMsgs.add(msg); - if (!msg.verified()) { + return; + } + + boolean sndNext = !msg.verified(); + + if (sndNext) { msg.verify(getLocalNodeId()); msg.topologyVersion(ring.topologyVersion()); - notifyDiscoveryListener(msg); - - sndNext = true; + if (pendingMsgs.procCustomMsgs.add(msg.id())) + notifyDiscoveryListener(msg); + else + sndNext = false; } - else - sndNext = false; if (sndNext && ring.hasRemoteNodes()) sendMessageAcrossRing(msg); @@ -4139,12 +4274,30 @@ class ServerImpl extends TcpDiscoveryImpl { } } - addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); } } else { - if (msg.verified()) + TcpDiscoverySpiState state0; + + synchronized (mux) { + state0 = spiState; + } + + if (msg.verified() && msg.topologyVersion() != ring.topologyVersion()) { + if (log.isDebugEnabled()) + log.debug("Discarding custom event message [msg=" + msg + ", ring=" + ring + ']'); + + return; + } + + if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) { + assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id() + + ", topver=" + ring.topologyVersion(); + assert msg.topologyVersion() == ring.topologyVersion() : "msg: " + msg + ", topver=" + ring.topologyVersion(); + notifyDiscoveryListener(msg); + } if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); @@ -4152,6 +4305,18 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Checks and flushes custom event messages if no nodes are attempting to join the grid. + */ + private void checkPendingCustomMessages() { + if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) { + TcpDiscoveryCustomEventMessage msg; + + while ((msg = pendingCustomMsgs.poll()) != null) + processCustomMessage(msg); + } + } + + /** * @param msg Custom message. */ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) { @@ -5081,7 +5246,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - prepareNodeAddedMessage(msg, clientNodeId, null, null); + prepareNodeAddedMessage(msg, clientNodeId, null, null, null); writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index e5be530..2786d0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl { return res; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 80fcc46..6254605 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2038,4 +2038,4 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return S.toString(SocketTimeoutObject.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index 2b17696..7ca092c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -17,7 +17,17 @@ package org.apache.ignite.spi.discovery.tcp.internal; -import java.util.ArrayList; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.PN; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.jetbrains.annotations.Nullable; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -29,16 +39,6 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.PN; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgnitePredicate; -import org.jetbrains.annotations.Nullable; /** * Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} @@ -81,6 +81,9 @@ public class TcpDiscoveryNodesRing { /** */ private long nodeOrder; + /** */ + private long maxInternalOrder; + /** Lock. */ @GridToStringExclude private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -99,6 +102,8 @@ public class TcpDiscoveryNodesRing { this.locNode = locNode; clear(); + + maxInternalOrder = locNode.internalOrder(); } finally { rwLock.writeLock().unlock(); @@ -204,7 +209,9 @@ public class TcpDiscoveryNodesRing { if (nodesMap.containsKey(node.id())) return false; - assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " + + long maxInternalOrder0 = maxInternalOrder(); + + assert node.internalOrder() > maxInternalOrder0 : "Adding node to the middle of the ring " + "[ring=" + this + ", node=" + node + ']'; nodesMap.put(node.id(), node); @@ -216,6 +223,8 @@ public class TcpDiscoveryNodesRing { nodes.add(node); nodeOrder = node.internalOrder(); + + maxInternalOrder = node.internalOrder(); } finally { rwLock.writeLock().unlock(); @@ -231,9 +240,13 @@ public class TcpDiscoveryNodesRing { rwLock.readLock().lock(); try { - TcpDiscoveryNode last = nodes.last(); + if (maxInternalOrder == 0) { + TcpDiscoveryNode last = nodes.last(); + + return last != null ? maxInternalOrder = last.internalOrder() : -1; + } - return last != null ? last.internalOrder() : -1; + return maxInternalOrder; } finally { rwLock.readLock().unlock(); @@ -336,47 +349,6 @@ public class TcpDiscoveryNodesRing { } /** - * Removes nodes from the topology. - * - * @param nodeIds IDs of the nodes to remove. - * @return Collection of removed nodes. - */ - public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) { - assert !F.isEmpty(nodeIds); - - rwLock.writeLock().lock(); - - try { - boolean firstRmv = true; - - Collection<TcpDiscoveryNode> res = null; - - for (UUID id : nodeIds) { - TcpDiscoveryNode rmv = nodesMap.remove(id); - - if (rmv != null) { - if (firstRmv) { - nodes = new TreeSet<>(nodes); - - res = new ArrayList<>(nodeIds.size()); - - firstRmv = false; - } - - nodes.remove(rmv); - - res.add(rmv); - } - } - - return res == null ? Collections.<TcpDiscoveryNode>emptyList() : res; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** * Removes all remote nodes, leaves only local node. * <p> * This should be called when SPI should be disconnected from topology and @@ -397,6 +369,7 @@ public class TcpDiscoveryNodesRing { nodesMap.put(locNode.id(), locNode); nodeOrder = 0; + maxInternalOrder = 0; topVer = 0; } @@ -622,13 +595,8 @@ public class TcpDiscoveryNodesRing { rwLock.writeLock().lock(); try { - if (nodeOrder == 0) { - TcpDiscoveryNode last = nodes.last(); - - assert last != null; - - nodeOrder = last.internalOrder(); - } + if (nodeOrder == 0) + nodeOrder = maxInternalOrder(); return ++nodeOrder; } @@ -681,4 +649,4 @@ public class TcpDiscoveryNodesRing { rwLock.readLock().unlock(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java index 1e1fa6b..145f19e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java @@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage { /** ID of the message to discard (this and all preceding). */ private final IgniteUuid msgId; + /** True if this is discard ID for custom event message. */ + private final boolean customMsgDiscard; + /** * Constructor. * * @param creatorNodeId Creator node ID. * @param msgId Message ID. */ - public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) { + public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) { super(creatorNodeId); this.msgId = msgId; + this.customMsgDiscard = customMsgDiscard; } /** @@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage { return msgId; } + /** + * Flag indicating whether the ID to discard is for a custom message or not. + * + * @return Custom message flag. + */ + public boolean customMessageDiscard() { + return customMsgDiscard; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index c6a469f..1b99a56 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -101,4 +101,4 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess @Override public String toString() { return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 01c6789..5a7146d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -48,6 +48,9 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { /** Discarded message ID. */ private IgniteUuid discardMsgId; + /** Discarded message ID. */ + private IgniteUuid discardCustomMsgId; + /** Current topology. Initialized by coordinator. */ @GridToStringInclude private Collection<TcpDiscoveryNode> top; @@ -117,14 +120,28 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { } /** + * Gets discarded custom message ID. + * + * @return Discarded message ID. + */ + @Nullable public IgniteUuid discardedCustomMessageId() { + return discardCustomMsgId; + } + + /** * Sets pending messages to send to new node. * * @param msgs Pending messages to send to new node. * @param discardMsgId Discarded message ID. */ - public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) { + public void messages( + @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, + @Nullable IgniteUuid discardMsgId, + @Nullable IgniteUuid discardCustomMsgId + ) { this.msgs = msgs; this.discardMsgId = discardMsgId; + this.discardCustomMsgId = discardCustomMsgId; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java deleted file mode 100644 index 7f0ca11..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Random; -import java.util.UUID; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgniteFutureTimeoutException; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class CacheAffEarlySelfTest extends GridCommonAbstractTest { - /** Grid count. */ - private static int GRID_CNT = 8; - - /** Operation timeout. */ - private static long OP_TIMEOUT = 5000; - - /** Always dump threads or only once per operation. */ - private static boolean ALWAYS_DUMP_THREADS = false; - - /** Stopped. */ - private volatile boolean stopped; - - /** Iteration. */ - private int iters = 10; - - /** Concurrent. */ - private boolean concurrent = true; - - /** Futs. */ - private Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(GRID_CNT); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(true); - finder.setAddresses(Collections.singletonList("127.0.0.1:47500..47510")); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(finder); - - cfg.setDiscoverySpi(discoSpi); - - OptimizedMarshaller marsh = new OptimizedMarshaller(); - marsh.setRequireSerializable(false); - - cfg.setMarshaller(marsh); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 6 * 60 * 1000L; - } - - /** - * - */ - public void testStartNodes() throws Exception { - for (int i = 0; i < iters; i++) { - try { - System.out.println("*** Iteration " + (i + 1) + '/' + iters); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @Override public void run() { - try { - doTest(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }, 1); - - fut.get(30000); - } - catch (IgniteFutureTimeoutCheckedException e) { - // No-op. - } - finally { - stopAllGrids(true); - } - } - } - - /** - * - */ - public void doTest() throws Exception { - for (int i = 0; i < GRID_CNT; i++) { - final int idx = i; - - final Ignite grid = concurrent ? null : startGrid(idx); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @Override public void run() { - Random rnd = new Random(); - - try { - final Ignite ignite = grid == null ? startGrid(idx) : grid; - - final IgniteCache<UUID, UUID> cache = getCache(ignite).withAsync(); - - CacheAffEarlySelfTest.this.execute(cache, new IgniteInClosure<IgniteCache<UUID,UUID>>() { - @Override public void apply(IgniteCache<UUID, UUID> entries) { - cache.put(ignite.cluster().localNode().id(), UUID.randomUUID()); - } - }); - - while (!stopped) { - int val = Math.abs(rnd.nextInt(100)); - if (val >= 0 && val < 40) - execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() { - @Override public void apply(IgniteCache<UUID, UUID> entries) { - cache.containsKey(ignite.cluster().localNode().id()); - } - }); - else if (val >= 40 && val < 80) - execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() { - @Override public void apply(IgniteCache<UUID, UUID> entries) { - cache.get(ignite.cluster().localNode().id()); - } - }); - else - execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() { - @Override public void apply(IgniteCache<UUID, UUID> entries) { - cache.put(ignite.cluster().localNode().id(), UUID.randomUUID()); - } - }); - - Thread.sleep(50); - } - } - catch (Exception e) { - e.printStackTrace(); - } - } - }, 1); - - futs.add(fut); - } - - Thread.sleep(10000); - - stopped = true; - - for (IgniteInternalFuture<?> fut : futs) - fut.get(); - } - - /** - * @param cache Cache. - * @param c Closure. - */ - private void execute(IgniteCache<UUID, UUID> cache, IgniteInClosure<IgniteCache<UUID, UUID>> c) { - c.apply(cache); - - IgniteFuture<Object> fut = cache.future(); - - boolean success = false; - - int iter = 0; - - while (!success) { - try { - fut.get(OP_TIMEOUT); - - success = true; - } - catch (IgniteFutureTimeoutException e) { - debug(iter == 0 || ALWAYS_DUMP_THREADS); - } - - iter++; - } - } - - /** - * - */ - private void debug(boolean dumpThreads) { - log.info("DUMPING DEBUG INFO:"); - - for (Ignite ignite : G.allGrids()) - ((IgniteKernal)ignite).dumpDebugInfo(); - - if (dumpThreads) { - U.dumpThreads(null); - - U.dumpThreads(log); - } - } - - /** - * @param grid Grid. - */ - private IgniteCache<UUID, UUID> getCache(Ignite grid) { - CacheConfiguration<UUID, UUID> ccfg = defaultCacheConfiguration(); - - ccfg.setCacheMode(CacheMode.PARTITIONED); - ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); - ccfg.setBackups(1); - ccfg.setNearConfiguration(null); - - return grid.getOrCreateCache(ccfg); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java new file mode 100644 index 0000000..6b67139 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheAffinityEarlyTest extends GridCommonAbstractTest { + /** Grid count. */ + private static int GRID_CNT = 8; + + /** Stopped. */ + private volatile boolean stopped; + + /** Iteration. */ + private static final int iters = 10; + + /** Concurrent. */ + private static final boolean concurrent = true; + + /** Futs. */ + private Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(GRID_CNT); + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + OptimizedMarshaller marsh = new OptimizedMarshaller(); + marsh.setRequireSerializable(false); + + cfg.setMarshaller(marsh); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 6 * 60 * 1000L; + } + + /** + * @throws Exception If failed. + */ + public void testStartNodes() throws Exception { + for (int i = 0; i < iters; i++) { + try { + log.info("Iteration: " + (i + 1) + '/' + iters); + + doTest(); + } + finally { + stopAllGrids(true); + } + } + } + + /** + * @throws Exception If failed. + */ + private void doTest() throws Exception { + final AtomicBoolean failed = new AtomicBoolean(); + + for (int i = 0; i < GRID_CNT; i++) { + final int idx = i; + + final Ignite grid = concurrent ? null : startGrid(idx); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @Override public void run() { + Random rnd = new Random(); + + try { + Ignite ignite = grid == null ? startGrid(idx) : grid; + + IgniteCache<Object, Object> cache = getCache(ignite); + + cache.put(ignite.cluster().localNode().id(), UUID.randomUUID()); + + while (!stopped) { + int val = Math.abs(rnd.nextInt(100)); + + if (val >= 0 && val < 40) + cache.containsKey(ignite.cluster().localNode().id()); + else if (val >= 40 && val < 80) + cache.get(ignite.cluster().localNode().id()); + else + cache.put(ignite.cluster().localNode().id(), UUID.randomUUID()); + + Thread.sleep(50); + } + } + catch (Exception e) { + log.error("Unexpected error: " + e, e); + + failed.set(true); + } + } + }, 1); + + futs.add(fut); + } + + Thread.sleep(10_000); + + stopped = true; + + for (IgniteInternalFuture<?> fut : futs) + fut.get(); + + assertFalse(failed.get()); + } + + /** + * @param grid Grid. + * @return Cache. + */ + private IgniteCache getCache(Ignite grid) { + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfg.setBackups(1); + ccfg.setNearConfiguration(null); + + return grid.getOrCreateCache(ccfg); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java index 7451911..18c8d8e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java @@ -35,4 +35,4 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi @Override protected int iterationCount() { return 100_000; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 16fa662..1ccbe1f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -102,7 +102,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected long getTestTimeout() { - return 3 * 60 * 1000; + return 5 * 60 * 1000; } /** @@ -249,35 +249,48 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { * @throws Exception If any error occurs. */ public void testMultipleStartOnCoordinatorStop() throws Exception{ - clientFlagGlobal = false; + for (int k = 0; k < 3; k++) { + log.info("Iteration: " + k); - startGrids(GRID_CNT); + clientFlagGlobal = false; - final CyclicBarrier barrier = new CyclicBarrier(GRID_CNT + 4); + final int START_NODES = 5; + final int JOIN_NODES = 10; - final AtomicInteger startIdx = new AtomicInteger(GRID_CNT); + startGrids(START_NODES); - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - barrier.await(); + final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1); - Ignite ignite = startGrid(startIdx.getAndIncrement()); + final AtomicInteger startIdx = new AtomicInteger(START_NODES); - assertFalse(ignite.configuration().isClientMode()); + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = startIdx.getAndIncrement(); - log.info("Started node: " + ignite.name()); + Thread.currentThread().setName("start-thread-" + idx); - return null; - } - }, GRID_CNT + 3, "start-thread"); + barrier.await(); - barrier.await(); + Ignite ignite = startGrid(idx); - U.sleep(ThreadLocalRandom.current().nextInt(10, 100)); + assertFalse(ignite.configuration().isClientMode()); - for (int i = 0; i < GRID_CNT; i++) - stopGrid(i); + log.info("Started node: " + ignite.name()); + + return null; + } + }, JOIN_NODES, "start-thread"); - fut.get(); + barrier.await(); + + U.sleep(ThreadLocalRandom.current().nextInt(10, 100)); + + for (int i = 0; i < START_NODES; i++) + stopGrid(i); + + fut.get(); + + stopAllGrids(); + } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 981f649..0280e9c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -35,8 +35,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -45,6 +47,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.port.GridPortRecord; +import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; @@ -52,11 +55,14 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; @@ -87,6 +93,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** */ private UUID nodeId; + /** */ + private TcpDiscoverySpi nodeSpi; + /** * @throws Exception If fails. */ @@ -99,8 +108,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ? - new TestTcpDiscoverySpi() : new TcpDiscoverySpi(); + TcpDiscoverySpi spi = nodeSpi; + + if (spi == null) + spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ? + new TestTcpDiscoverySpi() : new TcpDiscoverySpi(); discoMap.put(gridName, spi); @@ -1164,6 +1176,305 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed + */ + public void testCustomEventRace1_1() throws Exception { + try { + customEventRace1(true, false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed + */ + public void testCustomEventRace1_2() throws Exception { + try { + customEventRace1(false, false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed + */ + public void testCustomEventRace1_3() throws Exception { + try { + customEventRace1(true, true); + } + finally { + stopAllGrids(); + } + } + + /** + * @param cacheStartFrom1 If {code true} starts cache from node1. + * @param stopCrd If {@code true} stops coordinator. + * @throws Exception If failed + */ + private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception { + TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi(); + + nodeSpi = spi0; + + final Ignite ignite0 = startGrid(0); + + nodeSpi = new TestCustomEventRaceSpi(); + + final Ignite ignite1 = startGrid(1); + + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + + spi0.nodeAdded1 = latch1; + spi0.nodeAdded2 = latch2; + spi0.debug = true; + + IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + log.info("Start 2"); + + nodeSpi = new TestCustomEventRaceSpi(); + + Ignite ignite2 = startGrid(2); + + return null; + } + }); + + latch1.await(); + + final String CACHE_NAME = "cache"; + + IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME); + + Ignite ignite = cacheStartFrom1 ? ignite1 : ignite0; + + ignite.createCache(ccfg); + + return null; + } + }); + + if (stopCrd) { + spi0.stop = true; + + latch2.countDown(); + + ignite0.close(); + } + else { + U.sleep(500); + + latch2.countDown(); + } + + fut1.get(); + fut2.get(); + + IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME); + + assertNotNull(cache); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + + nodeSpi = new TestCustomEventRaceSpi(); + + Ignite ignite = startGrid(3); + + cache = ignite.cache(CACHE_NAME); + + cache.put(2, 2); + + assertEquals(1, cache.get(1)); + assertEquals(2, cache.get(2)); + } + + /** + * @throws Exception If failed + */ + public void testCustomEventCoordinatorFailure1() throws Exception { + try { + customEventCoordinatorFailure(true); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed + */ + public void testCustomEventCoordinatorFailure2() throws Exception { + try { + customEventCoordinatorFailure(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @param twoNodes If {@code true} starts two nodes, otherwise three. + * @throws Exception If failed + */ + private void customEventCoordinatorFailure(boolean twoNodes) throws Exception { + TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi(); + + nodeSpi = spi0; + + Ignite ignite0 = startGrid(0); + + nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + + Ignite ignite1 = startGrid(1); + + nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + + Ignite ignite2 = twoNodes ? null : startGrid(2); + + final Ignite createCacheNode = ignite2 != null ? ignite2 : ignite1; + + CountDownLatch latch = new CountDownLatch(1); + + spi0.latch = latch; + + final String CACHE_NAME = "test-cache"; + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + log.info("Create test cache"); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME); + + createCacheNode.createCache(ccfg); + + return null; + } + }, "create-cache-thread"); + + ((TcpCommunicationSpi)ignite0.configuration().getCommunicationSpi()).simulateNodeFailure(); + + latch.await(); + + ignite0.close(); + + fut.get(); + + IgniteCache<Object, Object> cache = grid(1).cache(CACHE_NAME); + + assertNotNull(cache); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + + log.info("Try start one more node."); + + nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + + Ignite ignite = startGrid(twoNodes ? 2 : 3); + + cache = ignite.cache(CACHE_NAME); + + assertNotNull(cache); + + cache.put(2, 2); + + assertEquals(1, cache.get(1)); + assertEquals(2, cache.get(2)); + } + + /** + * + */ + private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi { + /** */ + private volatile CountDownLatch latch; + + /** */ + private boolean stop; + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) { + log.info("Stop node on custom event: " + msg); + + latch.countDown(); + + stop = true; + } + + if (stop) + return; + + super.writeToSocket(sock, msg, bout, timeout); + } + } + + /** + * + */ + private static class TestCustomEventRaceSpi extends TcpDiscoverySpi { + /** */ + private volatile CountDownLatch nodeAdded1; + + /** */ + private volatile CountDownLatch nodeAdded2; + + /** */ + private volatile boolean stop; + + /** */ + private boolean debug; + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + if (nodeAdded1 != null) { + nodeAdded1.countDown(); + + if (debug) + log.info("--- Wait node added: " + msg); + + U.await(nodeAdded2); + + nodeAdded1 = null; + nodeAdded2 = null; + } + + if (stop) + return; + + if (debug) + log.info("--- Send node added: " + msg); + } + + if (debug && msg instanceof TcpDiscoveryNodeAddFinishedMessage) + log.info("--- Send node finished: " + msg); + + if (debug && msg instanceof TcpDiscoveryCustomEventMessage) + log.info("--- Send custom event: " + msg); + + super.writeToSocket(sock, msg, bout, timeout); + } + } + + /** * Starts new grid with given index. Method optimize is not invoked. * * @param idx Index of the grid to start. http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 88977fb..289da3d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCacheTypesTest; import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionAtomicSelfTest; import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest; import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest; @@ -195,6 +196,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class); suite.addTestSuite(IgniteDynamicClientCacheStartSelfTest.class); suite.addTestSuite(IgniteDynamicCacheStartNoExchangeTimeoutTest.class); + suite.addTestSuite(CacheAffinityEarlyTest.class); suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);