rpuch commented on code in PR #7964:
URL: https://github.com/apache/ignite-3/pull/7964#discussion_r3073776636
##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +504,139 @@ private CompletableFuture<Void> sendViaNetwork(
return sender.send(
new OutNetworkObject(message, descriptors),
- () -> triggerChannelCreation(nodeId, type, addr)
+ () -> triggerChannelCreation(nodeId, type, addr, strictIdCheck, retryStrategy, channelTimeoutMillis)
);
});
}
- private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType type, InetSocketAddress addr) {
+ private OrderingFuture<NettySender> openChannelWithRetries(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy,
+ long timeoutMillis
+ ) {
+ Long deadlineMillis = timeoutMillis == NO_TIMEOUT || !timeoutDefined(timeoutMillis) ? null
+ : coarseCurrentTimeMillis() + timeoutMillis;
+ if (deadlineMillis != null && deadlineMillis < 0) {
+ deadlineMillis = Long.MAX_VALUE;
+ }
+
+ return openChannelWithRetriesInternal(nodeId, type, addr, strictIdCheck, retryStrategy, deadlineMillis, 0);
+ }
+
+ private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy,
+ @Nullable Long deadlineMillis,
+ int attemptOrdinal
+ ) {
+ if (attemptOrdinal >= 1000) {
+ return OrderingFuture.failedFuture(new IllegalStateException(
+ "Too many channel creation attempts [attempts=" + attemptOrdinal + ", nodeId=" + nodeId + "]."
+ ));
+ }
+
+ return connectionManager.channel(nodeId, type, addr)
+ .handle((res, ex) -> {
+ if (ex == null) {
+ return OrderingFuture.completedFuture(res);
+ }
+
+ // Only I/O exceptions and BrokenHandshakeException are retriable.
+ if (!causedByIoException(ex) && !hasCause(ex, BrokenHandshakeException.class)) {
+ return OrderingFuture.<NettySender>failedFuture(ex);
+ }
+
+ if (strictIdCheck && nodeId != null && staleIdDetector.isIdStale(nodeId)) {
+ metrics.incrementMessageRecipientNotFound();
+
+ return OrderingFuture.<NettySender>failedFuture(
+ new RecipientLeftException("Recipient is stale [id=" + nodeId + "]")
+ );
+ }
+
+ // We always retry 'temporary I/O failures' (those that are not ConnectException or NoRouteToHostException).
+ boolean temporaryFailure = !recipientIsNotThere(ex);
+
+ if (!temporaryFailure && !retryStrategy.stillRetriable()) {
+ Throwable exToReturn = retryStrategy.retriesExhaustedMeansRecipientLeft()
+ ? new RecipientLeftException("Recipient left [id=" + nodeId + "].") : ex;
+ return OrderingFuture.<NettySender>failedFuture(exToReturn);
+ }
+
+ if (deadlineReached(deadlineMillis)) {
+ return OrderingFuture.<NettySender>failedFuture(
+ new TimeoutException("Channel creation timed out [id=" + nodeId + "].")
+ );
+ }
+
+ return attemptToOpenChannelAfterDelay(
+ nodeId,
+ type,
+ addr,
+ strictIdCheck,
+ retryStrategy,
+ deadlineMillis,
+ attemptOrdinal + 1
+ );
+ })
+ .thenCompose(identity());
+ }
+
+ private static boolean causedByIoException(Throwable ex) {
+ return hasCause(ex, IOException.class);
+ }
+
+ private static boolean recipientIsNotThere(Throwable ex) {
+ return hasCause(ex, ConnectException.class, NoRouteToHostException.class);
+ }
+
+ private static boolean deadlineReached(@Nullable Long deadlineMillis) {
+ return deadlineMillis != null && coarseCurrentTimeMillis() >= deadlineMillis;
+ }
+
+ private OrderingFuture<NettySender> attemptToOpenChannelAfterDelay(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy,
+ @Nullable Long deadlineMillis,
+ int attemptOrdinal
+ ) {
+ OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();
+
+ // Formally, we break ordering guarantees here, but it's not a problem as we are only concerned about order of messages,
+ // and the corresponding callbacks will be added to a future we return from here, so ordering of message sends will be maintained.
+
+ connectionRetryExecutor.execute(() -> {
+ try {
+ openChannelWithRetriesInternal(nodeId, type, addr, strictIdCheck, retryStrategy, deadlineMillis, attemptOrdinal)
+ .whenComplete((nextRes, nextEx) -> {
+ if (nextEx != null) {
+ nextAttemptFuture.completeExceptionally(nextEx);
+ } else {
+ nextAttemptFuture.complete(nextRes);
+ }
+ });
Review Comment:
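`OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();`
`nextAttemptFuture` is an `OrderingFuture`, not a `CompletableFuture`, so `copyStateTo()` will not work here.
Also, `openChannelWithRetriesInternal()` is called on another thread (inside the
executor task), so its return value is not available at this point and cannot be returned from here. The result has to be propagated into `nextAttemptFuture` from within the task, which is what the `whenComplete()` callback does.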
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]