sergey-chugunov-1985 commented on code in PR #12729:
URL: https://github.com/apache/ignite/pull/12729#discussion_r3241024990
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
Review Comment:
Please use this text for comment:
"Edge node scenario. The next node belongs to a neighboring DC, which may be
completely unavailable.
To avoid sequential node failures within the current DC, we need to
determine if the neighboring DC is reachable.
We perform parallel pings to nodes in the other DC using the same timeout."
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
Review Comment:
Please introduce an additional method in ConnectionRecoveryState instead of
exposing rmtDcPingRes and performing operations on it. Also, I think the comment
above this line is incorrect.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
Review Comment:
```suggestion
Map<String, Integer> aliveNodesPerDC =
countPerDC(connRecoverState.availableNodes());
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
Review Comment:
```suggestion
CrossRingMessageSendState connRecoveryState,
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
+
+ for (String dcId : rmtDcIds) {
+ int availCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
+
+ if (availCnt == 0)
+ failedDCs.add(dcId);
+ }
+
+ String msg = "During the connection recovery, nodes ping of DCs '"
+ String.join(", ", rmtDcIds)
+ + "' from current edge node has finished. Responded nodes: " +
connRecoverState.availableNodes()
+ + ". Unavailable nodes: " + connRecoverState.unavailableNodes()
Review Comment:
```suggestion
+ ", unavailable nodes: " +
connRecoverState.unavailableNodes()
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -6357,6 +6561,28 @@ private void checkConnection() {
}
}
+ /** @return {@code True} if we've tries all the other nodes and should
close the ring to current DC. */
Review Comment:
```suggestion
/** @return {@code True} if we've checked all nodes from remote DCs and
should close the ring to local DC. */
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
+
+ for (String dcId : rmtDcIds) {
+ int availCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
Review Comment:
```suggestion
int aliveNodesCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -6357,6 +6561,28 @@ private void checkConnection() {
}
}
+ /** @return {@code True} if we've tries all the other nodes and should
close the ring to current DC. */
+ private boolean allRemoteDCsTraversed(
+ @Nullable CrossRingMessageSendState connRecoverState,
+ Collection<TcpDiscoveryNode> failedNodes,
+ TcpDiscoveryNode nextTried
Review Comment:
```suggestion
TcpDiscoveryNode currentNextNode
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
+
+ for (String dcId : rmtDcIds) {
+ int availCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
+
+ if (availCnt == 0)
+ failedDCs.add(dcId);
+ }
+
+ String msg = "During the connection recovery, nodes ping of DCs '"
+ String.join(", ", rmtDcIds)
+ + "' from current edge node has finished. Responded nodes: " +
connRecoverState.availableNodes()
+ + ". Unavailable nodes: " + connRecoverState.unavailableNodes()
+ + ". Time to recover the ring connection left: "
+ + Math.max(0, U.nanosToMillis(connRecoverState.failTimeNanos -
System.nanoTime())) + "ms.";
+
+ if (failedDCs.isEmpty()) {
+ msg += " At least one node of each DC has responded. Keep
trying to reconnect to the ring.";
+
+ if (log.isInfoEnabled())
+ log.info(msg);
+ }
+ else {
+ msg += " No node of the following remote DCs responded.
Considering DCs '" + String.join(", ", failedDCs)
+ + "' unavailable. Due to the remote DC unavailability
policy, current node will try to skip those DCs.";
Review Comment:
```suggestion
+ "' as unavailable. Current node will skip those DCs
and close the ring without them.";
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -6782,20 +7009,39 @@ else if (req.previousNodeId() != null) {
liveAddr = checkConnection(previous,
backwardCheckTimeout);
}
- ok = liveAddr != null;
+ prevNodeIsAvailable = liveAddr != null;
- assert !(ok &&
liveAddr.getAddress().isLoopbackAddress() &&
spi.locNodeAddrs.contains(liveAddr));
+ assert !(prevNodeIsAvailable &&
liveAddr.getAddress().isLoopbackAddress()
+ && spi.locNodeAddrs.contains(liveAddr));
}
- res.previousNodeAlive(ok);
+ if (forcedConnection) {
+ // If new node is considered as failed or if it is
not in the ring we answer with
+ // the previous status is ok meaning that
connection is impossible.
+ synchronized (mux) {
+ prevNodeIsAvailable =
failedNodes.keySet().stream().anyMatch(n -> n.id().equals(nodeId));
+ }
+
+ String msg0 = "Incoming node [id=" + nodeId + "]
has requested a forced connection " +
+ "(without checking the previous node). This
may happen if a corner node in local DC " +
+ "tries to close ring into this DC.";
Review Comment:
```suggestion
"tries to close the ring into this DC.";
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8147,8 +8387,31 @@ private class CrossRingMessageSendState {
private final long failTimeNanos;
/**
- *
+ * Decision upon results of remote DC ping. {@code Null} if state of
remote DC is not estimated yet.
Review Comment:
Please rewrite this block. This collection is not a decision; it's just a
collection of DC_IDs of data centers that are considered unavailable after a
parallel ping procedure.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
Review Comment:
```suggestion
log.info("Parallel ping of nodes in remote DCs is starting.
Number of nodes to ping: "
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -6782,20 +7009,39 @@ else if (req.previousNodeId() != null) {
liveAddr = checkConnection(previous,
backwardCheckTimeout);
}
- ok = liveAddr != null;
+ prevNodeIsAvailable = liveAddr != null;
- assert !(ok &&
liveAddr.getAddress().isLoopbackAddress() &&
spi.locNodeAddrs.contains(liveAddr));
+ assert !(prevNodeIsAvailable &&
liveAddr.getAddress().isLoopbackAddress()
+ && spi.locNodeAddrs.contains(liveAddr));
}
- res.previousNodeAlive(ok);
+ if (forcedConnection) {
+ // If new node is considered as failed or if it is
not in the ring we answer with
+ // the previous status is ok meaning that
connection is impossible.
+ synchronized (mux) {
+ prevNodeIsAvailable =
failedNodes.keySet().stream().anyMatch(n -> n.id().equals(nodeId));
+ }
+
+ String msg0 = "Incoming node [id=" + nodeId + "]
has requested a forced connection " +
+ "(without checking the previous node). This
may happen if a corner node in local DC " +
+ "tries to close ring into this DC.";
- if (log.isInfoEnabled()) {
- log.info("Previous node alive status [alive=" + ok
+
+ if (prevNodeIsAvailable)
+ log.warning(msg0 + " But the node is already
considered as failed. Denying.");
+ else if (prevNodeIsAvailable =
ring.serverNodes().stream().noneMatch(n -> n.id().equals(nodeId)))
Review Comment:
An assignment directly inside the `if` condition is confusing; please rewrite this code.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
Review Comment:
```suggestion
int batches = nodesToPing.size() /
rmtDcPingPool.getMaximumPoolSize()
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Tmeout per node address.
+ long addrsTimeoutMs = U.nanosToMillis((long)(stepTimeoutNs /
nodeAddrs.size()));
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, pinging " +
node + " of DC '" + node.dataCenterId()
+ + ". Total time left (ms): " + remoteDcPingTimeLeft()
+ ". Nodes to ping left: " + nodesToPingLeft() + '.');
+ }
+
+ for (InetSocketAddress addrs : nodeAddrs) {
+ // There is no guarantee that a job is executed
immediately.
+ if (System.nanoTime() + U.millisToNanos(addrsTimeoutMs) >
failTimeNanos)
+ addrsTimeoutMs = U.nanosToMillis(failTimeNanos -
System.nanoTime());
+
+ if (remoteDcPingStopped() || addrsTimeoutMs < 1)
+ return;
+
+ if (pingNode(addrs, node.id(), null, addrsTimeoutMs,
CONNECTION_RECOVERY_TICKS,
+ RMT_DC_PING_ATTEMPT_DELAY_RATIO, false) != null) {
+ // Mark node has pesponded.
+ rmtDcPingRes.put(node, 1);
+
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, node "
+ node + " of DC '" + node.dataCenterId()
+ + "' has responded to the ping.");
+ }
+
+ // At least one node's address reached.
Review Comment:
```suggestion
// At least one node's address responded to ping.
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Tmeout per node address.
+ long addrsTimeoutMs = U.nanosToMillis((long)(stepTimeoutNs /
nodeAddrs.size()));
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, pinging " +
node + " of DC '" + node.dataCenterId()
+ + ". Total time left (ms): " + remoteDcPingTimeLeft()
+ ". Nodes to ping left: " + nodesToPingLeft() + '.');
+ }
+
+ for (InetSocketAddress addrs : nodeAddrs) {
+ // There is no guarantee that a job is executed
immediately.
+ if (System.nanoTime() + U.millisToNanos(addrsTimeoutMs) >
failTimeNanos)
+ addrsTimeoutMs = U.nanosToMillis(failTimeNanos -
System.nanoTime());
+
+ if (remoteDcPingStopped() || addrsTimeoutMs < 1)
+ return;
+
+ if (pingNode(addrs, node.id(), null, addrsTimeoutMs,
CONNECTION_RECOVERY_TICKS,
+ RMT_DC_PING_ATTEMPT_DELAY_RATIO, false) != null) {
+ // Mark node has pesponded.
+ rmtDcPingRes.put(node, 1);
+
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, node "
+ node + " of DC '" + node.dataCenterId()
Review Comment:
```suggestion
log.debug("Node " + node + " of DC '" +
node.dataCenterId()
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
Review Comment:
Moving the `countPerDC` method to the CrossRingMessageSendState class would make
the API a bit better. Also, `countPerDC` isn't a very informative name either;
please change it to something like `aliveNodesCountPerDc`.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
+
+ for (String dcId : rmtDcIds) {
+ int availCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
+
+ if (availCnt == 0)
+ failedDCs.add(dcId);
+ }
+
+ String msg = "During the connection recovery, nodes ping of DCs '"
+ String.join(", ", rmtDcIds)
+ + "' from current edge node has finished. Responded nodes: " +
connRecoverState.availableNodes()
Review Comment:
```suggestion
+ "' from current edge node has finished. Alive nodes: " +
connRecoverState.availableNodes()
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
+
+ for (String dcId : rmtDcIds) {
+ int availCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
+
+ if (availCnt == 0)
+ failedDCs.add(dcId);
+ }
+
+ String msg = "During the connection recovery, nodes ping of DCs '"
+ String.join(", ", rmtDcIds)
+ + "' from current edge node has finished. Responded nodes: " +
connRecoverState.availableNodes()
+ + ". Unavailable nodes: " + connRecoverState.unavailableNodes()
+ + ". Time to recover the ring connection left: "
Review Comment:
```suggestion
+ ". Time left to recover the ring connection: "
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
+
+ for (String dcId : rmtDcIds) {
+ int availCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
+
+ if (availCnt == 0)
+ failedDCs.add(dcId);
+ }
+
+ String msg = "During the connection recovery, nodes ping of DCs '"
+ String.join(", ", rmtDcIds)
+ + "' from current edge node has finished. Responded nodes: " +
connRecoverState.availableNodes()
+ + ". Unavailable nodes: " + connRecoverState.unavailableNodes()
+ + ". Time to recover the ring connection left: "
+ + Math.max(0, U.nanosToMillis(connRecoverState.failTimeNanos -
System.nanoTime())) + "ms.";
+
+ if (failedDCs.isEmpty()) {
+ msg += " At least one node of each DC has responded. Keep
trying to reconnect to the ring.";
+
+ if (log.isInfoEnabled())
+ log.info(msg);
+ }
+ else {
+ msg += " No node of the following remote DCs responded.
Considering DCs '" + String.join(", ", failedDCs)
+ + "' unavailable. Due to the remote DC unavailability
policy, current node will try to skip those DCs.";
+
+ log.warning(msg);
+
+ skipDCs(connRecoverState, failedDCs, failedNodes);
+ }
+
+ return false;
+ }
+
+ /** @return Noes numbers per data canter. */
Review Comment:
```suggestion
/** @return Number of nodes per data center. */
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -6782,20 +7009,39 @@ else if (req.previousNodeId() != null) {
liveAddr = checkConnection(previous,
backwardCheckTimeout);
}
- ok = liveAddr != null;
+ prevNodeIsAvailable = liveAddr != null;
- assert !(ok &&
liveAddr.getAddress().isLoopbackAddress() &&
spi.locNodeAddrs.contains(liveAddr));
+ assert !(prevNodeIsAvailable &&
liveAddr.getAddress().isLoopbackAddress()
+ && spi.locNodeAddrs.contains(liveAddr));
}
- res.previousNodeAlive(ok);
+ if (forcedConnection) {
+ // If new node is considered as failed or if it is
not in the ring we answer with
+ // the previous status is ok meaning that
connection is impossible.
+ synchronized (mux) {
+ prevNodeIsAvailable =
failedNodes.keySet().stream().anyMatch(n -> n.id().equals(nodeId));
+ }
+
+ String msg0 = "Incoming node [id=" + nodeId + "]
has requested a forced connection " +
+ "(without checking the previous node). This
may happen if a corner node in local DC " +
Review Comment:
```suggestion
"without checking the previous node. This
may happen if an edge node in local DC " +
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3894,6 +3968,107 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeout in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: alive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
+
+ for (String dcId : rmtDcIds) {
+ int availCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
+
+ if (availCnt == 0)
+ failedDCs.add(dcId);
+ }
+
+ String msg = "During the connection recovery, nodes ping of DCs '"
+ String.join(", ", rmtDcIds)
+ + "' from current edge node has finished. Responded nodes: " +
connRecoverState.availableNodes()
+ + ". Unavailable nodes: " + connRecoverState.unavailableNodes()
+ + ". Time to recover the ring connection left: "
+ + Math.max(0, U.nanosToMillis(connRecoverState.failTimeNanos -
System.nanoTime())) + "ms.";
+
+ if (failedDCs.isEmpty()) {
+ msg += " At least one node of each DC has responded. Keep
trying to reconnect to the ring.";
Review Comment:
```suggestion
msg += " At least one node from each DC has responded.
Connection recovery will keep trying to restore the ring.";
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -6357,6 +6561,28 @@ private void checkConnection() {
}
}
+ /** @return {@code True} if we've tries all the other nodes and should
close the ring to current DC. */
+ private boolean allRemoteDCsTraversed(
+ @Nullable CrossRingMessageSendState connRecoverState,
+ Collection<TcpDiscoveryNode> failedNodes,
+ TcpDiscoveryNode nextTried
+ ) {
+ String locDcId = locNode.dataCenterId();
+
+ // Do not make any decision if: no ping started at all; the ping isn't
finished yet; a decision is already made.
+ if (connRecoverState == null || locDcId == null ||
!connRecoverState.remoteDcPingStarted()
+ || locDcId.equals(nextTried.dataCenterId()))
+ return false;
+
+ assert failedNodes.contains(nextTried);
+
+ TcpDiscoveryNode nextNext = ring.nextNode(failedNodes);
+ String nextNextDcId = nextNext == null ? null :
nextNext.dataCenterId();
+
+ // We just met local DC again or even we met a third DC.
Review Comment:
Please rephrase this comment; its meaning is unclear.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8147,8 +8387,31 @@ private class CrossRingMessageSendState {
private final long failTimeNanos;
/**
- *
+ * Decision upon results of remote DC ping. {@code Null} if state of
remote DC is not estimated yet.
+ * Not {@code null} but empty if all the remote DCs are considered
available.
+ * Not {@code null} and not empty if contains ids of remoted DCs
considered unavailable.
+ */
+ private @Nullable Collection<String> unavailableDCs;
+
+ /** Nodes ping results. */
Review Comment:
Please elaborate on this: which code corresponds to which result?
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -6357,6 +6561,28 @@ private void checkConnection() {
}
}
+ /** @return {@code True} if we've tries all the other nodes and should
close the ring to current DC. */
+ private boolean allRemoteDCsTraversed(
+ @Nullable CrossRingMessageSendState connRecoverState,
+ Collection<TcpDiscoveryNode> failedNodes,
+ TcpDiscoveryNode nextTried
+ ) {
+ String locDcId = locNode.dataCenterId();
+
+ // Do not make any decision if: no ping started at all; the ping isn't
finished yet; a decision is already made.
+ if (connRecoverState == null || locDcId == null ||
!connRecoverState.remoteDcPingStarted()
+ || locDcId.equals(nextTried.dataCenterId()))
+ return false;
+
+ assert failedNodes.contains(nextTried);
+
+ TcpDiscoveryNode nextNext = ring.nextNode(failedNodes);
Review Comment:
```suggestion
TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
Review Comment:
How about rephrasing this comment:
"In the worst-case scenario, the ping of each node would take the whole timeout.
But if the number of nodes exceeds the number of threads in the pool, we need to
reduce the timeout for each ping
to make sure that all nodes are pinged."
?
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Tmeout per node address.
Review Comment:
```suggestion
// Timeout per node address.
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Tmeout per node address.
+ long addrsTimeoutMs = U.nanosToMillis((long)(stepTimeoutNs /
nodeAddrs.size()));
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, pinging " +
node + " of DC '" + node.dataCenterId()
Review Comment:
```suggestion
log.debug("Pinging " + node + " of DC '" +
node.dataCenterId()
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8168,10 +8431,10 @@ boolean isBackward() {
}
/**
- * @return {@code True} if state is {@link
RingMessageSendState#FAILED}.
+ * @return {@code True} if state timeout expired.
*/
- boolean isFailed() {
- return state == RingMessageSendState.FAILED;
+ boolean timeout() {
Review Comment:
```suggestion
boolean timeoutReached() {
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
Review Comment:
```suggestion
double nodePingTimeoutNs = ((failTimeNanos - System.nanoTime())
* RMT_DC_PING_TIMEOUT_RATIO) / steps;
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Tmeout per node address.
+ long addrsTimeoutMs = U.nanosToMillis((long)(stepTimeoutNs /
nodeAddrs.size()));
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, pinging " +
node + " of DC '" + node.dataCenterId()
+ + ". Total time left (ms): " + remoteDcPingTimeLeft()
+ ". Nodes to ping left: " + nodesToPingLeft() + '.');
+ }
+
+ for (InetSocketAddress addrs : nodeAddrs) {
+ // There is no guarantee that a job is executed
immediately.
+ if (System.nanoTime() + U.millisToNanos(addrsTimeoutMs) >
failTimeNanos)
+ addrsTimeoutMs = U.nanosToMillis(failTimeNanos -
System.nanoTime());
+
+ if (remoteDcPingStopped() || addrsTimeoutMs < 1)
+ return;
+
+ if (pingNode(addrs, node.id(), null, addrsTimeoutMs,
CONNECTION_RECOVERY_TICKS,
+ RMT_DC_PING_ATTEMPT_DELAY_RATIO, false) != null) {
+ // Mark node has pesponded.
Review Comment:
```suggestion
// Mark node responded.
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
Review Comment:
```suggestion
void pingNodeJob(TcpDiscoveryNode node, int batches) {
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Tmeout per node address.
+ long addrsTimeoutMs = U.nanosToMillis((long)(stepTimeoutNs /
nodeAddrs.size()));
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, pinging " +
node + " of DC '" + node.dataCenterId()
+ + ". Total time left (ms): " + remoteDcPingTimeLeft()
+ ". Nodes to ping left: " + nodesToPingLeft() + '.');
Review Comment:
```suggestion
+ ". Total time left: " + remoteDcPingTimeLeft() +
"ms. Nodes to ping left: " + nodesToPingLeft() + '.');
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Tmeout per node address.
+ long addrsTimeoutMs = U.nanosToMillis((long)(stepTimeoutNs /
nodeAddrs.size()));
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, pinging " +
node + " of DC '" + node.dataCenterId()
+ + ". Total time left (ms): " + remoteDcPingTimeLeft()
+ ". Nodes to ping left: " + nodesToPingLeft() + '.');
+ }
+
+ for (InetSocketAddress addrs : nodeAddrs) {
+ // There is no guarantee that a job is executed
immediately.
+ if (System.nanoTime() + U.millisToNanos(addrsTimeoutMs) >
failTimeNanos)
+ addrsTimeoutMs = U.nanosToMillis(failTimeNanos -
System.nanoTime());
+
+ if (remoteDcPingStopped() || addrsTimeoutMs < 1)
+ return;
+
+ if (pingNode(addrs, node.id(), null, addrsTimeoutMs,
CONNECTION_RECOVERY_TICKS,
+ RMT_DC_PING_ATTEMPT_DELAY_RATIO, false) != null) {
+ // Mark node has pesponded.
+ rmtDcPingRes.put(node, 1);
+
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, node "
+ node + " of DC '" + node.dataCenterId()
+ + "' has responded to the ping.");
+ }
+
+ // At least one node's address reached.
+ return;
+ }
+ }
+ }
+ catch (Throwable t) {
+ // No-op.
+ }
+ finally {
+ rmtDcPingRes.compute(node, (n, nodeRes) -> nodeRes == 1 ?
nodeRes : 0);
+
+ if (nodesToPingLeft() == 0)
+ stopRemoteDcPing();
+
+ if (log.isDebugEnabled() && 1 != rmtDcPingRes.get(node)) {
+ log.debug("During the connection recovery, node " + node +
" of DC '" + node.dataCenterId()
+ + "' didn't respond to the ping within the timeout " +
addrsTimeoutMs + "ms.");
Review Comment:
```suggestion
+ "' hasn't responded to the ping within the timeout
" + U.nanosToMillis((long)nodePingTimeoutNs) + "ms.");
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -8232,6 +8479,185 @@ boolean markLastFailedNodeAlive() {
@Override public String toString() {
return S.toString(CrossRingMessageSendState.class, this);
}
+
+ /** */
+ void pingRemoteDCs(List<TcpDiscoveryNode> nodesToPing) {
+ assert !remoteDcPingStarted();
+ assert !F.isEmpty(nodesToPing);
+
+ rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos -
System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
+
+ rmtDcPingPool = new IgniteThreadPoolExecutor(
+ "disco-remote-dc-ping-worker",
+ spi.ignite().name(),
+ pingRmtDcPoolSz,
+ pingRmtDcPoolSz,
+ 0,
+ new LinkedBlockingQueue<>()
+ );
+
+ if (log.isInfoEnabled()) {
+ log.info("During the connection recovery, starting ping of the
remote DCs. Nodes number to ping: "
+ + nodesToPing.size() + ". Timeout: " +
U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
+ }
+
+ rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
+
+ // Mark every node ping started.
+ nodesToPing.forEach(n -> rmtDcPingRes.put(n, -1));
+
+ // We should assume that the ping won't receive any response or
exception and would spend its whole timeout.
+ // If the nodes number exceeds the pool size we have to split the
timeout and enqueue the ping jobs.
+ int steps = nodesToPing.size() / rmtDcPingPool.getMaximumPoolSize()
+ + (nodesToPing.size() % rmtDcPingPool.getMaximumPoolSize() ==
0 ? 0 : 1);
+
+ for (TcpDiscoveryNode node : nodesToPing)
+ scheduleNodePingJob(node, steps);
+ }
+
+ /** */
+ void scheduleNodePingJob(TcpDiscoveryNode node, int steps) {
+ if (stopRmtDcPing)
+ return;
+
+ synchronized (this) {
+ if (stopRmtDcPing)
+ return;
+
+ try {
+ rmtDcPingPool.execute(() -> pingNodeJob(node, steps));
+ }
+ catch (Throwable t) {
+ log.warning("During the connection recovery, attempt to
ping " + node + " of DC '"
+ + node.dataCenterId() + "' failed.", t);
+ }
+ }
+ }
+
+ /** */
+ void pingNodeJob(TcpDiscoveryNode node, int steps) {
+ // Total allowed ping timeout per step.
+ double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) *
RMT_DC_PING_TIMEOUT_RATIO) / steps;
+
+ Collection<InetSocketAddress> nodeAddrs =
spi.getEffectiveNodeAddresses(node);
+
+ // Timeout per node address.
+ long addrsTimeoutMs = U.nanosToMillis((long)(stepTimeoutNs /
nodeAddrs.size()));
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, pinging " +
node + " of DC '" + node.dataCenterId()
+ + ". Total time left (ms): " + remoteDcPingTimeLeft()
+ ". Nodes to ping left: " + nodesToPingLeft() + '.');
+ }
+
+ for (InetSocketAddress addrs : nodeAddrs) {
+ // There is no guarantee that a job is executed
immediately.
+ if (System.nanoTime() + U.millisToNanos(addrsTimeoutMs) >
failTimeNanos)
+ addrsTimeoutMs = U.nanosToMillis(failTimeNanos -
System.nanoTime());
+
+ if (remoteDcPingStopped() || addrsTimeoutMs < 1)
+ return;
+
+ if (pingNode(addrs, node.id(), null, addrsTimeoutMs,
CONNECTION_RECOVERY_TICKS,
+ RMT_DC_PING_ATTEMPT_DELAY_RATIO, false) != null) {
+ // Mark that the node has responded.
+ rmtDcPingRes.put(node, 1);
+
+ if (log.isDebugEnabled()) {
+ log.debug("During the connection recovery, node "
+ node + " of DC '" + node.dataCenterId()
+ + "' has responded to the ping.");
+ }
+
+ // At least one node's address reached.
+ return;
+ }
+ }
+ }
+ catch (Throwable t) {
+ // No-op.
+ }
+ finally {
+ rmtDcPingRes.compute(node, (n, nodeRes) -> nodeRes == 1 ?
nodeRes : 0);
+
+ if (nodesToPingLeft() == 0)
+ stopRemoteDcPing();
+
+ if (log.isDebugEnabled() && 1 != rmtDcPingRes.get(node)) {
+ log.debug("During the connection recovery, node " + node +
" of DC '" + node.dataCenterId()
Review Comment:
```suggestion
log.debug("Node " + node + " of DC '" +
node.dataCenterId()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]