ibessonov commented on code in PR #7964:
URL: https://github.com/apache/ignite-3/pull/7964#discussion_r3071870604


##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java:
##########
@@ -15,31 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.jdbc.util;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.ignite.internal.network.handshake;
 
 /**
- * Functional interface for extracting a value from the current row of a 
{@link ResultSet}.
- *
- * @param <T> Type of the extracted value.
+ * Exception that notifies of handshake that failed because a channel was 
closed or some internal error happened.
  */
-@FunctionalInterface
-public interface RowColumnProjection<T> {
-    /** Extracts a value from the current row of {@code rs}. */
-    T extract(ResultSet rs) throws SQLException;
-
-    /** Drains result set to list by projecting each record with provided 
extractor. */
-    static <T> List<T> projectRowsColumn(ResultSet rs, RowColumnProjection<T> 
extractor) throws SQLException {
-        List<T> result = new ArrayList<>();
-
-        while (rs.next()) {
-            result.add(extractor.extract(rs));
-        }
+public class BrokenHandshakeException extends HandshakeException {
+    private static final long serialVersionUID = 0L;
 
-        return result;
+    public BrokenHandshakeException() {

Review Comment:
   If some internal error happened, I would like it to be attached as the 
`cause` of this exception. The current description is too broad. Do you assume 
that the stack trace of this exception alone would be enough?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +504,139 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, 
strictIdCheck, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType 
type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineMillis = timeoutMillis == NO_TIMEOUT || 
!timeoutDefined(timeoutMillis) ? null
+                : coarseCurrentTimeMillis() + timeoutMillis;
+        if (deadlineMillis != null && deadlineMillis < 0) {
+            deadlineMillis = Long.MAX_VALUE;
+        }
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, deadlineMillis, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            return OrderingFuture.failedFuture(new IllegalStateException(
+                    "Too many channel creation attempts [attempts=" + 
attemptOrdinal + ", nodeId=" + nodeId + "]."
+            ));
+        }
+
+        return connectionManager.channel(nodeId, type, addr)
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return OrderingFuture.completedFuture(res);
+                    }
+
+                    // Only I/O exceptions and BrokenHandshakeException are 
retriable.
+                    if (!causedByIoException(ex) && !hasCause(ex, 
BrokenHandshakeException.class)) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (strictIdCheck && nodeId != null && 
staleIdDetector.isIdStale(nodeId)) {
+                        metrics.incrementMessageRecipientNotFound();
+
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new RecipientLeftException("Recipient is stale 
[id=" + nodeId + "]")
+                        );
+                    }
+
+                    // We always retry 'temporary I/O failures' (those that 
are not ConnectException or NoRouteToHostException).
+                    boolean temporaryFailure = !recipientIsNotThere(ex);
+
+                    if (!temporaryFailure && !retryStrategy.stillRetriable()) {
+                        Throwable exToReturn = 
retryStrategy.retriesExhaustedMeansRecipientLeft()
+                                ? new RecipientLeftException("Recipient left 
[id=" + nodeId + "].") : ex;
+                        return 
OrderingFuture.<NettySender>failedFuture(exToReturn);
+                    }
+
+                    if (deadlineReached(deadlineMillis)) {
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new TimeoutException("Channel creation timed 
out [id=" + nodeId + "].")
+                        );
+                    }
+
+                    return attemptToOpenChannelAfterDelay(
+                            nodeId,
+                            type,
+                            addr,
+                            strictIdCheck,
+                            retryStrategy,
+                            deadlineMillis,
+                            attemptOrdinal + 1
+                    );
+                })
+                .thenCompose(identity());
+    }
+
+    private static boolean causedByIoException(Throwable ex) {
+        return hasCause(ex, IOException.class);
+    }
+
+    private static boolean recipientIsNotThere(Throwable ex) {
+        return hasCause(ex, ConnectException.class, 
NoRouteToHostException.class);
+    }
+
+    private static boolean deadlineReached(@Nullable Long deadlineMillis) {
+        return deadlineMillis != null && coarseCurrentTimeMillis() >= 
deadlineMillis;
+    }
+
+    private OrderingFuture<NettySender> attemptToOpenChannelAfterDelay(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();
+
+        // Formally, we break ordering guarantees here, but it's not a problem 
as we are only concerned about order of messages,
+        // and the corresponding callbacks will be added to a future we return 
from here, so ordering of message sends will be maintained.
+
+        connectionRetryExecutor.execute(() -> {
+            try {
+                openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, deadlineMillis, attemptOrdinal)
+                        .whenComplete((nextRes, nextEx) -> {
+                            if (nextEx != null) {
+                                
nextAttemptFuture.completeExceptionally(nextEx);
+                            } else {
+                                nextAttemptFuture.complete(nextRes);
+                            }
+                        });

Review Comment:
   But why not use the future returned from `openChannelWithRetriesInternal`?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -918,4 +1091,53 @@ public CompletableFuture<NetworkMessage> future() {
 
         return address.address();
     }
+
+    private interface TerminalIoErrorRetryStrategy {
+        boolean stillRetriable();
+
+        boolean retriesExhaustedMeansRecipientLeft();
+    }
+
+    private static class ByEphemeralIdRetryStrategy implements 
TerminalIoErrorRetryStrategy {
+        @Override
+        public boolean stillRetriable() {
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-28225 - make 
retriable.
+            return false;
+        }
+
+        @Override
+        public boolean retriesExhaustedMeansRecipientLeft() {
+            return false;
+        }
+    }
+
+    private class ByConsistentIdRetryStrategy implements 
TerminalIoErrorRetryStrategy {
+        private final String consistentId;
+
+        private ByConsistentIdRetryStrategy(String consistentId) {
+            this.consistentId = consistentId;
+        }
+
+        @Override
+        public boolean stillRetriable() {
+            return topologyService.getByConsistentId(consistentId) != null;

Review Comment:
   Can this method accept some arguments, so that we won't have to allocate new 
instances on each `send`? The interface itself is private, so I think we're 
allowed to make it "not as pure".



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -220,7 +237,7 @@ public void weakSend(InternalClusterNode recipient, 
NetworkMessage msg) {
 
     @Override
     public CompletableFuture<Void> send(InternalClusterNode recipient, 
ChannelType channelType, NetworkMessage msg) {
-        return send0(recipient, channelType, msg, null, true);
+        return send0(recipient, channelType, msg, null, true, new 
ByEphemeralIdRetryStrategy());

Review Comment:
   I would probably make a constant for `new ByEphemeralIdRetryStrategy()` as 
well. Does that make sense?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +504,139 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, 
strictIdCheck, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType 
type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineMillis = timeoutMillis == NO_TIMEOUT || 
!timeoutDefined(timeoutMillis) ? null
+                : coarseCurrentTimeMillis() + timeoutMillis;
+        if (deadlineMillis != null && deadlineMillis < 0) {

Review Comment:
   Could you please clarify how it can become negative? Is this some 
non-trivial overflow handling for cases when `timeoutMillis` is passed as a very 
large number?
   
   I would also like to avoid allocations on this path as much as 
possible. In other words, I don't think that timeouts need to be boxed `Long`s.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +504,139 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, 
strictIdCheck, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType 
type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineMillis = timeoutMillis == NO_TIMEOUT || 
!timeoutDefined(timeoutMillis) ? null
+                : coarseCurrentTimeMillis() + timeoutMillis;
+        if (deadlineMillis != null && deadlineMillis < 0) {
+            deadlineMillis = Long.MAX_VALUE;
+        }
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, deadlineMillis, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            return OrderingFuture.failedFuture(new IllegalStateException(
+                    "Too many channel creation attempts [attempts=" + 
attemptOrdinal + ", nodeId=" + nodeId + "]."
+            ));
+        }
+
+        return connectionManager.channel(nodeId, type, addr)
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return OrderingFuture.completedFuture(res);
+                    }
+
+                    // Only I/O exceptions and BrokenHandshakeException are 
retriable.
+                    if (!causedByIoException(ex) && !hasCause(ex, 
BrokenHandshakeException.class)) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (strictIdCheck && nodeId != null && 
staleIdDetector.isIdStale(nodeId)) {
+                        metrics.incrementMessageRecipientNotFound();
+
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new RecipientLeftException("Recipient is stale 
[id=" + nodeId + "]")
+                        );
+                    }
+
+                    // We always retry 'temporary I/O failures' (those that 
are not ConnectException or NoRouteToHostException).
+                    boolean temporaryFailure = !recipientIsNotThere(ex);
+
+                    if (!temporaryFailure && !retryStrategy.stillRetriable()) {
+                        Throwable exToReturn = 
retryStrategy.retriesExhaustedMeansRecipientLeft()
+                                ? new RecipientLeftException("Recipient left 
[id=" + nodeId + "].") : ex;
+                        return 
OrderingFuture.<NettySender>failedFuture(exToReturn);
+                    }
+
+                    if (deadlineReached(deadlineMillis)) {
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new TimeoutException("Channel creation timed 
out [id=" + nodeId + "].")
+                        );
+                    }
+
+                    return attemptToOpenChannelAfterDelay(
+                            nodeId,
+                            type,
+                            addr,
+                            strictIdCheck,
+                            retryStrategy,
+                            deadlineMillis,
+                            attemptOrdinal + 1
+                    );
+                })
+                .thenCompose(identity());
+    }
+
+    private static boolean causedByIoException(Throwable ex) {
+        return hasCause(ex, IOException.class);
+    }
+
+    private static boolean recipientIsNotThere(Throwable ex) {
+        return hasCause(ex, ConnectException.class, 
NoRouteToHostException.class);
+    }
+
+    private static boolean deadlineReached(@Nullable Long deadlineMillis) {
+        return deadlineMillis != null && coarseCurrentTimeMillis() >= 
deadlineMillis;
+    }
+
+    private OrderingFuture<NettySender> attemptToOpenChannelAfterDelay(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();
+
+        // Formally, we break ordering guarantees here, but it's not a problem 
as we are only concerned about order of messages,
+        // and the corresponding callbacks will be added to a future we return 
from here, so ordering of message sends will be maintained.
+
+        connectionRetryExecutor.execute(() -> {
+            try {
+                openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, deadlineMillis, attemptOrdinal)
+                        .whenComplete((nextRes, nextEx) -> {
+                            if (nextEx != null) {
+                                
nextAttemptFuture.completeExceptionally(nextEx);
+                            } else {
+                                nextAttemptFuture.complete(nextRes);
+                            }
+                        });

Review Comment:
   ```suggestion
                           .whenComplete(copyStateTo(nextAttemptFuture));
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -465,12 +504,139 @@ private CompletableFuture<Void> sendViaNetwork(
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(nodeId, type, addr, 
strictIdCheck, retryStrategy, channelTimeoutMillis)
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType 
type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            long timeoutMillis
+    ) {
+        Long deadlineMillis = timeoutMillis == NO_TIMEOUT || 
!timeoutDefined(timeoutMillis) ? null
+                : coarseCurrentTimeMillis() + timeoutMillis;
+        if (deadlineMillis != null && deadlineMillis < 0) {
+            deadlineMillis = Long.MAX_VALUE;
+        }
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, deadlineMillis, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            return OrderingFuture.failedFuture(new IllegalStateException(
+                    "Too many channel creation attempts [attempts=" + 
attemptOrdinal + ", nodeId=" + nodeId + "]."
+            ));
+        }
+
+        return connectionManager.channel(nodeId, type, addr)
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return OrderingFuture.completedFuture(res);
+                    }
+
+                    // Only I/O exceptions and BrokenHandshakeException are 
retriable.
+                    if (!causedByIoException(ex) && !hasCause(ex, 
BrokenHandshakeException.class)) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (strictIdCheck && nodeId != null && 
staleIdDetector.isIdStale(nodeId)) {
+                        metrics.incrementMessageRecipientNotFound();
+
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new RecipientLeftException("Recipient is stale 
[id=" + nodeId + "]")
+                        );
+                    }
+
+                    // We always retry 'temporary I/O failures' (those that 
are not ConnectException or NoRouteToHostException).
+                    boolean temporaryFailure = !recipientIsNotThere(ex);
+
+                    if (!temporaryFailure && !retryStrategy.stillRetriable()) {
+                        Throwable exToReturn = 
retryStrategy.retriesExhaustedMeansRecipientLeft()
+                                ? new RecipientLeftException("Recipient left 
[id=" + nodeId + "].") : ex;
+                        return 
OrderingFuture.<NettySender>failedFuture(exToReturn);
+                    }
+
+                    if (deadlineReached(deadlineMillis)) {
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new TimeoutException("Channel creation timed 
out [id=" + nodeId + "].")
+                        );
+                    }
+
+                    return attemptToOpenChannelAfterDelay(
+                            nodeId,
+                            type,
+                            addr,
+                            strictIdCheck,
+                            retryStrategy,
+                            deadlineMillis,
+                            attemptOrdinal + 1
+                    );
+                })
+                .thenCompose(identity());
+    }
+
+    private static boolean causedByIoException(Throwable ex) {
+        return hasCause(ex, IOException.class);
+    }
+
+    private static boolean recipientIsNotThere(Throwable ex) {
+        return hasCause(ex, ConnectException.class, 
NoRouteToHostException.class);
+    }
+
+    private static boolean deadlineReached(@Nullable Long deadlineMillis) {
+        return deadlineMillis != null && coarseCurrentTimeMillis() >= 
deadlineMillis;
+    }
+
+    private OrderingFuture<NettySender> attemptToOpenChannelAfterDelay(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();
+
+        // Formally, we break ordering guarantees here, but it's not a problem 
as we are only concerned about order of messages,
+        // and the corresponding callbacks will be added to a future we return 
from here, so ordering of message sends will be maintained.
+
+        connectionRetryExecutor.execute(() -> {
+            try {
+                openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, deadlineMillis, attemptOrdinal)
+                        .whenComplete((nextRes, nextEx) -> {
+                            if (nextEx != null) {
+                                
nextAttemptFuture.completeExceptionally(nextEx);
+                            } else {
+                                nextAttemptFuture.complete(nextRes);
+                            }
+                        });
+            } catch (Throwable e) {
+                nextAttemptFuture.completeExceptionally(e);
+
+                if (e instanceof Error) {
+                    throw e;
+                }

Review Comment:
   Could you please explain this particular part? Are we rethrowing `Error`s 
just so that they get logged, or is there another reason?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to