Copilot commented on code in PR #7964:
URL: https://github.com/apache/ignite-3/pull/7964#discussion_r3058496023


##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null : System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, retryStrategy, deadlineNanos, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            @Nullable Long deadlineNanos,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            throw new IllegalStateException("Too many channel creation attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].");
+        }
+
+        return connectionManager.channel(nodeId, type, addr)
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return OrderingFuture.completedFuture(res);
+                    }
+
+                    // Only I/O exceptions and BrokenHandshakeException are retriable.
+                    if (!causedByIoException(ex) && !hasCause(ex, BrokenHandshakeException.class)) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (nodeId != null && staleIdDetector.isIdStale(nodeId)) {
+                        metrics.incrementMessageRecipientNotFound();
+
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new RecipientLeftException("Recipient is stale [id=" + nodeId + "]")
+                        );
+                    }
+
+                    // We always retry 'temporary I/O failures' (those that are not ConnectionException or NoRouteToHostException).
+                    boolean temporaryFailure = !recipientIsNotThere(ex);
+                    boolean retry = temporaryFailure || retryStrategy.stillRetriable();
+
+                    if (!retry) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (deadlineReached(deadlineNanos)) {
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new TimeoutException("Channel creation timed out [id=" + nodeId + "].")
+                        );
+                    }
+
+                    return attemptToOpenChannelAfterDelay(nodeId, type, addr, retryStrategy, deadlineNanos, attemptOrdinal + 1);
+                })
+                .thenCompose(identity());
+    }

Review Comment:
   New channel-open retry behavior is introduced here, but there don’t appear 
to be unit tests exercising retries/timeouts/terminal-vs-temporary exceptions 
(e.g., `DefaultMessagingServiceTest` has no coverage for `ConnectException` / 
`NoRouteToHostException` paths). Please add targeted tests to validate: (1) 
temporary I/O failures are retried, (2) terminal failures are retried only for 
consistent-id sends while the node stays in topology, and (3) invocation 
timeout bounds retry duration.
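
As a rough illustration of the decision table such tests would pin down, here is a minimal sketch. `RetryDecisionSketch` and `shouldRetry` are hypothetical names, not code from the PR, and the sketch only inspects the top-level exception, whereas the PR walks cause chains and also treats `BrokenHandshakeException` as retriable.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;

/** Hypothetical, simplified model of the retry decision; not the PR's implementation. */
final class RetryDecisionSketch {
    /** Terminal failures: the recipient does not seem to be reachable at all. */
    static boolean recipientIsNotThere(Throwable ex) {
        return ex instanceof ConnectException || ex instanceof NoRouteToHostException;
    }

    /**
     * Temporary I/O failures are always retried; terminal ones only while the
     * (assumed) retry strategy allows it. A reached deadline stops retries in both cases.
     */
    static boolean shouldRetry(Throwable ex, boolean strategyStillRetriable, boolean deadlineReached) {
        if (!(ex instanceof IOException)) {
            return false; // Non-I/O failures are not retriable in this simplified model.
        }

        boolean temporaryFailure = !recipientIsNotThere(ex);

        return (temporaryFailure || strategyStillRetriable) && !deadlineReached;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new IOException("reset"), false, false));
        System.out.println(shouldRetry(new ConnectException("refused"), false, false));
        System.out.println(shouldRetry(new ConnectException("refused"), true, false));
    }
}
```

Real tests would drive `DefaultMessagingService` through a mocked connection manager rather than a standalone predicate; the three cases above only mirror points (1) to (3).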



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java:
##########
@@ -191,41 +217,31 @@ public void testRestartDuringSends(TestInfo testInfo) {
         }
 
         ClusterService sender = services.get(0);
-        ClusterService receiver = services.get(1);
 
         AtomicBoolean sending = new AtomicBoolean(true);
 
+        int receiverIndex = 1;
+
         CompletableFuture<Void> sendingFuture = runAsync(() -> {
-            InternalClusterNode receiverNode = receiver.staticLocalNode();
+            InternalClusterNode receiverNode = services.get(receiverIndex).staticLocalNode();
 
             while (sending.get()) {
                 TestMessage message = new TestMessagesFactory().testMessage().build();
-                CompletableFuture<Void> future = sender.messagingService().send(receiverNode, message);
+                CompletableFuture<Void> future = operation.send(sender.messagingService(), receiverNode, message);
 

Review Comment:
   The sending thread reads `services.get(receiverIndex)` while the main test 
thread concurrently mutates the same `ArrayList` via `services.set(...)` during 
restarts. This is a data race and can make the test flaky. Consider capturing 
`InternalClusterNode receiverNode` (or a `ClusterService` reference) before 
starting `sendingFuture`, or using a thread-safe holder (e.g., 
`AtomicReference`) if the receiver is meant to change.
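
A minimal sketch of both options, using plain strings as stand-ins for `ClusterService` / `InternalClusterNode` (the class and method names here are hypothetical, not taken from the test):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration; strings stand in for the real service and node types. */
final class ReceiverCaptureSketch {
    /** Option 1: read the list on the calling thread, before the async task starts. */
    static CompletableFuture<String> sendWithCapturedReceiver(List<String> services, int receiverIndex) {
        String receiver = services.get(receiverIndex); // Read happens before the task is submitted.

        return CompletableFuture.supplyAsync(() -> "sent to " + receiver);
    }

    /** Option 2: share the receiver through a thread-safe holder if it is meant to change. */
    static CompletableFuture<String> sendWithHolder(AtomicReference<String> receiverHolder) {
        return CompletableFuture.supplyAsync(() -> "sent to " + receiverHolder.get());
    }

    public static void main(String[] args) {
        List<String> services = new ArrayList<>(List.of("sender", "receiver-v1"));

        CompletableFuture<String> future = sendWithCapturedReceiver(services, 1);

        // A later mutation on the main thread no longer affects the sending task.
        services.set(1, "receiver-v2");

        System.out.println(future.join()); // prints "sent to receiver-v1"
    }
}
```

With option 1 the sending task never touches the `ArrayList`, so the concurrent `services.set(...)` calls during restarts cannot race with it.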



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null : System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, retryStrategy, deadlineNanos, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            @Nullable Long deadlineNanos,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            throw new IllegalStateException("Too many channel creation attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].");
+        }
+
+        return connectionManager.channel(nodeId, type, addr)
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return OrderingFuture.completedFuture(res);
+                    }
+
+                    // Only I/O exceptions and BrokenHandshakeException are retriable.
+                    if (!causedByIoException(ex) && !hasCause(ex, BrokenHandshakeException.class)) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (nodeId != null && staleIdDetector.isIdStale(nodeId)) {
+                        metrics.incrementMessageRecipientNotFound();
+
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new RecipientLeftException("Recipient is stale [id=" + nodeId + "]")
+                        );
+                    }
+
+                    // We always retry 'temporary I/O failures' (those that are not ConnectionException or NoRouteToHostException).

Review Comment:
   Comment typo: the code checks `ConnectException`, but the comment says 
`ConnectionException`. Please align the comment with the actual exception types 
to avoid confusion.
   ```suggestion
                       // We always retry 'temporary I/O failures' (those that are not ConnectException or NoRouteToHostException).
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+/**
+ * Exception that notifies of handshake that failed because a channel was closed or handshake has failed.

Review Comment:
   Javadoc grammar is off and reads ambiguously (“handshake that failed because 
... or handshake has failed”). Please rephrase to a clearer sentence describing 
that the handshake was broken due to channel closure / handshake failure.
   ```suggestion
    * Exception indicating that the handshake was broken because the channel was closed or the handshake failed.
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null : System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, retryStrategy, deadlineNanos, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            @Nullable Long deadlineNanos,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            throw new IllegalStateException("Too many channel creation attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].");
+        }
+
+        return connectionManager.channel(nodeId, type, addr)
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return OrderingFuture.completedFuture(res);
+                    }
+
+                    // Only I/O exceptions and BrokenHandshakeException are retriable.
+                    if (!causedByIoException(ex) && !hasCause(ex, BrokenHandshakeException.class)) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (nodeId != null && staleIdDetector.isIdStale(nodeId)) {
+                        metrics.incrementMessageRecipientNotFound();
+
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new RecipientLeftException("Recipient is stale [id=" + nodeId + "]")
+                        );
+                    }
+

Review Comment:
   The stale-ID check here is unconditional (`nodeId != null && 
staleIdDetector.isIdStale(nodeId)`), which changes behavior for non-strict 
modes (send/invoke by consistentId previously skipped stale-ID validation). 
This can cause send/invoke-by-name to fail with `RecipientLeftException` even 
when the consistentId is still present and the operation should be retriable. 
Consider gating this check on `strictIdCheck`, moving stale-ID handling back 
to `send0`/`invoke0` (where it was conditional), or re-resolving the recipient 
by consistentId on retries.
   ```suggestion
                       // Do not fail on stale node IDs in this shared retry path: non-strict
                       // send/invoke-by-consistentId operations must remain retriable so the
                       // recipient can be re-resolved by higher-level logic.
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null : System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, retryStrategy, deadlineNanos, 0);

Review Comment:
   `deadlineNanos` computation can overflow (e.g., many call sites use 
`Long.MAX_VALUE` timeout), and `timeoutMillis == 0` currently results in an 
immediate deadline even though `invoke0` treats `timeout <= 0` as “no timeout” 
(see `TimeoutObjectImpl` creation). Consider treating `timeoutMillis <= 0` as 
no-timeout and computing the deadline in an overflow-safe way (e.g., cap at 
`Long.MAX_VALUE` / detect overflow).
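
One possible shape for this, as a sketch only: `DeadlineSketch` and its methods are hypothetical names, and treating a saturated timeout as "no deadline" is an assumption of this sketch rather than something the PR does. It relies on `TimeUnit.toNanos` saturating at `Long.MAX_VALUE` instead of overflowing, and on comparing `System.nanoTime()` values by difference.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not part of DefaultMessagingService. */
final class DeadlineSketch {
    /** Returns null when there is no deadline, otherwise a System.nanoTime()-based deadline. */
    static Long deadlineNanos(long timeoutMillis, long nowNanos) {
        if (timeoutMillis <= 0) {
            return null; // Assumption: non-positive timeouts mean "no timeout", as in invoke0.
        }

        // toNanos() saturates at Long.MAX_VALUE when the conversion would overflow.
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);

        if (timeoutNanos == Long.MAX_VALUE) {
            return null; // Saturated: effectively infinite, so no deadline is tracked.
        }

        // The sum may wrap around; that is fine as long as deadlines are compared by difference.
        return nowNanos + timeoutNanos;
    }

    /** Wraparound-safe check, following the comparison idiom from the System.nanoTime() docs. */
    static boolean deadlineReached(Long deadlineNanos, long nowNanos) {
        return deadlineNanos != null && nowNanos - deadlineNanos >= 0;
    }

    public static void main(String[] args) {
        long now = System.nanoTime();

        System.out.println(deadlineNanos(Long.MAX_VALUE, now)); // prints "null"
        System.out.println(deadlineReached(deadlineNanos(1_000, now), now)); // prints "false"
    }
}
```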



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null : System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, retryStrategy, deadlineNanos, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            RetryStrategy retryStrategy,
+            @Nullable Long deadlineNanos,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            throw new IllegalStateException("Too many channel creation attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].");

Review Comment:
   `openChannelWithRetriesInternal` throws when `attemptOrdinal >= 1000`. 
Because this can happen on the retry executor thread, the returned 
`OrderingFuture` from `attemptToOpenChannelAfterDelay` may never be completed 
(exception escapes the runnable), leading to hanging sends/invokes. Return a 
failed `OrderingFuture` (or complete `nextAttemptFuture` exceptionally) instead 
of throwing, and/or defensively catch exceptions in the retry runnable.
   ```suggestion
               return OrderingFuture.failedFuture(
                       new IllegalStateException("Too many channel creation attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].")
               );
   ```
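
The defensive-catch variant could look roughly like the sketch below. It uses `CompletableFuture` and a `ScheduledExecutorService` as stand-ins for `OrderingFuture` and the retry executor, and `attemptAfterDelay` is a hypothetical name; the body of the PR's `attemptToOpenChannelAfterDelay` is not shown in this diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical illustration with CompletableFuture in place of OrderingFuture. */
final class RetryRunnableSketch {
    /** Schedules the next attempt and guarantees that the returned future is always completed. */
    static <T> CompletableFuture<T> attemptAfterDelay(
            ScheduledExecutorService executor,
            long delayMillis,
            Supplier<CompletableFuture<T>> attempt
    ) {
        CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();

        executor.schedule(() -> {
            try {
                attempt.get().whenComplete((res, ex) -> {
                    if (ex != null) {
                        nextAttemptFuture.completeExceptionally(ex);
                    } else {
                        nextAttemptFuture.complete(res);
                    }
                });
            } catch (Throwable t) {
                // Without this catch, a synchronous throw would leave the future pending forever.
                nextAttemptFuture.completeExceptionally(t);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);

        return nextAttemptFuture;
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        try {
            CompletableFuture<String> future = RetryRunnableSketch.<String>attemptAfterDelay(executor, 10, () -> {
                throw new IllegalStateException("Too many channel creation attempts");
            });

            // The caller observes the failure instead of waiting on a future that never completes.
            System.out.println(future.handle((res, ex) -> ex).join());
        } finally {
            executor.shutdownNow();
        }
    }
}
```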



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
