rpuch commented on code in PR #7964:
URL: https://github.com/apache/ignite-3/pull/7964#discussion_r3073776636


##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +504,139 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, 
strictIdCheck, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType 
type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineMillis = timeoutMillis == NO_TIMEOUT || 
!timeoutDefined(timeoutMillis) ? null
+                : coarseCurrentTimeMillis() + timeoutMillis;
+        if (deadlineMillis != null && deadlineMillis < 0) {
+            deadlineMillis = Long.MAX_VALUE;
+        }
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, deadlineMillis, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            return OrderingFuture.failedFuture(new IllegalStateException(
+                    "Too many channel creation attempts [attempts=" + 
attemptOrdinal + ", nodeId=" + nodeId + "]."
+            ));
+        }
+
+        return connectionManager.channel(nodeId, type, addr)
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return OrderingFuture.completedFuture(res);
+                    }
+
+                    // Only I/O exceptions and BrokenHandshakeException are 
retriable.
+                    if (!causedByIoException(ex) && !hasCause(ex, 
BrokenHandshakeException.class)) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (strictIdCheck && nodeId != null && 
staleIdDetector.isIdStale(nodeId)) {
+                        metrics.incrementMessageRecipientNotFound();
+
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new RecipientLeftException("Recipient is stale 
[id=" + nodeId + "]")
+                        );
+                    }
+
+                    // We always retry 'temporary I/O failures' (those that 
are not ConnectException or NoRouteToHostException).
+                    boolean temporaryFailure = !recipientIsNotThere(ex);
+
+                    if (!temporaryFailure && !retryStrategy.stillRetriable()) {
+                        Throwable exToReturn = 
retryStrategy.retriesExhaustedMeansRecipientLeft()
+                                ? new RecipientLeftException("Recipient left 
[id=" + nodeId + "].") : ex;
+                        return 
OrderingFuture.<NettySender>failedFuture(exToReturn);
+                    }
+
+                    if (deadlineReached(deadlineMillis)) {
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new TimeoutException("Channel creation timed 
out [id=" + nodeId + "].")
+                        );
+                    }
+
+                    return attemptToOpenChannelAfterDelay(
+                            nodeId,
+                            type,
+                            addr,
+                            strictIdCheck,
+                            retryStrategy,
+                            deadlineMillis,
+                            attemptOrdinal + 1
+                    );
+                })
+                .thenCompose(identity());
+    }
+
+    private static boolean causedByIoException(Throwable ex) {
+        return hasCause(ex, IOException.class);
+    }
+
+    private static boolean recipientIsNotThere(Throwable ex) {
+        return hasCause(ex, ConnectException.class, 
NoRouteToHostException.class);
+    }
+
+    private static boolean deadlineReached(@Nullable Long deadlineMillis) {
+        return deadlineMillis != null && coarseCurrentTimeMillis() >= 
deadlineMillis;
+    }
+
+    private OrderingFuture<NettySender> attemptToOpenChannelAfterDelay(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();
+
+        // Formally, we break ordering guarantees here, but it's not a problem 
as we are only concerned about order of messages,
+        // and the corresponding callbacks will be added to a future we return 
from here, so ordering of message sends will be maintained.
+
+        connectionRetryExecutor.execute(() -> {
+            try {
+                openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, deadlineMillis, attemptOrdinal)
+                        .whenComplete((nextRes, nextEx) -> {
+                            if (nextEx != null) {
+                                
nextAttemptFuture.completeExceptionally(nextEx);
+                            } else {
+                                nextAttemptFuture.complete(nextRes);
+                            }
+                        });

Review Comment:
   `OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();`
   
   `nextAttemptFuture` is an `OrderingFuture`, not a `CompletableFuture`, so `copyStateTo()` will not work here.
   
   `openChannelWithRetriesInternal()` is called in another thread (in the 
executor), so its return value cannot be used here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to