Copilot commented on code in PR #7964:
URL: https://github.com/apache/ignite-3/pull/7964#discussion_r3058496023
##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
return sender.send(
new OutNetworkObject(message, descriptors),
- () -> triggerChannelCreation(nodeId, type, addr)
+ () -> triggerChannelCreation(nodeId, type, addr,
retryStrategy, channelTimeoutMillis)
);
});
}
- private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType
type, InetSocketAddress addr) {
+ private OrderingFuture<NettySender> openChannelWithRetries(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ long timeoutMillis
+ ) {
+ Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null :
System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+ return openChannelWithRetriesInternal(nodeId, type, addr,
retryStrategy, deadlineNanos, 0);
+ }
+
+ private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ @Nullable Long deadlineNanos,
+ int attemptOrdinal
+ ) {
+ if (attemptOrdinal >= 1000) {
+ throw new IllegalStateException("Too many channel creation
attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].");
+ }
+
+ return connectionManager.channel(nodeId, type, addr)
+ .handle((res, ex) -> {
+ if (ex == null) {
+ return OrderingFuture.completedFuture(res);
+ }
+
+ // Only I/O exceptions and BrokenHandshakeException are
retriable.
+ if (!causedByIoException(ex) && !hasCause(ex,
BrokenHandshakeException.class)) {
+ return OrderingFuture.<NettySender>failedFuture(ex);
+ }
+
+ if (nodeId != null && staleIdDetector.isIdStale(nodeId)) {
+ metrics.incrementMessageRecipientNotFound();
+
+ return OrderingFuture.<NettySender>failedFuture(
+ new RecipientLeftException("Recipient is stale
[id=" + nodeId + "]")
+ );
+ }
+
+ // We always retry 'temporary I/O failures' (those that
are not ConnectionException or NoRouteToHostException).
+ boolean temporaryFailure = !recipientIsNotThere(ex);
+ boolean retry = temporaryFailure ||
retryStrategy.stillRetriable();
+
+ if (!retry) {
+ return OrderingFuture.<NettySender>failedFuture(ex);
+ }
+
+ if (deadlineReached(deadlineNanos)) {
+ return OrderingFuture.<NettySender>failedFuture(
+ new TimeoutException("Channel creation timed
out [id=" + nodeId + "].")
+ );
+ }
+
+ return attemptToOpenChannelAfterDelay(nodeId, type, addr,
retryStrategy, deadlineNanos, attemptOrdinal + 1);
+ })
+ .thenCompose(identity());
+ }
Review Comment:
New channel-open retry behavior is introduced here, but there don’t appear
to be unit tests exercising retries/timeouts/terminal-vs-temporary exceptions
(e.g., `DefaultMessagingServiceTest` has no coverage for `ConnectException` /
`NoRouteToHostException` paths). Please add targeted tests to validate: (1)
temporary I/O failures are retried, (2) terminal failures are retried only for
consistent-id sends while the node stays in topology, and (3) invocation
timeout bounds retry duration.
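
As a starting point for such tests, the retry rules in the hunk above can be restated as a pure decision function that is easy to assert against. This is only a sketch: `RetryDecisionSketch`, `Decision`, and `decide` are hypothetical names, `hasCause` is reimplemented locally rather than taken from the Ignite utilities, and `BrokenHandshakeException` is left out to keep the example self-contained.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;

/** Hypothetical restatement of the retry rules from the diff, for test design only. */
final class RetryDecisionSketch {
    enum Decision { RETRY, FAIL, TIMEOUT }

    /** Walks the cause chain looking for an instance of the given type. */
    static boolean hasCause(Throwable ex, Class<? extends Throwable> type) {
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    static Decision decide(Throwable ex, boolean stillRetriable, boolean deadlineReached) {
        // Non-I/O failures are never retried.
        if (!hasCause(ex, IOException.class)) {
            return Decision.FAIL;
        }

        // 'Terminal' failures suggest that the recipient is not there.
        boolean terminal = hasCause(ex, ConnectException.class)
                || hasCause(ex, NoRouteToHostException.class);

        // Temporary failures are always retried; terminal ones only while the strategy allows it.
        if (terminal && !stillRetriable) {
            return Decision.FAIL;
        }

        // The deadline bounds the total retry duration.
        return deadlineReached ? Decision.TIMEOUT : Decision.RETRY;
    }
}
```

Each of the three requested scenarios then maps to one or two assertions on `decide`, and the real tests would drive the same cases through a mocked connection manager.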
##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java:
##########
@@ -191,41 +217,31 @@ public void testRestartDuringSends(TestInfo testInfo) {
}
ClusterService sender = services.get(0);
- ClusterService receiver = services.get(1);
AtomicBoolean sending = new AtomicBoolean(true);
+ int receiverIndex = 1;
+
CompletableFuture<Void> sendingFuture = runAsync(() -> {
- InternalClusterNode receiverNode = receiver.staticLocalNode();
+ InternalClusterNode receiverNode =
services.get(receiverIndex).staticLocalNode();
while (sending.get()) {
TestMessage message = new
TestMessagesFactory().testMessage().build();
- CompletableFuture<Void> future =
sender.messagingService().send(receiverNode, message);
+ CompletableFuture<Void> future =
operation.send(sender.messagingService(), receiverNode, message);
Review Comment:
The sending thread reads `services.get(receiverIndex)` while the main test
thread concurrently mutates the same `ArrayList` via `services.set(...)` during
restarts. This is a data race and can make the test flaky. Consider capturing
`InternalClusterNode receiverNode` (or a `ClusterService` reference) before
starting `sendingFuture`, or using a thread-safe holder (e.g.,
`AtomicReference`) if the receiver is meant to change.
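
A minimal sketch of the `AtomicReference` option, assuming the receiver is meant to change across restarts. Plain strings stand in for `ClusterService`, and `ReceiverHolderSketch` and `lastSeenAfterRestarts` are hypothetical names; the point is only that the sending thread reads through a thread-safe holder instead of indexing into a shared `ArrayList`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration: the receiver is published through a thread-safe holder. */
final class ReceiverHolderSketch {
    static String lastSeenAfterRestarts(int restarts) {
        // Stand-in for the current receiver service; replaced on every simulated restart.
        AtomicReference<String> receiver = new AtomicReference<>("receiver-0");
        AtomicBoolean sending = new AtomicBoolean(true);
        AtomicReference<String> lastSeen = new AtomicReference<>();

        CompletableFuture<Void> sendingFuture = CompletableFuture.runAsync(() -> {
            while (sending.get()) {
                // Safe concurrent read: no shared ArrayList is touched from this thread.
                lastSeen.set(receiver.get());
            }
            // One final read after the stop flag is observed; it sees the latest receiver.
            lastSeen.set(receiver.get());
        });

        for (int i = 1; i <= restarts; i++) {
            receiver.set("receiver-" + i); // the main thread swaps the receiver
        }

        sending.set(false);
        sendingFuture.join();

        return lastSeen.get();
    }
}
```

If the receiver is not supposed to change during the test, capturing `receiverNode` once before `runAsync` is simpler and avoids the holder entirely.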
##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
return sender.send(
new OutNetworkObject(message, descriptors),
- () -> triggerChannelCreation(nodeId, type, addr)
+ () -> triggerChannelCreation(nodeId, type, addr,
retryStrategy, channelTimeoutMillis)
);
});
}
- private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType
type, InetSocketAddress addr) {
+ private OrderingFuture<NettySender> openChannelWithRetries(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ long timeoutMillis
+ ) {
+ Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null :
System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+ return openChannelWithRetriesInternal(nodeId, type, addr,
retryStrategy, deadlineNanos, 0);
+ }
+
+ private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ @Nullable Long deadlineNanos,
+ int attemptOrdinal
+ ) {
+ if (attemptOrdinal >= 1000) {
+ throw new IllegalStateException("Too many channel creation
attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].");
+ }
+
+ return connectionManager.channel(nodeId, type, addr)
+ .handle((res, ex) -> {
+ if (ex == null) {
+ return OrderingFuture.completedFuture(res);
+ }
+
+ // Only I/O exceptions and BrokenHandshakeException are
retriable.
+ if (!causedByIoException(ex) && !hasCause(ex,
BrokenHandshakeException.class)) {
+ return OrderingFuture.<NettySender>failedFuture(ex);
+ }
+
+ if (nodeId != null && staleIdDetector.isIdStale(nodeId)) {
+ metrics.incrementMessageRecipientNotFound();
+
+ return OrderingFuture.<NettySender>failedFuture(
+ new RecipientLeftException("Recipient is stale
[id=" + nodeId + "]")
+ );
+ }
+
+ // We always retry 'temporary I/O failures' (those that
are not ConnectionException or NoRouteToHostException).
Review Comment:
Comment typo: the code checks `ConnectException`, but the comment says
`ConnectionException`. Please align the comment with the actual exception types
to avoid confusion.
```suggestion
// We always retry 'temporary I/O failures' (those that are not ConnectException or NoRouteToHostException).
```
##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+/**
+ * Exception that notifies of handshake that failed because a channel was
closed or handshake has failed.
Review Comment:
Javadoc grammar is off and reads ambiguously (“handshake that failed because
... or handshake has failed”). Please rephrase to a clearer sentence describing
that the handshake was broken due to channel closure / handshake failure.
```suggestion
 * Exception indicating that the handshake was broken because the channel was closed or the handshake failed.
```
##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
return sender.send(
new OutNetworkObject(message, descriptors),
- () -> triggerChannelCreation(nodeId, type, addr)
+ () -> triggerChannelCreation(nodeId, type, addr,
retryStrategy, channelTimeoutMillis)
);
});
}
- private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType
type, InetSocketAddress addr) {
+ private OrderingFuture<NettySender> openChannelWithRetries(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ long timeoutMillis
+ ) {
+ Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null :
System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+ return openChannelWithRetriesInternal(nodeId, type, addr,
retryStrategy, deadlineNanos, 0);
+ }
+
+ private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ @Nullable Long deadlineNanos,
+ int attemptOrdinal
+ ) {
+ if (attemptOrdinal >= 1000) {
+ throw new IllegalStateException("Too many channel creation
attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].");
+ }
+
+ return connectionManager.channel(nodeId, type, addr)
+ .handle((res, ex) -> {
+ if (ex == null) {
+ return OrderingFuture.completedFuture(res);
+ }
+
+ // Only I/O exceptions and BrokenHandshakeException are
retriable.
+ if (!causedByIoException(ex) && !hasCause(ex,
BrokenHandshakeException.class)) {
+ return OrderingFuture.<NettySender>failedFuture(ex);
+ }
+
+ if (nodeId != null && staleIdDetector.isIdStale(nodeId)) {
+ metrics.incrementMessageRecipientNotFound();
+
+ return OrderingFuture.<NettySender>failedFuture(
+ new RecipientLeftException("Recipient is stale
[id=" + nodeId + "]")
+ );
+ }
+
Review Comment:
The stale-ID check here is unconditional (`nodeId != null &&
staleIdDetector.isIdStale(nodeId)`), which changes behavior for non-strict
modes (send/invoke by consistentId previously skipped stale-ID validation).
This can cause send/invoke-by-name to fail with `RecipientLeftException` even
when the consistentId is still present and the operation should be retriable.
Consider gating this check on `strictIdCheck`, moving stale-ID handling back
to `send0`/`invoke0` (where it was conditional), or re-resolving the recipient
by consistentId on retries.
```suggestion
// Do not fail on stale node IDs in this shared retry path: non-strict
// send/invoke-by-consistentId operations must remain retriable so the
// recipient can be re-resolved by higher-level logic.
```
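
For the gating alternative, the condition could look roughly like the sketch below. `StaleIdGateSketch` and `shouldFailAsStale` are hypothetical names, and a `Predicate<UUID>` stands in for `staleIdDetector`; how `strictIdCheck` would reach the retry path is an assumption and not something this PR shows.

```java
import java.util.UUID;
import java.util.function.Predicate;

/** Hypothetical illustration of gating the stale-ID check on strict mode. */
final class StaleIdGateSketch {
    static boolean shouldFailAsStale(UUID nodeId, boolean strictIdCheck, Predicate<UUID> isIdStale) {
        // Non-strict (by-consistentId) operations skip the check and stay retriable.
        return strictIdCheck && nodeId != null && isIdStale.test(nodeId);
    }
}
```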
##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
return sender.send(
new OutNetworkObject(message, descriptors),
- () -> triggerChannelCreation(nodeId, type, addr)
+ () -> triggerChannelCreation(nodeId, type, addr,
retryStrategy, channelTimeoutMillis)
);
});
}
- private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType
type, InetSocketAddress addr) {
+ private OrderingFuture<NettySender> openChannelWithRetries(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ long timeoutMillis
+ ) {
+ Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null :
System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+ return openChannelWithRetriesInternal(nodeId, type, addr,
retryStrategy, deadlineNanos, 0);
Review Comment:
`deadlineNanos` computation can overflow (e.g., many call sites use
`Long.MAX_VALUE` timeout), and `timeoutMillis == 0` currently results in an
immediate deadline even though `invoke0` treats `timeout <= 0` as “no timeout”
(see `TimeoutObjectImpl` creation). Consider treating `timeoutMillis <= 0` as
no-timeout and computing the deadline in an overflow-safe way (e.g., cap at
`Long.MAX_VALUE` / detect overflow).
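
One overflow-safe shape, sketched under the assumption that `timeoutMillis <= 0` means "no timeout": keep the start time and the timeout separately and compare elapsed time, rather than adding the two up front. `ChannelDeadlineSketch` is a hypothetical name; `TimeUnit.toNanos` is documented to saturate at `Long.MAX_VALUE` on positive overflow, and the subtraction idiom follows the `System.nanoTime()` Javadoc.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical overflow-safe deadline; not the class used in the PR. */
final class ChannelDeadlineSketch {
    private final long startNanos;
    private final long timeoutNanos;

    private ChannelDeadlineSketch(long startNanos, long timeoutNanos) {
        this.startNanos = startNanos;
        this.timeoutNanos = timeoutNanos;
    }

    /** Returns null when there is no timeout (assumed here: timeoutMillis <= 0). */
    static ChannelDeadlineSketch create(long timeoutMillis, long nowNanos) {
        if (timeoutMillis <= 0) {
            return null;
        }

        // toNanos saturates at Long.MAX_VALUE instead of wrapping around.
        return new ChannelDeadlineSketch(nowNanos, TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
    }

    boolean reached(long nowNanos) {
        // Comparing elapsed time stays correct even if the nanoTime value itself wraps.
        return nowNanos - startNanos >= timeoutNanos;
    }
}
```

A caller would pass `System.nanoTime()` as `nowNanos` at both creation and check time; taking it as a parameter here just keeps the sketch deterministic.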
##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +490,113 @@ private CompletableFuture<Void> sendViaNetwork(
return sender.send(
new OutNetworkObject(message, descriptors),
- () -> triggerChannelCreation(nodeId, type, addr)
+ () -> triggerChannelCreation(nodeId, type, addr,
retryStrategy, channelTimeoutMillis)
);
});
}
- private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType
type, InetSocketAddress addr) {
+ private OrderingFuture<NettySender> openChannelWithRetries(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ long timeoutMillis
+ ) {
+ Long deadlineNanos = timeoutMillis == NO_TIMEOUT ? null :
System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
+
+ return openChannelWithRetriesInternal(nodeId, type, addr,
retryStrategy, deadlineNanos, 0);
+ }
+
+ private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ RetryStrategy retryStrategy,
+ @Nullable Long deadlineNanos,
+ int attemptOrdinal
+ ) {
+ if (attemptOrdinal >= 1000) {
+ throw new IllegalStateException("Too many channel creation
attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].");
Review Comment:
`openChannelWithRetriesInternal` throws when `attemptOrdinal >= 1000`.
Because this can happen on the retry executor thread, the returned
`OrderingFuture` from `attemptToOpenChannelAfterDelay` may never be completed
(exception escapes the runnable), leading to hanging sends/invokes. Return a
failed `OrderingFuture` (or complete `nextAttemptFuture` exceptionally) instead
of throwing, and/or defensively catch exceptions in the retry runnable.
```suggestion
return OrderingFuture.failedFuture(
        new IllegalStateException("Too many channel creation attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "].")
);
```
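
To illustrate both halves of this advice, here is a sketch that uses `CompletableFuture` in place of `OrderingFuture` (an assumption made so the example is self-contained). `RetryGuardSketch`, `attempt`, and `attemptAfterDelay` are hypothetical names; the attempt limit is reported through a failed future, and the scheduled runnable additionally catches anything that escapes so the returned future is always completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; CompletableFuture stands in for OrderingFuture. */
final class RetryGuardSketch {
    static final int MAX_ATTEMPTS = 1000;

    static CompletableFuture<String> attempt(int attemptOrdinal) {
        if (attemptOrdinal >= MAX_ATTEMPTS) {
            // Report the limit through the future instead of throwing.
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Too many channel creation attempts [attempts=" + attemptOrdinal + "]."));
        }

        return CompletableFuture.completedFuture("channel-" + attemptOrdinal);
    }

    static CompletableFuture<String> attemptAfterDelay(
            ScheduledExecutorService executor, int attemptOrdinal, long delayMillis) {
        CompletableFuture<String> nextAttemptFuture = new CompletableFuture<>();

        executor.schedule(() -> {
            try {
                attempt(attemptOrdinal).whenComplete((res, ex) -> {
                    if (ex != null) {
                        nextAttemptFuture.completeExceptionally(ex);
                    } else {
                        nextAttemptFuture.complete(res);
                    }
                });
            } catch (Throwable t) {
                // Defensive: an exception escaping here would otherwise leave the future hanging.
                nextAttemptFuture.completeExceptionally(t);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);

        return nextAttemptFuture;
    }
}
```

With this shape a caller waiting on the result of `attemptAfterDelay(executor, 1000, 1)` sees an `IllegalStateException` rather than waiting forever.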