This is an automated email from the ASF dual-hosted git repository.
sergey-chugunov-1985 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b75f909bc1c IGNITE-27746 MDC. Implement parallel ping of nodes in
remote DCs as part of connection recovery procedure. (#12729)
b75f909bc1c is described below
commit b75f909bc1c74a072822c5b145e94dc41812946e
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon May 18 11:44:18 2026 +0300
IGNITE-27746 MDC. Implement parallel ping of nodes in remote DCs as part of
connection recovery procedure. (#12729)
---
.../ignite/spi/discovery/tcp/ServerImpl.java | 617 ++++++++++++++++++---
.../spi/discovery/tcp/TcpDiscoveryIoSession.java | 2 +-
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 10 +-
.../tcp/internal/TcpDiscoveryNodesRing.java | 8 +-
.../discovery/tcp/MultiDataCenterSplitTest.java | 398 +++++++++++++
...cpClientDiscoverySpiFailureTimeoutSelfTest.java | 5 +-
.../tcp/TcpDiscoveryNetworkIssuesTest.java | 15 +-
.../TcpDiscoveryPendingMessageDeliveryTest.java | 35 +-
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
9 files changed, 980 insertions(+), 112 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2d2480f01f3..0861b7f88be 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -42,6 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
@@ -58,6 +59,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLSocket;
@@ -220,15 +223,43 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Maximal interval of connection check to next node in the ring. */
private static final long MAX_CON_CHECK_INTERVAL = 500;
+ /**
+ * @see #connCheckTick
+ * @see #effectiveExchangeTimeout()
+ */
+ private static final int CONNECTION_RECOVERY_TICKS = 3;
+
+ /**
+ * Part of the connection recovery timeout to ping remote DC. We can't
spend the whole timeout.
+ * We need some time reserved to skip failed DCs.
+ *
+ * @see #CONNECTION_RECOVERY_TICKS
+ * @see #connCheckTick
+ */
+ private static final double RMT_DC_PING_TIMEOUT_RATIO = 0.5;
+
+ /** Part of remote DC ping timeout to wait before new attempt. */
+ private static final float RMT_DC_PING_ATTEMPT_DELAY_RATIO = 0.35f;
+
/** Interval of checking connection to next node in the ring. */
private long connCheckInterval;
- /** Fundamental value for connection checking actions. */
+ /**
+ * The fundamental timeout tick for actions associated with the recovery
of a ring connection.
+ * In many scenarios, the total connection recovery timeout cannot be
allocated to a single action.
+ * This is because the recovery process may require sequentially checking
multiple nodes.
+ * A smaller timeout tick value enables a greater number of
connection-checking actions to be performed within
+ * the overall recovery timeout, allowing more nodes to be examined.
However, this comes at the cost of a reduced
+ * timeout for each individual action.
+ */
private long connCheckTick;
/** */
private final IgniteThreadPoolExecutor utilityPool;
+ /** Pool size to ping remote DC if a corner node loses the ring
connection. */
+ private final int pingRmtDcPoolSz;
+
/** Nodes ring. */
@GridToStringExclude
private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
@@ -273,7 +304,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
/** Collection to track joining nodes. */
- private Set<UUID> joiningNodes = new HashSet<>();
+ private final Set<UUID> joiningNodes = new HashSet<>();
/** Pending custom messages that should not be sent between NodeAdded and
NodeAddFinished messages. */
private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new
ArrayDeque<>();
@@ -328,7 +359,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @param adapter Adapter.
*/
- ServerImpl(TcpDiscoverySpi adapter, int utilityPoolSize) {
+ ServerImpl(TcpDiscoverySpi adapter, int utilityPoolSize, int
pingRmtDcPoolSize) {
super(adapter);
utilityPool = new IgniteThreadPoolExecutor("disco-pool",
@@ -347,6 +378,8 @@ class ServerImpl extends TcpDiscoveryImpl {
cliConnEnabled = props.get(0);
srvConnEnabled = props.get(1);
+
+ pingRmtDcPoolSz = pingRmtDcPoolSize;
}
/** {@inheritDoc} */
@@ -414,8 +447,7 @@ class ServerImpl extends TcpDiscoveryImpl {
lastRingMsgSentTime = 0;
- // Foundumental timeout value for actions related to connection check.
- connCheckTick = effectiveExchangeTimeout() / 3;
+ connCheckTick = effectiveExchangeTimeout() / CONNECTION_RECOVERY_TICKS;
// Since we take in account time of last sent message, the interval
should be quite short to give enough piece
// of failure detection timeout as send-and-acknowledge timeout of the
message to send.
@@ -435,7 +467,7 @@ class ServerImpl extends TcpDiscoveryImpl {
fromAddrs.clear();
noResAddrs.clear();
- msgWorker = new RingMessageWorker(log);
+ msgWorker = createMessageWorker();
msgWorkerThread = new MessageWorkerDiscoveryThread(msgWorker, log);
msgWorkerThread.start();
@@ -495,6 +527,11 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.printStartInfo();
}
+ /** */
+ protected RingMessageWorker createMessageWorker() {
+ return new RingMessageWorker(log);
+ }
+
/** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx)
throws IgniteSpiException {
spiCtx.registerPort(tcpSrvr.port, TCP);
@@ -830,12 +867,41 @@ class ServerImpl extends TcpDiscoveryImpl {
long timeout,
boolean logError
) throws IgniteCheckedException {
+ return pingNode(addr, nodeId, clientNodeId, timeout,
spi.getReconnectCount(), 0, logError);
+ }
+
+ /**
+ * Pings the node by its address to see if it's alive.
+ *
+ * @param addr Address of the node.
+ * @param nodeId Node ID to ping. In case when client node ID is not null
this node ID is an ID of the router node.
+ * @param clientNodeId Client node ID.
+ * @param timeout Timeout on operation in milliseconds. If 0, a value
based on {@link TcpDiscoverySpi} is used.
+ * @param reconnectAttempts Reconnects number.
+ * @param reconDelayRatio Part of ping attempt timeout spent on waiting
before actual reconnection try.
+ * @param logError Boolean flag indicating whether information should be
printed into the node log.
+ * @return ID of the remote node and "client exists" flag if node alive or
{@code null} if the remote node has
+ * left a topology during the ping process.
+ * @throws IgniteCheckedException If an error occurs.
+ */
+ @Nullable private IgniteBiTuple<UUID, Boolean> pingNode(
+ InetSocketAddress addr,
+ @Nullable UUID nodeId,
+ @Nullable UUID clientNodeId,
+ long timeout,
+ int reconnectAttempts,
+ float reconDelayRatio,
+ boolean logError
+ ) throws IgniteCheckedException {
+ long timeoutThreshold = timeout > 0 ? System.nanoTime() +
U.millisToNanos(timeout) : 0;
+
+ assert reconnectAttempts > 0;
assert addr != null;
assert timeout >= 0;
+ assert reconDelayRatio >= 0.0f;
- IgniteSpiOperationTimeoutHelper timeoutHelper = timeout == 0
- ? new IgniteSpiOperationTimeoutHelper(spi, clientNodeId == null)
- : new IgniteSpiOperationTimeoutHelper(timeout);
+ long attemptTimeout = (long)(timeout * (1.0f - reconDelayRatio)) /
reconnectAttempts;
+ long attemptDelayTimeout = reconnectAttempts > 1 ? (long)(timeout *
reconDelayRatio) / (reconnectAttempts - 1) : 0;
UUID locNodeId = getLocalNodeId();
@@ -850,6 +916,10 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean clientPingRes;
+ IgniteSpiOperationTimeoutHelper timeoutHelper = timeout == 0
+ ? new IgniteSpiOperationTimeoutHelper(spi, clientNodeId ==
null)
+ : new IgniteSpiOperationTimeoutHelper(timeout);
+
try {
clientPingRes = clientWorker.ping(timeoutHelper);
}
@@ -878,9 +948,11 @@ class ServerImpl extends TcpDiscoveryImpl {
int reconCnt = 0;
- boolean openedSock = false;
-
while (true) {
+ IgniteSpiOperationTimeoutHelper timeoutHelper = timeout == 0
+ ? new IgniteSpiOperationTimeoutHelper(spi,
clientNodeId == null)
+ : new IgniteSpiOperationTimeoutHelper(attemptTimeout);
+
try {
if (addr.isUnresolved())
addr = new
InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
@@ -891,8 +963,6 @@ class ServerImpl extends TcpDiscoveryImpl {
sock = spi.openSocket(sock, addr, timeoutHelper);
- openedSock = true;
-
TcpDiscoveryIoSession ses = createSession(sock);
spi.writeMessage(ses, new
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
@@ -932,31 +1002,23 @@ class ServerImpl extends TcpDiscoveryImpl {
reconCnt++;
- if (!openedSock && reconCnt == 2) {
- logPingError(errMsgPrefix + "Was unable to open
the socket at all. " +
- "Cause: " + e.getMessage(), logError);
-
- break;
- }
-
- if
(IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e)
- && (spi.failureDetectionTimeoutEnabled() ||
timeout != 0)) {
+ if ((timeoutThreshold > 0 && System.nanoTime() >=
timeoutThreshold) || (spi.failureDetectionTimeoutEnabled()
+ &&
IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))) {
logPingError(errMsgPrefix + "Reached the timeout "
+
(timeout == 0 ? spi.failureDetectionTimeout()
: timeout) +
"ms. Cause: " + e.getMessage(), logError);
break;
}
- else if (!spi.failureDetectionTimeoutEnabled() &&
reconCnt == spi.getReconnectCount()) {
- logPingError(errMsgPrefix + "Reached the
reconnection count spi.getReconnectCount(). " +
- "Cause: " + e.getMessage(), logError);
+ else if (reconCnt >= reconnectAttempts) {
+ logPingError(errMsgPrefix + "Max reconnect
attempts have been reached: " + reconnectAttempts
+ + ". Cause: " + e.getMessage(), logError);
break;
}
if (spi.isNodeStopping0()) {
- logPingError(errMsgPrefix + "Current node is
stopping. " +
- "Cause: " + e.getMessage(), logError);
+ logPingError(errMsgPrefix + "Current node is
stopping. Cause: " + e.getMessage(), logError);
break;
}
@@ -964,6 +1026,9 @@ class ServerImpl extends TcpDiscoveryImpl {
finally {
U.closeQuiet(sock);
}
+
+ if (attemptDelayTimeout > 0)
+ U.sleep(attemptDelayTimeout);
}
}
catch (Throwable t) {
@@ -2839,7 +2904,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* Message worker for discovery messages processing.
*/
- private class RingMessageWorker extends
MessageWorker<TcpDiscoveryAbstractMessage> {
+ protected class RingMessageWorker extends
MessageWorker<TcpDiscoveryAbstractMessage> {
/** Next node. */
private TcpDiscoveryNode next;
@@ -2901,7 +2966,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @param log Logger.
*/
- private RingMessageWorker(IgniteLogger log) {
+ protected RingMessageWorker(IgniteLogger log) {
super("tcp-disco-msg-worker-[]", log, 10, getWorkerRegistry(spi));
setBeforeEachPollAction(() -> {
@@ -3404,7 +3469,12 @@ class ServerImpl extends TcpDiscoveryImpl {
List<InetSocketAddress> locNodeAddrs =
U.arrayList(locNode.socketAddresses());
- addr: for (InetSocketAddress addr : nextAddrs) {
+ Iterator<InetSocketAddress> nextNodeAddsIt =
nextAddrs.iterator();
+
+ // The next node and its addresses might be reset as a result
of the parallel ping of a remote DC.
+ addr: while (next != null && nextNodeAddsIt.hasNext()) {
+ InetSocketAddress addr = nextNodeAddsIt.next();
+
long ackTimeout0 = spi.getAckTimeout();
if (locNodeAddrs.contains(addr)) {
@@ -3444,11 +3514,14 @@ class ServerImpl extends TcpDiscoveryImpl {
// Handshake.
TcpDiscoveryHandshakeRequest hndMsg = new
TcpDiscoveryHandshakeRequest(locNodeId);
- // Topology treated as changes if next node is
not available.
- boolean changeTop = sndState != null &&
!sndState.isStartingPoint();
-
- if (changeTop)
-
hndMsg.previousNodeId(ring.previousNodeOf(next).id());
+ if (sndState != null) {
+ // If we want a forced connection, we set the
change-topology node flag to current node id.
+ // The forced reconnect means we should
not check previous node.
+ if (!F.isEmpty(sndState.unavailableDCs))
+ hndMsg.previousNodeId(locNodeId);
+ else if (!sndState.isStartingPoint())
+
hndMsg.previousNodeId(ring.previousNodeOf(next).id());
+ }
if (log.isDebugEnabled()) {
log.debug("Sending handshake [hndMsg=" +
hndMsg + ", sndState=" + sndState +
@@ -3471,8 +3544,6 @@ class ServerImpl extends TcpDiscoveryImpl {
// We should take previousNodeAlive flag into
account
// only if we received the response from the
correct node.
if (res.creatorNodeId().equals(next.id()) &&
res.previousNodeAlive() && sndState != null) {
- sndState.checkTimeout();
-
// Remote node checked connection to it's
previous and got success.
boolean previousNode =
sndState.markLastFailedNodeAlive();
@@ -3488,11 +3559,8 @@ class ServerImpl extends TcpDiscoveryImpl {
sock = null;
- if (sndState.isFailed()) {
-
segmentLocalNodeOnSendFail(failedNodes);
-
+ if
(checkConnectionRecoveryFailed(sndState, failedNodes))
return; // Nothing to do here.
- }
if (previousNode)
U.warn(log, "New next node has
connection to it's previous, trying previous " +
@@ -3585,11 +3653,8 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to connect to next node
[msg=" + msg + ", err=" + e + ']', e);
// Fastens failure detection.
- if (sndState != null &&
sndState.checkTimeout()) {
- segmentLocalNodeOnSendFail(failedNodes);
-
+ if (sndState != null &&
checkConnectionRecoveryFailed(sndState, failedNodes))
return; // Nothing to do here.
- }
if (!openSock)
break; // Don't retry if we can not
establish connection.
@@ -3802,27 +3867,38 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!sent) {
if (sndState == null &&
spi.getEffectiveConnectionRecoveryTimeout() > 0)
- sndState = new CrossRingMessageSendState();
- else if (sndState != null && sndState.checkTimeout()) {
- segmentLocalNodeOnSendFail(failedNodes);
-
+ sndState = createConnectionRecoveryState(newNext);
+ else if (sndState != null &&
checkConnectionRecoveryFailed(sndState, failedNodes))
return; // Nothing to do here.
- }
- boolean failedNextNode = sndState == null ||
sndState.markNextNodeFailed();
+ // The next node might be reset as a result of the parallel
remote DC ping process.
+ boolean failedNextNode = next != null && (sndState == null
|| sndState.markNextNodeFailed());
if (failedNextNode && !failedNodes.contains(next)) {
failedNodes.add(next);
+ TcpDiscoveryNode next0 = next;
+
+ if (allRemoteDCsTraversed(sndState, failedNodes,
next)) {
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, all
remote DCs have been traversed, none available. " +
+ "Current node will skip failed DCs. Ring
connection recovery time remaining: "
+ + Math.max(0,
U.nanosToMillis(sndState.failTimeNanos - System.nanoTime())) + "ms.");
+ }
+
+ skipDCs(sndState,
ring.serverNodes().stream().map(ClusterNode::dataCenterId)
+ .filter(dcId ->
!dcId.equals(locNode.dataCenterId())).collect(Collectors.toSet()), failedNodes);
+ }
+
if (state == CONNECTED) {
Exception err = errs != null ?
U.exceptionWithSuppressed("Failed to send
message to next node [msg=" + msg +
- ", next=" + U.toShortString(next) + ']',
errs) :
+ ", next=" + U.toShortString(next0) + ']',
errs) :
null;
// If node existed on connection initialization we
should check
// whether it has not gone yet.
- U.warn(log, "Failed to send message to next node
[msg=" + msg + ", next=" + next +
+ U.warn(log, "Failed to send message to next node
[msg=" + msg + ", next=" + next0 +
", errMsg=" + (err != null ? err.getMessage()
: "N/A") + ']');
}
}
@@ -3894,6 +3970,87 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // Edge node scenario. The next node belongs to a neighboring DC,
which may be completely unavailable.
+ // To avoid sequential node failures within the current DC, we
need to determine if the neighboring DC is reachable.
+ // We perform parallel pings to nodes in the other DC using the
same timeout.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoveryState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoveryState.timeoutReached()) {
+ // Ensure the ping pool is released.
+ connRecoveryState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoveryState.remoteDcPingStarted() ||
!connRecoveryState.remoteDcPingFinished()
+ || connRecoveryState.unavailableDCs != null)
+ return false;
+
+ Collection<String> rmtDcIds =
connRecoveryState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> aliveNodesPerDC =
connRecoveryState.availableNodesPerDc();
+
+ for (String dcId : rmtDcIds) {
+ int aliveNodesCnt =
Optional.ofNullable(aliveNodesPerDC.get(dcId)).orElse(0);
+
+ if (aliveNodesCnt == 0)
+ failedDCs.add(dcId);
+ }
+
+ String msg = "During the connection recovery, nodes ping of DCs '"
+ String.join(", ", rmtDcIds)
+ + "' from current edge node has finished. Alive nodes: " +
connRecoveryState.availableNodes()
+ + ", unavailable nodes: " +
connRecoveryState.unavailableNodes()
+ + ". Time left to recover the ring connection: "
+ + Math.max(0, U.nanosToMillis(connRecoveryState.failTimeNanos
- System.nanoTime())) + "ms.";
+
+ if (failedDCs.isEmpty()) {
+ msg += " At least one node from each DC has responded.
Connection recovery will keep trying to restore the ring.";
+
+ if (log.isInfoEnabled())
+ log.info(msg);
+ }
+ else {
+ msg += " No node of the following remote DCs responded.
Considering DCs '" + String.join(", ", failedDCs)
+ + "' as unavailable. Current node will skip those DCs and
close the ring without them.";
+
+ log.warning(msg);
+
+ skipDCs(connRecoveryState, failedDCs, failedNodes);
+ }
+
+ return false;
+ }
+
/**
* Called when local node became the only alive node in topology.
*
@@ -3980,11 +4137,40 @@ class ServerImpl extends TcpDiscoveryImpl {
*
* @param node New next node.
*/
- private void newNextNode(TcpDiscoveryNode node) {
+ private void newNextNode(@Nullable TcpDiscoveryNode node) {
next = node;
nextAddrs = node == null ? null :
spi.getEffectiveNodeAddresses(node);
}
+ /** Skips nodes of failed data centers. */
+ protected void skipDCs(
+ CrossRingMessageSendState connRecoveryState,
+ Collection<String> failedDCs,
+ Collection<TcpDiscoveryNode> failedNodes
+ ) {
+ assert connRecoveryState.unavailableDCs == null : "Forced DC
skipping should not be requested yet.";
+
+ connRecoveryState.stopRemoteDcPing();
+
+ connRecoveryState.unavailableDCs = failedDCs;
+
+ for (TcpDiscoveryNode n : ring.serverNodes()) {
+ if (!failedDCs.contains(n.dataCenterId()))
+ continue;
+
+ assert !locNode.equals(n);
+
+ if (!failedNodes.contains(n))
+ failedNodes.add(n);
+ }
+
+ newNextNode(null);
+
+ // Let's keep the recovery state consistent.
+ connRecoveryState.state = RingMessageSendState.FORWARD_PASS;
+ connRecoveryState.failedNodes = failedNodes.size();
+ }
+
/**
* @param msg Message.
* @return Whether to redirect message to client nodes.
@@ -6357,6 +6543,28 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ /** @return {@code True} if we've checked all nodes from remote DCs and
should close the ring to local DC. */
+ private boolean allRemoteDCsTraversed(
+ @Nullable CrossRingMessageSendState connRecoveryState,
+ Collection<TcpDiscoveryNode> failedNodes,
+ TcpDiscoveryNode curNextNode
+ ) {
+ String locDcId = locNode.dataCenterId();
+
+ // Do not make any decision if: there is no recovery state; the local DC id is not set;
no remote DC ping was started; the current next node belongs to the local DC.
+ if (connRecoveryState == null || locDcId == null ||
!connRecoveryState.remoteDcPingStarted()
+ || locDcId.equals(curNextNode.dataCenterId()))
+ return false;
+
+ assert failedNodes.contains(curNextNode);
+
+ TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
+ String nextNextDcId = newNext == null ? null : newNext.dataCenterId();
+
+ // Entire next DC traversed. We've met next, other DC or even closed
back to local DC.
+ return nextNextDcId == null || locDcId.equals(nextNextDcId);
+ }
+
/**
* @param node Node to connect.
* @return {@code null} if connection allowed, error otherwise.
@@ -6731,9 +6939,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!dcNodes.isEmpty()) {
Collection<InetSocketAddress> addrs = new
ArrayList<>(dcNodes.size());
- for (TcpDiscoveryNode dcNode : dcNodes) {
+ for (TcpDiscoveryNode dcNode : dcNodes)
addrs.addAll(dcNode.socketAddresses());
- }
res.redirectAddresses(addrs);
@@ -6748,13 +6955,16 @@ class ServerImpl extends TcpDiscoveryImpl {
// Need to check connectivity to it.
long rcvdTime = lastRingMsgReceivedTime;
long now = System.nanoTime();
- long timeThreshold = rcvdTime +
U.millisToNanos(effectiveExchangeTimeout());
+ long timeoutThreshold = rcvdTime +
U.millisToNanos(effectiveExchangeTimeout());
+ // Incoming node has set the previous-to-check node as
itself and requests the forced connection.
+ // The forced incoming reconnect means we should not
check previous node.
+ boolean forcedConnection =
nodeId.equals(req.previousNodeId());
// We got message from previous in less than effective
exchange timeout.
- boolean ok = timeThreshold > now;
+ boolean prevNodeIsAvailable = !forcedConnection &&
timeoutThreshold > now;
TcpDiscoveryNode previous = null;
- if (ok) {
+ if (prevNodeIsAvailable) {
// Check case when previous node suddenly died.
This will speed up
// node failing.
Set<TcpDiscoveryNode> failed;
@@ -6782,20 +6992,39 @@ class ServerImpl extends TcpDiscoveryImpl {
liveAddr = checkConnection(previous,
backwardCheckTimeout);
}
- ok = liveAddr != null;
+ prevNodeIsAvailable = liveAddr != null;
- assert !(ok &&
liveAddr.getAddress().isLoopbackAddress() &&
spi.locNodeAddrs.contains(liveAddr));
+ assert !(prevNodeIsAvailable &&
liveAddr.getAddress().isLoopbackAddress()
+ && spi.locNodeAddrs.contains(liveAddr));
}
- res.previousNodeAlive(ok);
+ if (forcedConnection) {
+ // If the incoming node is considered failed or is
not in the ring, we answer that
+ // the previous node is alive, meaning that the forced
connection is denied.
+ synchronized (mux) {
+ prevNodeIsAvailable =
failedNodes.keySet().stream().anyMatch(n -> n.id().equals(nodeId));
+ }
+
+ prevNodeIsAvailable = prevNodeIsAvailable ||
ring.serverNodes().stream().noneMatch(n -> n.id().equals(nodeId));
+
+ String logMsg = "Incoming node [id=" + nodeId + "]
has requested a forced connection " +
+ "without checking the previous node. This may
happen if an edge node in local DC " +
+ "tries to close the ring into this DC.";
- if (log.isInfoEnabled()) {
- log.info("Previous node alive status [alive=" + ok
+
+ if (prevNodeIsAvailable)
+ log.warning(logMsg + " But the ring has no
server with such node id. Denying.");
+ else if (log.isInfoEnabled())
+ log.info(logMsg);
+ }
+ else if (log.isInfoEnabled()) {
+ log.info("Previous node alive status [alive=" +
prevNodeIsAvailable +
", checkPreviousNodeId=" +
req.previousNodeId() +
", actualPreviousNode=" + previous +
", lastMessageReceivedTime=" + rcvdTime + ",
now=" + now +
", connCheckInterval=" + connCheckInterval +
']');
}
+
+ res.previousNodeAlive(prevNodeIsAvailable);
}
if (log.isDebugEnabled()) {
@@ -8116,9 +8345,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/** */
BACKWARD_PASS,
-
- /** */
- FAILED
}
/**
@@ -8132,11 +8358,8 @@ class ServerImpl extends TcpDiscoveryImpl {
* has connection to it's previous and forces local node to try it
again.<br>
* {@link RingMessageSendState#BACKWARD_PASS} => {@link
RingMessageSendState#STARTING_POINT} when local node came back
* to initial next node and no topology changes should be performed.<br>
- * {@link RingMessageSendState#BACKWARD_PASS} => {@link
RingMessageSendState#FAILED} when recovery timeout is over and
- * all new next nodes have connections to their previous nodes. That means
local node has connectivity
- * issue and should be stopped.<br>
*/
- private class CrossRingMessageSendState {
+ protected class CrossRingMessageSendState {
/** */
private RingMessageSendState state =
RingMessageSendState.STARTING_POINT;
@@ -8147,8 +8370,37 @@ class ServerImpl extends TcpDiscoveryImpl {
private final long failTimeNanos;
/**
- *
+ * Decision upon results of remote DC ping. Contains ids of
unavailable DCs.
+ * {@code Null} if state of remote DC is not estimated yet.
+ */
+ private @Nullable Collection<String> unavailableDCs;
+
+ /** Remote DCs ping result per node. Values:
+ * <ul>
+ * <li>{@code null}: ping not started.</li>
+ * <li>-1: ping started, result is unknown yet.</li>
+ * <li>1: ping successfully finished, node has responded.</li>
+ * <li>0: ping finished, node has not responded.</li>
+ * </ul>
+ */
+ @Nullable private Map<TcpDiscoveryNode, Integer> rmtDcPingRes;
+
+ /**
+ * Thread pool to ping remote DC. We do not use {@link #utilityPool}
because we need significantly more threads
+ * to ping a remote DC during the connection recovery. The thread
pools here come with an unlimited task queue.
+ * With such a task queue, thread pools prefer putting a task in its
queue instead of creating a new worker thread.
+ * To utilize more threads we have to keep the core pool size large
enough. But we don't need a wide discovery
+ * thread pool for ordinary, typical tasks.
*/
+ @Nullable private volatile IgniteThreadPoolExecutor rmtDcPingPool;
+
+ /** Stop remote DC ping flag. */
+ @Nullable private volatile boolean stopRmtDcPing;
+
+ /** Remote DC ping overall time threshold. */
+ private volatile long rmtDcPingMaxTimeNs;
+
+ /** */
CrossRingMessageSendState() {
failTimeNanos =
U.millisToNanos(spi.getEffectiveConnectionRecoveryTimeout()) +
System.nanoTime();
}
@@ -8168,10 +8420,10 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * @return {@code True} if state is {@link
RingMessageSendState#FAILED}.
+ * @return {@code True} if state timeout expired.
*/
- boolean isFailed() {
- return state == RingMessageSendState.FAILED;
+ boolean timeoutReached() {
+ return System.nanoTime() >= failTimeNanos;
}
/**
@@ -8191,22 +8443,6 @@ class ServerImpl extends TcpDiscoveryImpl {
return false;
}
- /**
- * Checks if message sending has completely failed due to the timeout.
Sets {@code RingMessageSendState#FAILED}
- * if the timeout is reached.
- *
- * @return {@code True} if passed timeout is reached. {@code False}
otherwise.
- */
- boolean checkTimeout() {
- if (System.nanoTime() >= failTimeNanos) {
- state = RingMessageSendState.FAILED;
-
- return true;
- }
-
- return false;
- }
-
/**
* Marks last failed node as alive.
*
@@ -8232,6 +8468,205 @@ class ServerImpl extends TcpDiscoveryImpl {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("Parallel ping of nodes in remote DCs is starting.
Number of nodes to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // In the worst case scenario ping of each node would take the
whole timeout.
+ // If nodes number exceeds the pool size, we need to reduce a
timeout for each ping to make sure that
+ // all nodes are pinged.
+ int batches = nodesToPing.size() /
rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, batches);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int batches) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, batches));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int batches) {
+ // Total allowed ping timeout per batch.
+ double nodePingTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / batches;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Timeout per node address.
+ long addrsTimeoutMs = U.nanosToMillis((long)(nodePingTimeoutNs /
nodeAddrs.size()));
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Pinging " + node + " of DC '" +
node.dataCenterId() + ". Total time left: "
+ + remoteDcPingTimeLeft() + "ms. Nodes to ping left: "
+ nodesToPingLeft() + '.');
+ }
+
+ for (InetSocketAddress addrs : nodeAddrs) {
+ // There is no guarantee that a job is executed
immediately.
+ if (System.nanoTime() + U.millisToNanos(addrsTimeoutMs) >
failTimeNanos)
+ addrsTimeoutMs = U.nanosToMillis(failTimeNanos -
System.nanoTime());
+
+ if (remoteDcPingStopped() || addrsTimeoutMs < 1)
+ return;
+
+ if (pingNode(addrs, node.id(), null, addrsTimeoutMs,
CONNECTION_RECOVERY_TICKS,
+ RMT_DC_PING_ATTEMPT_DELAY_RATIO, false) != null) {
+ // Mark node as responded.
+ rmtDcPingRes.put(node, 1);
+
+ if (log.isDebugEnabled())
+ log.debug("Node " + node + " of DC '" +
node.dataCenterId() + "' has responded to the ping.");
+
+ // At least one node's address responded to ping.
+ return;
+ }
+ }
+ }
+ catch (Throwable t) {
+ // No-op.
+ }
+ finally {
+ rmtDcPingRes.compute(node, (n, nodeRes) -> nodeRes == 1 ?
nodeRes : 0);
+
+ if (nodesToPingLeft() == 0)
+ stopRemoteDcPing();
+
+ if (log.isDebugEnabled() && 1 != rmtDcPingRes.get(node)) {
+ log.debug("Node " + node + " of DC '" + node.dataCenterId()
+ + "' hasn't responded to the ping within the timeout "
+ U.nanosToMillis((long)nodePingTimeoutNs) + "ms.");
+ }
+ }
+ }
+
+ /** */
+ void stopRemoteDcPing() {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ stopRmtDcPing = true;
+
+ if (rmtDcPingPool != null) {
+ rmtDcPingPool.shutdown();
+ rmtDcPingPool = null;
+ }
+ }
+ }
+
+ /** */
+ private boolean remoteDcPingStopped() {
+ boolean res = stopRmtDcPing ||
Thread.currentThread().isInterrupted() || remoteDcPingTimeLeft() == 0;
+
+ if (res)
+ stopRemoteDcPing();
+
+ return res;
+ }
+
+ /** */
+ private long remoteDcPingTimeLeft() {
+ assert remoteDcPingStarted();
+
+ return Math.max(0, U.nanosToMillis(rmtDcPingMaxTimeNs -
System.nanoTime()));
+ }
+
+ /** */
+ private Collection<UUID> availableNodes() {
+ assert remoteDcPingStarted();
+
+ return rmtDcPingRes.entrySet().stream().filter(e -> 1 ==
e.getValue())
+ .map(e -> e.getKey().id()).collect(toList());
+ }
+
+ /** */
+ private Collection<UUID> unavailableNodes() {
+ assert remoteDcPingStarted();
+
+ return rmtDcPingRes.entrySet().stream().filter(e -> e.getValue()
== null || 0 == e.getValue())
+ .map(e -> e.getKey().id()).collect(toList());
+ }
+
+ /** */
+ public boolean remoteDcPingFinished() {
+ assert remoteDcPingStarted();
+
+ return nodesToPingLeft() == 0;
+ }
+
+ /** */
+ public boolean remoteDcPingStarted() {
+ return rmtDcPingRes != null;
+ }
+
+ /** */
+ private int nodesToPingLeft() {
+ assert remoteDcPingStarted();
+
+ return (int)(rmtDcPingRes.size() -
rmtDcPingRes.values().stream().filter(nodeRes -> -1 != nodeRes).count());
+ }
+
+ /** @return Number of available (responded to the ping) nodes per data center. */
+ private Map<String, Integer> availableNodesPerDc() {
+ Collection<UUID> nodesIds = availableNodes();
+
+ if (nodesIds.isEmpty())
+ return Collections.emptyMap();
+
+ Map<String, Integer> res = U.newHashMap(nodesIds.size());
+
+ for (UUID nid : nodesIds) {
+ res.compute(ring.node(nid).dataCenterId(), (dcId, cnt) -> {
+ if (cnt == null)
+ cnt = 0;
+
+ return ++cnt;
+ });
+ }
+
+ return res;
+ }
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index 7b5ccd46ec5..b8fc4714930 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -207,7 +207,7 @@ public class TcpDiscoveryIoSession {
if (e instanceof IgniteCheckedException)
throw (IgniteCheckedException)e;
- throw new IgniteCheckedException(e);
+ throw new IgniteCheckedException("Failed to read a discovery
message.", e);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 2ae75539a1c..16d4578101d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -300,6 +300,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
/** @see IgniteSystemProperties#IGNITE_DISCOVERY_METRICS_QNT_WARN */
public static final int DFLT_DISCOVERY_METRICS_QNT_WARN = 500;
+ /** */
+ public static final int DFLT_UTLITY_POOL_SIZE = 4;
+
+ /** Pool size to ping remote DC at the connection recovery. */
+ public static final int DFLT_RMT_DC_PING_POOL_SIZE = Math.max(8,
Runtime.getRuntime().availableProcessors() / 2);
+
/** Ssl message pattern for StreamCorruptedException. */
private static Pattern sslMsgPattern = Pattern.compile("invalid stream
header: 150\\d0\\d00");
@@ -1696,7 +1702,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
*/
protected void writeToSocket(
Socket sock,
- TcpDiscoveryAbstractMessage msg,
+ @Nullable TcpDiscoveryAbstractMessage msg,
byte[] data,
long timeout
) throws IOException, IgniteCheckedException {
@@ -2159,7 +2165,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
if (sockTimeout == 0)
sockTimeout = DFLT_SOCK_TIMEOUT;
- impl = new ServerImpl(this, 4);
+ impl = new ServerImpl(this, DFLT_UTLITY_POOL_SIZE,
DFLT_RMT_DC_PING_POOL_SIZE);
}
metricsUpdateFreq = ignite.configuration().getMetricsUpdateFrequency();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index e6303ee4f0d..7752994cf00 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -48,7 +48,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class TcpDiscoveryNodesRing {
/** */
- private static final boolean mdcAwareRing =
IgniteSystemProperties.getBoolean("MDC_AWARE_RING", true);
+ private static final boolean MDC_AWARE_RING =
IgniteSystemProperties.getBoolean("MDC_AWARE_RING", true);
/** Visible nodes filter. */
public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new
P1<TcpDiscoveryNode>() {
@@ -512,7 +512,7 @@ public class TcpDiscoveryNodesRing {
Collection<TcpDiscoveryNode> sorted;
- if (mdcAwareRing) {
+ if (MDC_AWARE_RING) {
sorted = new TreeSet<>(new MdcAwareNodesComparator());
sorted.addAll(filtered);
}
@@ -556,7 +556,7 @@ public class TcpDiscoveryNodesRing {
Collection<TcpDiscoveryNode> sorted;
- if (mdcAwareRing) {
+ if (MDC_AWARE_RING) {
sorted = new TreeSet<>(new MdcAwareNodesComparator());
sorted.addAll(filtered);
}
@@ -593,7 +593,7 @@ public class TcpDiscoveryNodesRing {
Collection<TcpDiscoveryNode> sorted;
- if (mdcAwareRing) {
+ if (MDC_AWARE_RING) {
sorted = new TreeSet<>(new MdcAwareNodesComparator());
sorted.addAll(nodes);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterSplitTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterSplitTest.java
new file mode 100644
index 00000000000..b2d45216d38
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterSplitTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT;
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeTrue;
+
+/** */
+@RunWith(Parameterized.class)
+public class MultiDataCenterSplitTest extends GridCommonAbstractTest {
+ /** */
+ private static final String DC_ID_0 = "DC0";
+
+ /** */
+ private static final String DC_ID_1 = "DC1";
+
+ /** */
+ private static final String DC_ID_2 = "DC2";
+
+ /** */
+ private Supplier<TcpDiscoverySpi> discoSpiSupplier;
+
+ /** Log listener. */
+ private final ListeningTestLogger listeningLog = new
ListeningTestLogger(log);
+
+ /** Datacenters number. */
+ @Parameterized.Parameter()
+ public int dcCnt;
+
+ /** Nodes number per DC. */
+ @Parameterized.Parameter(1)
+ public int srvrsPerDc;
+
+ /** The ping pool size. */
+ @Parameterized.Parameter(2)
+ public int pingPoolSize;
+
+ /** Whether to wait for the full timeout before simulating the failure. */
+ @Parameterized.Parameter(3)
+ public boolean fullTimeoutFailure;
+
+ /** */
+ @Parameterized.Parameters(name = "dcCnt={0}, serversPerDc={1},
pingPoolSize={2}, fullTimeoutFailure={3}")
+ public static Collection<Object[]> params() {
+ return cartesianProduct(
+ F.asList(2, 3), // DCs cnt.
+ F.asList(2, 3, 4), // Servers number per DC.
+ F.asList(1, 2, TcpDiscoverySpi.DFLT_RMT_DC_PING_POOL_SIZE), //
Ping pool size.
+ F.asList(true, false) // Full-timeout failure (or fail quickly).
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+
+ listeningLog.clearListeners();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ // Setup DiscoverySPI.
+ TcpDiscoverySpi discoSpi = discoSpiSupplier.get();
+ discoSpi.setIpFinder(LOCAL_IP_FINDER);
+ cfg.setDiscoverySpi(discoSpi);
+
+ // Disable unnecessary discovery messages.
+ cfg.setMetricsUpdateFrequency(getTestTimeout() * 3);
+ cfg.setClientFailureDetectionTimeout(cfg.getMetricsUpdateFrequency());
+
+ // To block nodes traffic we rely on exact ports.
+ assert ((TcpDiscoverySpi)cfg.getDiscoverySpi()).locPort == DFLT_PORT;
+
+ // Speeds up the tests.
+ cfg.setFailureDetectionTimeout(5000);
+
+ cfg.setGridLogger(listeningLog);
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void testConnectionRecoveryWithEntireDCFailure() throws Exception {
+ // Speeds up the tests. Also reduces number of flaky tests. JVM/GC
pauses can change or disrupt the supposed
+ // connection recovery strategy. We should avoid too short timeouts.
+ assumeTrue(pingPoolSize <= srvrsPerDc && srvrsPerDc / pingPoolSize <=
2);
+
+ startDCs(dcCnt);
+
+ // Ensure the ports order.
+ for (int g = 0; g < srvrsPerDc * dcCnt; ++g)
+ assertEquals(discoSpi(grid(g)).locNode.discoveryPort(), DFLT_PORT
+ g);
+
+ // Register the log listeners.
+ // There are 2 close-ring-to-local-DC scenarios: 1 - remote DC is
completely pinged and doesn't answer enough
+ // in some time before the connection recovery timeout and before
corner node gets segmented; 2 - corner node
+ // is able to traverse entire remote DC in the connection recovery
timeout;
+ LogListener logStartPing = LogListener.matches("Parallel ping of nodes
in remote DCs is starting. " +
+ "Number of nodes to ping: " + srvrsPerDc * (dcCnt -
1)).times(2).build();
+
+ LogListener logSplit0 = LogListener.matches("No node of the following
remote DCs responded. Considering DCs '"
+ + DC_ID_1 + "' as unavailable").times(1).build();
+
+ LogListener logSplit1 = LogListener.matches("No node of the following
remote DCs responded. Considering DCs '"
+ + DC_ID_0 + ", " + DC_ID_2 + "' as unavailable").times(1).build();
+
+ LogListener logSplit2 = LogListener.matches("During the connection
recovery, all remote DCs have been traversed, "
+ + "none available.").build();
+
+ listeningLog.registerAllListeners(logStartPing, logSplit0, logSplit1,
logSplit2);
+
+ if (log.isInfoEnabled())
+ log.info("Splitting the datacenters...");
+
+ // Check the DCs and break connections between them.
+ for (ClusterNode n : grid(0).cluster().nodes())
+ discoSpi(G.ignite(n.id())).block = true;
+
+ long checkTimeout =
grid(0).configuration().getFailureDetectionTimeout() * 3;
+
+ checkDcSplited(DC_ID_1, null, checkTimeout);
+
+ if (dcCnt == 2)
+ checkDcSplited(DC_ID_0, null, checkTimeout);
+ else {
+ checkDcSplited(DC_ID_0, DC_ID_2, checkTimeout);
+ checkDcSplited(DC_ID_2, DC_ID_0, checkTimeout);
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Waiting for the ping log...");
+
+ // Now we check the logs.
+ assertTrue(logStartPing.check(checkTimeout));
+
+ CountDownLatch logLatch = new CountDownLatch(1);
+
+ runAsync(() -> {
+ if (logSplit0.check(checkTimeout))
+ logLatch.countDown();
+ });
+ runAsync(() -> {
+ if (logSplit1.check(checkTimeout))
+ logLatch.countDown();
+ });
+ runAsync(() -> {
+ if (logSplit2.check(checkTimeout))
+ logLatch.countDown();
+ });
+
+ assertTrue(logLatch.await(checkTimeout, TimeUnit.MILLISECONDS));
+ }
+
+ /** */
+ private void startDCs(int cnt) throws Exception {
+ assert cnt == 2 || cnt == 3;
+
+ if (cnt == 2) {
+ // Start DC0. It misses connection to DC1.
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
DC_ID_0);
+ discoSpiSupplier = () -> testDiscovery(DFLT_PORT + srvrsPerDc,
DFLT_PORT + srvrsPerDc * 2 - 1);
+
+ startGrids(srvrsPerDc);
+
+ // Start DC1. It misses connection to DC0.
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
DC_ID_1);
+ discoSpiSupplier = () -> testDiscovery(DFLT_PORT, DFLT_PORT +
srvrsPerDc - 1);
+
+ for (int g = srvrsPerDc; g < srvrsPerDc << 1; ++g)
+ startGrid(g);
+ }
+ else {
+ // Start DC0. It misses connection to DC1.
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
DC_ID_0);
+ discoSpiSupplier = () -> testDiscovery(DFLT_PORT + srvrsPerDc,
DFLT_PORT + srvrsPerDc * 2 - 1);
+
+ startGrids(srvrsPerDc);
+
+ // Start DC1. It misses connection to DC0 and DC2.
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
DC_ID_1);
+ discoSpiSupplier = () -> testDiscovery(DFLT_PORT, DFLT_PORT +
srvrsPerDc * 3 - 1,
+ DFLT_PORT + srvrsPerDc, DFLT_PORT + srvrsPerDc * 2 - 1);
+
+ for (int g = srvrsPerDc; g < srvrsPerDc * 2; ++g)
+ startGrid(g);
+
+ // Start DC2. It misses connection to DC1.
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
DC_ID_2);
+ discoSpiSupplier = () -> testDiscovery(DFLT_PORT + srvrsPerDc,
DFLT_PORT + srvrsPerDc * 2 - 1);
+
+ for (int g = srvrsPerDc * 2; g < srvrsPerDc * 3; ++g)
+ startGrid(g);
+
+ for (int ig = srvrsPerDc; ig < srvrsPerDc * 2; ++ig)
+ discoSpi(grid(ig)).block = true;
+ }
+ }
+
+ /** Creates the test Discovery SPI. */
+ private TcpDiscoverySpi testDiscovery(int portFrom, int portTo) {
+ assert portTo >= portFrom;
+
+ return new TestTcpDiscoverySpi(IntStream.range(portFrom, portTo +
1).boxed().collect(Collectors.toSet()),
+ fullTimeoutFailure, pingPoolSize);
+ }
+
+ /** Creates the test Discovery SPI. */
+ private TcpDiscoverySpi testDiscovery(int allPortsFrom, int allPortsTo,
int workPortFrom, int workPortTo) {
+ assert allPortsTo >= allPortsFrom;
+ assert workPortFrom >= allPortsFrom;
+ assert workPortTo <= allPortsTo;
+
+ Set<Integer> failedPorts = IntStream.range(allPortsFrom, allPortsTo +
1).filter(p -> p < workPortFrom || p > workPortTo)
+ .boxed().collect(Collectors.toSet());
+
+ return new TestTcpDiscoverySpi(failedPorts, fullTimeoutFailure,
pingPoolSize);
+ }
+
+ /** Check whether datacenter {@code dcId} is separated. If {@code
otherAliveDc} is not {@code null}, these DCs are expected joined. */
+ private void checkDcSplited(String dcId, @Nullable String otherAliveDc,
long timeout) throws IgniteInterruptedCheckedException {
+ assert !dcId.equals(otherAliveDc);
+
+ if (log.isInfoEnabled())
+ log.info("Awaiting for DC is splitted, DC id: " + dcId + '.');
+
+ assertTrue(waitForCondition(() -> {
+ for (Ignite grid : G.allGrids()) {
+ if (!grid.cluster().localNode().dataCenterId().equals(dcId))
+ continue;
+
+ if (grid.cluster().nodes().size() != srvrsPerDc *
(otherAliveDc == null ? 1 : 2))
+ return false;
+
+ int curDcCnt = 0;
+ int otherAliveCnt = 0;
+
+ for (ClusterNode n : grid.cluster().nodes()) {
+ if (n.dataCenterId().equals(dcId))
+ ++curDcCnt;
+
+ if (otherAliveDc != null &&
n.dataCenterId().equals(otherAliveDc))
+ ++otherAliveCnt;
+ }
+
+ if (curDcCnt != srvrsPerDc || (otherAliveDc != null &&
otherAliveCnt != srvrsPerDc))
+ return false;
+ }
+
+ return true;
+ }, timeout, 500));
+ }
+
+ /** */
+ private static TestTcpDiscoverySpi discoSpi(Ignite node) {
+ return (TestTcpDiscoverySpi)node.configuration().getDiscoverySpi();
+ }
+
+ /** */
+ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ private final Collection<Integer> failedPorts;
+
+ /** */
+ private final int pingPoolSize;
+
+ /** */
+ private final boolean fullTimeoutFailure;
+
+ /** */
+ private volatile boolean block;
+
+ /** */
+ private TestTcpDiscoverySpi(
+ Collection<Integer> failedPorts,
+ boolean fullTimeoutFailure,
+ int pingPoolSize
+ ) {
+ this.failedPorts = failedPorts;
+ this.fullTimeoutFailure = fullTimeoutFailure;
+ this.pingPoolSize = pingPoolSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void initializeImpl() {
+ if (impl != null)
+ return;
+
+ super.initializeImpl();
+
+ // In theory, might be a ClientImpl.
+ if (impl instanceof ServerImpl)
+ impl = new ServerImpl(this, DFLT_UTLITY_POOL_SIZE,
pingPoolSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ tryToBlock(ses.socket(), null, timeout);
+
+ super.writeMessage(ses, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, @Nullable
TcpDiscoveryAbstractMessage msg, byte[] data,
+ long timeout) throws IOException, IgniteCheckedException {
+ tryToBlock(sock, data, timeout);
+
+ super.writeToSocket(sock, msg, data, timeout);
+ }
+
+ /** */
+ private void tryToBlock(
+ Socket sock,
+ @Nullable byte[] data,
+ long timeout
+ ) throws IOException {
+ if (!block)
+ return;
+
+ int rmpPort =
((InetSocketAddress)sock.getRemoteSocketAddress()).getPort();
+
+ if (!failedPorts.contains(rmpPort))
+ return;
+
+ if (data != null && Arrays.equals(U.IGNITE_HEADER, data))
+ return;
+
+ if (log.isDebugEnabled())
+ log.debug("Simulation network delay of " + (fullTimeoutFailure
? timeout : 5) + "ms on " + sock);
+
+ try {
+ U.sleep(fullTimeoutFailure ? timeout : 5);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IOException("Network delay simulation interrupted.",
e);
+ }
+
+ throw new SocketTimeoutException("Simulated timeout.");
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index fce28c1f2a4..10645f5fc25 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -174,7 +174,6 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest
extends TcpClientDiscov
checkNodes(1, 1);
Ignite srvNode = G.ignite("server-0");
- final TcpDiscoverySpi srvSpi =
(TcpDiscoverySpi)srvNode.configuration().getDiscoverySpi();
Ignite clientNode = G.ignite("client-0");
final TcpDiscoverySpi clientSpi =
(TcpDiscoverySpi)clientNode.configuration().getDiscoverySpi();
@@ -228,13 +227,11 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest
extends TcpClientDiscov
checkNodes(3, 0);
Ignite srv0 = G.ignite("server-0");
- final TestTcpDiscoverySpi2 spi0 =
(TestTcpDiscoverySpi2)srv0.configuration().getDiscoverySpi();
final Ignite srv1 = G.ignite("server-1");
final TestTcpDiscoverySpi2 spi1 =
(TestTcpDiscoverySpi2)srv1.configuration().getDiscoverySpi();
Ignite srv2 = G.ignite("server-2");
- final TestTcpDiscoverySpi2 spi2 =
(TestTcpDiscoverySpi2)srv2.configuration().getDiscoverySpi();
long failureTime = U.currentTimeMillis();
@@ -299,7 +296,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest
extends TcpClientDiscov
Thread.sleep(failureDetectionTimeout());
- assertTrue(X.hasCause(firstSpi.err, SocketTimeoutException.class));
+ assertNotNull(X.cause(firstSpi.err, IOException.class));
firstSpi.reset();
secondSpi.reset();
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index 4175f0b61fa..f87a0753891 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -406,12 +406,15 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
assertTrue(node1AliveStatus.get());
- // Wait a bit until node0 restore connection node1.
- U.sleep(failureDetectionTimeout / 2);
-
// Node 1 must not be kicked.
- for (Ignite ig : G.allGrids())
- assertEquals(3, ig.cluster().nodes().size());
+ assertTrue(waitForCondition(() -> {
+ for (Ignite ig : G.allGrids()) {
+ if (ig.cluster().nodes().size() != 3)
+ return false;
+ }
+
+ return true;
+ }, failureDetectionTimeout * 3));
}
/**
@@ -599,7 +602,7 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
// To make the test stable, we want a loopback paddress of the
previous node responds first.
// We don't need a concurrent ping execution.
if (impl instanceof ServerImpl)
- impl = new ServerImpl(this, 1);
+ impl = new ServerImpl(this, 1, DFLT_RMT_DC_PING_POOL_SIZE);
}
/** */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
index ff37efc0b66..bdd67ce404f 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.spi.MessagesPluginProvider;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -65,7 +66,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
else if (igniteInstanceName.startsWith("receiver"))
disco = new DyingThreadDiscoverySpi();
else
- disco = new TestTcpDiscoverySpi();
+ disco = new NoRingClosingTcpDiscoverySpi();
disco.setIpFinder(sharedStaticIpFinder);
cfg.setDiscoverySpi(disco);
@@ -234,10 +235,36 @@ public class TcpDiscoveryPendingMessageDeliveryTest
extends GridCommonAbstractTe
disco.sendCustomEvent(new DummyCustomDiscoveryMessage());
}
+ /** {@link TcpDiscoverySpi} which doesn't close its ring to the current DC. */
+ private class NoRingClosingTcpDiscoverySpi extends TestTcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void initializeImpl() {
+ if (impl != null)
+ return;
+
+ super.initializeImpl();
+
+ // In theory, might be a ClientImpl.
+ if (impl instanceof ServerImpl) {
+ impl = new ServerImpl(this, DFLT_UTLITY_POOL_SIZE, 0) {
+ @Override protected ServerImpl.RingMessageWorker
createMessageWorker() {
+ return new ServerImpl.RingMessageWorker(impl.log) {
+ @Override protected
ServerImpl.CrossRingMessageSendState createConnectionRecoveryState(
+ TcpDiscoveryNode n) {
+ // Do not start remote DC ping.
+ return new
ServerImpl.CrossRingMessageSendState();
+ }
+ };
+ }
+ };
+ }
+ }
+ }
+
/**
* Discovery SPI, that makes a thread to die when {@code blockMsgs} is set
to {@code true}.
*/
- private class DyingThreadDiscoverySpi extends TestTcpDiscoverySpi {
+ private class DyingThreadDiscoverySpi extends NoRingClosingTcpDiscoverySpi
{
/** {@inheritDoc} */
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (blockMsgs)
@@ -248,7 +275,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
/**
* Discovery SPI, that makes a node stop sending messages when {@code
blockMsgs} is set to {@code true}.
*/
- private class DyingDiscoverySpi extends TestTcpDiscoverySpi {
+ private class DyingDiscoverySpi extends NoRingClosingTcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected void writeToSocket(
Socket sock,
@@ -282,7 +309,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
/**
*
*/
- private class ListeningDiscoverySpi extends TestTcpDiscoverySpi {
+ private class ListeningDiscoverySpi extends NoRingClosingTcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (ensured(msg))
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index fa55157bc17..5b85617c0fc 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownS
import
org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
import org.apache.ignite.spi.discovery.tcp.IgniteMetricsOverflowTest;
import org.apache.ignite.spi.discovery.tcp.MultiDataCenterRingTest;
+import org.apache.ignite.spi.discovery.tcp.MultiDataCenterSplitTest;
import
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest;
import
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiCoordinatorChangeTest;
import
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
@@ -203,6 +204,7 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
TcpDiscoveryPendingMessageDeliveryMdcReversedTest.class,
MultiDataCenterDeploymentTest.class,
MultiDataCenterRingTest.class,
+ MultiDataCenterSplitTest.class,
MultiDataCenterClientRoutingTest.class
})
public class IgniteSpiDiscoverySelfTestSuite {