This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new df6df8db0ea IGNITE-26584 Discovery optimizations for MultiDC (#12517)
df6df8db0ea is described below
commit df6df8db0eadcc867d7a28cb56935d947366cc53
Author: Anton Vinogradov <[email protected]>
AuthorDate: Mon Jan 26 14:40:22 2026 +0300
IGNITE-26584 Discovery optimizations for MultiDC (#12517)
---
.../dht/preloader/GridDhtPreloader.java | 7 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 237 ++++-----------------
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 7 -
.../tcp/internal/MdcAwareNodesComparator.java | 41 ++++
.../tcp/internal/TcpDiscoveryNodesRing.java | 41 +++-
.../tcp/messages/TcpDiscoveryNodeAddedMessage.java | 35 +--
.../distributed/CacheExchangeMergeMdcTest.java | 56 +++++
.../spi/discovery/tcp/MultiDataCenterRingTest.java | 149 +++++++++++++
.../spi/discovery/tcp/TcpDiscoveryMdcSelfTest.java | 56 +++++
...overyPendingMessageDeliveryMdcReversedTest.java | 33 +++
.../TcpDiscoveryPendingMessageDeliveryMdcTest.java | 56 +++++
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 10 +-
.../ignite/testsuites/IgniteCacheTestSuite6.java | 2 +
.../IgniteSpiDiscoverySelfTestSuite.java | 9 +
15 files changed, 492 insertions(+), 249 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index b4126c955f9..ccba94eacda 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -41,6 +41,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -221,12 +222,14 @@ public class GridDhtPreloader extends
GridCachePreloaderAdapter {
assert part != null;
assert part.id() == p;
+ GridDhtPartitionState state = part.state();
+
// Do not rebalance OWNING or LOST partitions.
- if (part.state() == OWNING || part.state() == LOST)
+ if (state == OWNING || state == LOST)
continue;
// State should be switched to MOVING during PME.
- if (part.state() != MOVING) {
+ if (state != MOVING) {
throw new AssertionError("Partition has invalid state for
rebalance "
+ aff.topologyVersion() + " " + part);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 238fdd13eab..151c0857ce4 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -923,7 +923,7 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<ClusterNode> top = topHist.get(topVer);
- assert top != null : "Failed to find topology history [msg=" + msg
+ ", hist=" + topHist + ']';
+ assert top != null : "Failed to find topology history [top=" +
topVer + ", msg=" + msg + ", hist=" + topHist + ']';
return top;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ea8dd5f5c2d..0d0323bbc09 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -41,7 +41,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
@@ -1146,7 +1145,7 @@ class ServerImpl extends TcpDiscoveryImpl {
FutureTask<Void> fut = msgWorker.addTask(new
FutureTask<Void>() {
@Override protected Void body() {
pendingCustomMsgs.clear();
- msgWorker.pendingMsgs.reset(null, null, null);
+ msgWorker.pendingMsgs.reset(null);
msgWorker.newNextNode(null);
failedNodes.clear();
leavingNodes.clear();
@@ -1853,8 +1852,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private void prepareNodeAddedMessage(
TcpDiscoveryAbstractMessage msg,
UUID destNodeId,
- @Nullable Collection<PendingMessage> msgs,
- @Nullable IgniteUuid discardCustomMsgId
+ @Nullable Collection<PendingMessage> msgs
) {
assert destNodeId != null;
@@ -1891,10 +1889,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- // No need to send discardMsgId because we already filtered out
- // cleaned up messages.
- // TODO IGNITE-11271
- nodeAddedMsg.messages(msgs0, null, discardCustomMsgId);
+ nodeAddedMsg.messages(msgs0);
Map<Long, Collection<ClusterNode>> hist;
@@ -1917,7 +1912,7 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topology(null);
nodeAddedMsg.topologyHistory(null);
- nodeAddedMsg.messages(null, null, null);
+ nodeAddedMsg.messages(null);
nodeAddedMsg.clearUnmarshalledDiscoveryData();
}
}
@@ -2100,29 +2095,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- /**
- * <strong>FOR TEST ONLY!!!</strong>
- * <p>
- * Simulates situation when next node is still alive but is bypassed
- * since it has been excluded from the ring, possibly, due to short time
- * network problems.
- * <p>
- * This method is intended for test purposes only.
- */
- void forceNextNodeFailure() {
- U.warn(log, "Next node will be forcibly failed (if any).");
-
- TcpDiscoveryNode next;
-
- synchronized (mux) {
- next = ring.nextNode(failedNodes.keySet());
- }
-
- if (next != null)
- msgWorker.addMessage(new
TcpDiscoveryNodeFailedMessage(getLocalNodeId(), next.id(),
- next.internalOrder()));
- }
-
/**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
@@ -2667,7 +2639,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryNodeAddedMessage msg0 = new
TcpDiscoveryNodeAddedMessage(addedMsg);
- prepareNodeAddedMessage(msg0, destNodeId, null, null);
+ prepareNodeAddedMessage(msg0, destNodeId, null);
msg0.topology(addedMsg.clientTopology());
@@ -2725,13 +2697,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private final Queue<PendingMessage> msgs = new ArrayDeque<>(MAX * 2);
/** Processed custom message IDs. */
- private Set<IgniteUuid> procCustomMsgs = new
GridBoundedLinkedHashSet<>(MAX * 2);
-
- /** Discarded message ID. */
- private IgniteUuid discardId;
-
- /** Discarded custom message ID. */
- private IgniteUuid customDiscardId;
+ private final Set<IgniteUuid> procCustomMsgs = new
GridBoundedLinkedHashSet<>(MAX * 2);
/**
* Adds pending message and shrinks queue if it exceeds limit
@@ -2741,42 +2707,15 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
void add(TcpDiscoveryAbstractMessage msg) {
msgs.add(new PendingMessage(msg));
-
- while (msgs.size() > MAX) {
- PendingMessage queueHead = msgs.peek();
-
- assert queueHead != null;
-
- if (queueHead.customMsg && customDiscardId != null) {
- if (queueHead.id.equals(customDiscardId))
- customDiscardId = null;
- }
- else if (!queueHead.customMsg && discardId != null) {
- if (queueHead.id.equals(discardId))
- discardId = null;
- }
- else
- break;
-
- msgs.poll();
- }
}
/**
* Resets pending messages.
*
* @param msgs Message.
- * @param discardId Discarded message ID.
- * @param customDiscardId Discarded custom event message ID.
*/
- void reset(
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
- @Nullable IgniteUuid discardId,
- @Nullable IgniteUuid customDiscardId
- ) {
+ void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs) {
this.msgs.clear();
- this.customDiscardId = null;
- this.discardId = null;
if (msgs != null) {
for (TcpDiscoveryAbstractMessage msg : msgs) {
@@ -2785,9 +2724,6 @@ class ServerImpl extends TcpDiscoveryImpl {
this.msgs.add(pm);
}
}
-
- this.discardId = discardId;
- this.customDiscardId = customDiscardId;
}
/**
@@ -2800,12 +2736,31 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!hasPendingMessage(custom, id))
return;
- if (custom)
- customDiscardId = id;
- else
- discardId = id;
+ IgniteUuid customDiscardId = custom ? id : null;
+ IgniteUuid discardId = custom ? null : id;
+
+ Iterator<PendingMessage> it = msgs.iterator();
+
+ while (it.hasNext()) {
+ PendingMessage msg = it.next();
+
+ if (msg.customMsg) {
+ if (customDiscardId != null) {
+ it.remove();
+
+ if (Objects.equals(customDiscardId, msg.id))
+ customDiscardId = null;
+ }
+ }
+ else {
+ if (discardId != null) {
+ it.remove();
- cleanup();
+ if (Objects.equals(discardId, msg.id))
+ discardId = null;
+ }
+ }
+ }
}
/**
@@ -2822,134 +2777,36 @@ class ServerImpl extends TcpDiscoveryImpl {
return false;
}
- /**
- *
- */
- void cleanup() {
- Iterator<PendingMessage> msgIt = msgs.iterator();
-
- boolean skipMsg = discardId != null;
- boolean skipCustomMsg = customDiscardId != null;
-
- while (msgIt.hasNext()) {
- PendingMessage msg = msgIt.next();
-
- if (msg.customMsg) {
- if (skipCustomMsg) {
- assert customDiscardId != null;
-
- if (Objects.equals(customDiscardId, msg.id)) {
- msg.msg = null;
-
- if (msg.verified)
- return;
- }
- }
- }
- else {
- if (skipMsg) {
- assert discardId != null;
-
- if (Objects.equals(discardId, msg.id)) {
- msg.msg = null;
-
- if (msg.verified)
- return;
- }
- }
- }
- }
- }
-
/**
* Gets iterator for non-discarded messages.
*
* @return Non-discarded messages iterator.
*/
@Override public Iterator<TcpDiscoveryAbstractMessage> iterator() {
- return new SkipIterator();
+ return new PendingMessageIterator();
}
/**
*
*/
- private class SkipIterator implements
Iterator<TcpDiscoveryAbstractMessage> {
- /** Skip non-custom messages flag. */
- private boolean skipMsg = discardId != null;
-
- /** Skip custom messages flag. */
- private boolean skipCustomMsg = customDiscardId != null;
-
+ private class PendingMessageIterator implements
Iterator<TcpDiscoveryAbstractMessage> {
/** Internal iterator. */
- private Iterator<PendingMessage> msgIt = msgs.iterator();
-
- /** Next message. */
- private TcpDiscoveryAbstractMessage next;
-
- {
- advance();
- }
+ private final Iterator<PendingMessage> msgIt = msgs.iterator();
/** {@inheritDoc} */
@Override public boolean hasNext() {
- return next != null;
+ return msgIt.hasNext();
}
/** {@inheritDoc} */
@Override public TcpDiscoveryAbstractMessage next() {
- if (next == null)
- throw new NoSuchElementException();
-
- TcpDiscoveryAbstractMessage next0 = next;
-
- advance();
-
- return next0;
+ return msgIt.next().msg;
}
/** {@inheritDoc} */
@Override public void remove() {
throw new UnsupportedOperationException();
}
-
- /**
- * Advances iterator to the next available item.
- */
- private void advance() {
- next = null;
-
- while (msgIt.hasNext()) {
- PendingMessage msg0 = msgIt.next();
-
- if (msg0.customMsg) {
- if (skipCustomMsg) {
- assert customDiscardId != null;
-
- if (Objects.equals(customDiscardId, msg0.id) &&
msg0.verified)
- skipCustomMsg = false;
-
- continue;
- }
- }
- else {
- if (skipMsg) {
- assert discardId != null;
-
- if (Objects.equals(discardId, msg0.id) &&
msg0.verified)
- skipMsg = false;
-
- continue;
- }
- }
-
- if (msg0.msg == null)
- continue;
-
- next = msg0.msg;
-
- break;
- }
- }
}
}
@@ -3414,7 +3271,7 @@ class ServerImpl extends TcpDiscoveryImpl {
msg0 = U.unmarshal(spi.marshaller(), msgBytes,
U.resolveClassLoader(spi.ignite().configuration()));
- prepareNodeAddedMessage(msg0,
clientMsgWorker.clientNodeId, null, null);
+ prepareNodeAddedMessage(msg0,
clientMsgWorker.clientNodeId, null);
msgBytes0 = null;
}
@@ -3763,8 +3620,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryAbstractMessage pendingMsg :
pendingMsgs) {
long tsNanos = System.nanoTime();
- prepareNodeAddedMessage(pendingMsg,
next.id(), pendingMsgs.msgs,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(pendingMsg,
next.id(), pendingMsgs.msgs);
addFailedNodes(pendingMsg, failedNodes);
@@ -3805,8 +3661,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (!(msg instanceof
TcpDiscoveryConnectionCheckMessage))
- prepareNodeAddedMessage(msg, next.id(),
pendingMsgs.msgs,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(msg, next.id(),
pendingMsgs.msgs);
try {
SecurityUtils.serializeVersion(1);
@@ -4019,8 +3874,7 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
- prepareNodeAddedMessage(pendingMsg, locNodeId,
pendingMsgs.msgs,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(pendingMsg, locNodeId,
pendingMsgs.msgs);
pendingMsg.senderNodeId(locNodeId);
@@ -4139,7 +3993,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsg.msg instanceof TcpDiscoveryNodeAddedMessage) {
TcpDiscoveryNodeAddedMessage addMsg =
(TcpDiscoveryNodeAddedMessage)pendingMsg.msg;
- if (addMsg.node().id().equals(nodeId) &&
addMsg.id().compareTo(pendingMsgs.discardId) > 0)
+ if (addMsg.node().id().equals(nodeId))
return true;
}
}
@@ -5224,14 +5078,7 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
- pendingMsgs.reset(msg.messages(),
msg.discardedMessageId(),
- msg.discardedCustomMessageId());
-
- // Clear data to minimize message size.
- msg.messages(null, null, null);
- msg.topology(null);
- msg.topologyHistory(null);
- msg.clearDiscoveryData();
+ pendingMsgs.reset(msg.messages());
}
else {
if (log.isDebugEnabled())
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index b8cba5dd83f..c748fee1832 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2347,13 +2347,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
return ((ServerImpl)impl).clientWorkersCount();
}
- /**
- * <strong>FOR TEST ONLY!!!</strong>
- */
- void forceNextNodeFailure() {
- ((ServerImpl)impl).forceNextNodeFailure();
- }
-
/**
* <strong>FOR TEST ONLY!!!</strong>
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/MdcAwareNodesComparator.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/MdcAwareNodesComparator.java
new file mode 100644
index 00000000000..3a70573293d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/MdcAwareNodesComparator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import java.util.Comparator;
+
+/** Compares nodes using the Data Center id as a primary factor. */
+public class MdcAwareNodesComparator implements Comparator<TcpDiscoveryNode> {
+ /** */
+ @Override public int compare(TcpDiscoveryNode n1, TcpDiscoveryNode n2) {
+ int res = 0;
+
+ String n1DcId = n1.dataCenterId();
+ String n2DcId = n2.dataCenterId();
+
+ assert (n1DcId != null && n2DcId != null) || (n1DcId == null && n2DcId
== null) : "[node1=" + n1 + ", node2=" + n2 + "]";
+
+ if (n1DcId != null)
+ res = n1DcId.compareTo(n2DcId);
+
+ if (res == 0)
+ res = n1.compareTo(n2);
+
+ return res;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 6a316a9a599..e6303ee4f0d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -28,6 +28,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.lang.ClusterNodeFunc;
import org.apache.ignite.internal.util.lang.IgnitePair;
@@ -46,6 +47,9 @@ import org.jetbrains.annotations.Nullable;
* Convenient way to represent topology for {@link
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
*/
public class TcpDiscoveryNodesRing {
+ /** */
+ private static final boolean mdcAwareRing =
IgniteSystemProperties.getBoolean("MDC_AWARE_RING", true);
+
/** Visible nodes filter. */
public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new
P1<TcpDiscoveryNode>() {
@Override public boolean apply(TcpDiscoveryNode node) {
@@ -506,7 +510,16 @@ public class TcpDiscoveryNodesRing {
if (filtered.size() < 2)
return null;
- Iterator<TcpDiscoveryNode> iter = filtered.iterator();
+ Collection<TcpDiscoveryNode> sorted;
+
+ if (mdcAwareRing) {
+ sorted = new TreeSet<>(new MdcAwareNodesComparator());
+ sorted.addAll(filtered);
+ }
+ else
+ sorted = filtered;
+
+ Iterator<TcpDiscoveryNode> iter = sorted.iterator();
while (iter.hasNext()) {
TcpDiscoveryNode node = iter.next();
@@ -515,7 +528,7 @@ public class TcpDiscoveryNodesRing {
break;
}
- return iter.hasNext() ? iter.next() : F.first(filtered);
+ return iter.hasNext() ? iter.next() : F.first(sorted);
}
finally {
rwLock.readLock().unlock();
@@ -541,10 +554,19 @@ public class TcpDiscoveryNodesRing {
if (filtered.size() < 2)
return null;
+ Collection<TcpDiscoveryNode> sorted;
+
+ if (mdcAwareRing) {
+ sorted = new TreeSet<>(new MdcAwareNodesComparator());
+ sorted.addAll(filtered);
+ }
+ else
+ sorted = filtered;
+
TcpDiscoveryNode previous = null;
// Get last node that is previous in a ring
- for (TcpDiscoveryNode node : filtered) {
+ for (TcpDiscoveryNode node : sorted) {
if (locNode.equals(node) && previous != null)
break;
@@ -569,11 +591,20 @@ public class TcpDiscoveryNodesRing {
try {
TcpDiscoveryNode prev = null;
- for (TcpDiscoveryNode node : nodes) {
+ Collection<TcpDiscoveryNode> sorted;
+
+ if (mdcAwareRing) {
+ sorted = new TreeSet<>(new MdcAwareNodesComparator());
+ sorted.addAll(nodes);
+ }
+ else
+ sorted = nodes;
+
+ for (TcpDiscoveryNode node : sorted) {
if (node.equals(ringNode)) {
if (prev == null)
// ringNode is the first node, return last node in the
ring.
- return nodes.last();
+ return F.last(sorted);
return prev;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 5c624b4c6bb..36540d8b7df 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -23,7 +23,6 @@ import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;
@@ -48,12 +47,6 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
/** Pending messages from previous node. */
private Collection<TcpDiscoveryAbstractMessage> msgs;
- /** Discarded message ID. */
- private IgniteUuid discardMsgId;
-
- /** Discarded message ID. */
- private IgniteUuid discardCustomMsgId;
-
/** Current topology. Initialized by coordinator. */
@GridToStringInclude
private Collection<TcpDiscoveryNode> top;
@@ -99,8 +92,6 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
this.node = msg.node;
this.msgs = msg.msgs;
- this.discardMsgId = msg.discardMsgId;
- this.discardCustomMsgId = msg.discardCustomMsgId;
this.top = msg.top;
this.clientTop = msg.clientTop;
this.topHist = msg.topHist;
@@ -126,39 +117,15 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
return msgs;
}
- /**
- * Gets discarded message ID.
- *
- * @return Discarded message ID.
- */
- @Nullable public IgniteUuid discardedMessageId() {
- return discardMsgId;
- }
-
- /**
- * Gets discarded custom message ID.
- *
- * @return Discarded message ID.
- */
- @Nullable public IgniteUuid discardedCustomMessageId() {
- return discardCustomMsgId;
- }
-
/**
* Sets pending messages to send to new node.
*
* @param msgs Pending messages to send to new node.
- * @param discardMsgId Discarded message ID.
- * @param discardCustomMsgId Discarded custom message ID.
*/
public void messages(
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
- @Nullable IgniteUuid discardMsgId,
- @Nullable IgniteUuid discardCustomMsgId
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs
) {
this.msgs = msgs;
- this.discardMsgId = discardMsgId;
- this.discardCustomMsgId = discardCustomMsgId;
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeMdcTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeMdcTest.java
new file mode 100644
index 00000000000..89439e4abca
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeMdcTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/** */
+public class CacheExchangeMergeMdcTest extends CacheExchangeMergeTest {
+ /** */
+ protected static final String DC_ID_0 = "DC0";
+
+ /** */
+ protected static final String DC_ID_1 = "DC1";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ applyDC();
+
+ return cfg;
+ }
+
+ /** */
+ protected void applyDC() {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ boolean mainDc = rnd.nextBoolean();
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
mainDc ? DC_ID_0 : DC_ID_1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterRingTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterRingTest.java
new file mode 100644
index 00000000000..bdec5e5e718
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterRingTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/** */
+public class MultiDataCenterRingTest extends GridCommonAbstractTest {
+ /** */
+ private static final String DC_ID_0 = "DC0";
+
+ /** */
+ private static final String DC_ID_1 = "DC1";
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+ }
+
+ /** */
+ @Test
+ public void testRing() throws Exception {
+ int cnt = 10;
+
+ generateRandomDcOrderCluster(cnt);
+
+ assertEquals(cnt, grid(0).cluster().nodes().size());
+
+ checkHops(2);
+
+ stopGrid(cnt - 1);
+ stopGrid(0);
+
+ assertEquals(cnt - 2, grid(1).cluster().nodes().size());
+
+ checkHops(2);
+ }
+
+ /** */
+ @Test
+ public void testMessageOrder() throws Exception {
+ int cnt = 10;
+
+ generateRandomDcOrderCluster(cnt);
+
+ Collection<Ignite> nodes = G.allGrids();
+
+ CountDownLatch latch = new CountDownLatch(cnt);
+ List<String> dcs = new ArrayList<>();
+
+ for (Ignite node : nodes) {
+ DiscoverySpi disco = node.configuration().getDiscoverySpi();
+
+ ((TcpDiscoverySpi)disco).addSendMessageListener(new
IgniteInClosure<>() {
+ @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+
dcs.add(Ignition.localIgnite().cluster().localNode().dataCenterId());
+
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ startGrid(cnt + 1);
+
+ latch.await();
+
+ String curDc = null;
+ int hops = 0;
+
+ for (String dcId : dcs) {
+ if (!dcId.equals(curDc)) {
+ hops++;
+ curDc = dcId;
+ }
+ }
+
+ assertEquals(2, hops);
+ }
+
+ /** */
+ private void generateRandomDcOrderCluster(int cnt) throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < cnt; i++) {
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
rnd.nextBoolean() ? DC_ID_0 : DC_ID_1);
+
+ startGrid(i);
+ }
+
+ waitForTopology(cnt);
+ }
+
+ /** */
+ private void checkHops(int expected) {
+ Collection<Ignite> nodes = G.allGrids();
+
+ int hops = 0;
+
+ for (Ignite node : nodes) {
+ DiscoverySpi disco = node.configuration().getDiscoverySpi();
+
+ ServerImpl serverImpl = U.field(disco, "impl");
+
+ String nextDcId = serverImpl.ring().nextNode().dataCenterId();
+ String locDcId = node.cluster().localNode().dataCenterId();
+
+ if (!locDcId.equals(nextDcId))
+ hops++;
+ }
+
+ assertEquals(expected, hops);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMdcSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMdcSelfTest.java
new file mode 100644
index 00000000000..89ecb9bca94
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMdcSelfTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Test for {@link TcpDiscoverySpi} with Multi Data Centers.
+ */
+public class TcpDiscoveryMdcSelfTest extends TcpDiscoverySelfTest {
+ /** */
+ private static final String DC_ID_0 = "DC0";
+
+ /** */
+ private static final String DC_ID_1 = "DC1";
+
+ /**
+ * @throws Exception If fails.
+ */
+ public TcpDiscoveryMdcSelfTest() throws Exception {
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ String prev =
System.getProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, prev
== null ? DC_ID_0 : DC_ID_1);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcReversedTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcReversedTest.java
new file mode 100644
index 00000000000..3b525df7b60
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcReversedTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.IgniteSystemProperties;
+
+/**
+ *
+ */
+public class TcpDiscoveryPendingMessageDeliveryMdcReversedTest extends
TcpDiscoveryPendingMessageDeliveryMdcTest {
+ /** */
+ @Override protected void applyDC() {
+ String prev =
System.getProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, prev
== null ? DC_ID_1 : DC_ID_0);
+ }
+}
+
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcTest.java
new file mode 100644
index 00000000000..f4d53f2c85f
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class TcpDiscoveryPendingMessageDeliveryMdcTest extends
TcpDiscoveryPendingMessageDeliveryTest {
+ /** */
+ protected static final String DC_ID_0 = "DC0";
+
+ /** */
+ protected static final String DC_ID_1 = "DC1";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ applyDC();
+
+ return cfg;
+ }
+
+ /** */
+ protected void applyDC() {
+ String prev =
System.getProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, prev
== null ? DC_ID_0 : DC_ID_1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+ }
+}
+
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 3195284ba92..3b729cb19ea 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -311,11 +311,15 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
assertNotNull(node);
assertNotNull(node.lastSuccessfulAddress());
+ assertTrue(spi2.pingNode(ignite3.localNode().id()));
+
node = (TcpDiscoveryNode)spi2.getNode(ignite3.localNode().id());
assertNotNull(node);
assertNotNull(node.lastSuccessfulAddress());
+ assertTrue(spi3.pingNode(ignite1.localNode().id()));
+
node = (TcpDiscoveryNode)spi3.getNode(ignite1.localNode().id());
assertNotNull(node);
@@ -1901,11 +1905,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
spi.failSingleMsg = true;
- long order = ignite.cluster().localNode().order();
-
- long nextOrder = order == NODES ? 1 : order + 1;
-
- Ignite failingNode = nodes.get(nextOrder);
+ Ignite failingNode =
nodes.get(((ServerImpl)spi.impl).ring().nextNode().order());
assertNotNull(failingNode);
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index d0b620794a3..1f6bae5f4cd 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -36,6 +36,7 @@ import
org.apache.ignite.internal.processors.cache.SysCacheInconsistencyInternal
import
org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
import
org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerDiscoHistoryTest;
import
org.apache.ignite.internal.processors.cache.distributed.CacheClientsConcurrentStartTest;
+import
org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeMdcTest;
import
org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import
org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest;
import
org.apache.ignite.internal.processors.cache.distributed.CachePartitionLossWithRestartsTest;
@@ -100,6 +101,7 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite,
IgnitePessimisticTxSuspendResumeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class,
ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeMdcTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
OnePhaseCommitAndNodeLeftTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, PendingExchangeTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
ExchangeMergeStaleServerNodesTest.class, ignoredTests);
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 77eaf7e8247..e5320437a46 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -36,6 +36,7 @@ import
org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
import
org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownSslTest;
import
org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
import org.apache.ignite.spi.discovery.tcp.IgniteMetricsOverflowTest;
+import org.apache.ignite.spi.discovery.tcp.MultiDataCenterRingTest;
import
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest;
import
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiCoordinatorChangeTest;
import
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
@@ -49,6 +50,7 @@ import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryDeadNodeAddressResolvingT
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryFailedJoinTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIpFinderCleanerTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIpFinderFailureTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMdcSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMetricsWarnLogTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNetworkIssuesTest;
@@ -56,6 +58,8 @@ import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnRec
import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest;
import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeJoinAndFailureTest;
+import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryMdcReversedTest;
+import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryMdcTest;
import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryTest;
import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryReconnectUnstableTopologyTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest;
@@ -186,7 +190,12 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
TcpDiscoveryDeadNodeAddressResolvingTest.class,
+ // MDC.
+ TcpDiscoveryMdcSelfTest.class,
+ TcpDiscoveryPendingMessageDeliveryMdcTest.class,
+ TcpDiscoveryPendingMessageDeliveryMdcReversedTest.class,
MultiDataCenterDeploymentTest.class,
+ MultiDataCenterRingTest.class,
MultiDataCenterClientRoutingTest.class,
IgniteDiscoveryMessageSerializationTest.class