This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-16771
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-16771 by this push:
     new 47d4df876 add serviceAsync overload with preferredNodeName
47d4df876 is described below

commit 47d4df8763400d952fa58008a4c424b752ea71b5
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Thu Apr 14 14:56:04 2022 +0300

    add serviceAsync overload with preferredNodeName
---
 .../ignite/internal/client/ReliableChannel.java | 46 +++++++++++++++-------
 1 file changed, 32 insertions(+), 14 deletions(-)

diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 5f4bd0310..49cda0aac 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -142,7 +142,7 @@ public final class ReliableChannel implements AutoCloseable {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
         // Use the only one attempt to avoid blocking async method.
-        handleServiceAsync(fut, opCode, payloadWriter, payloadReader, null, 0);
+        handleServiceAsync(fut, opCode, payloadWriter, payloadReader, preferredNodeName, null, 0);
 
         return fut;
     }
@@ -180,26 +180,44 @@ public final class ReliableChannel implements AutoCloseable {
             int opCode,
             PayloadWriter payloadWriter,
             PayloadReader<T> payloadReader,
+            String preferredNodeName,
             IgniteClientConnectionException failure,
             int attempt) {
-        ClientChannel ch;
-        try {
-            ch = getDefaultChannel();
-        } catch (Throwable ex) {
-            if (failure != null) {
-                failure.addSuppressed(ex);
+        ClientChannel ch = null;
 
-                fut.completeExceptionally(failure);
+        if (preferredNodeName != null) {
+            var holder = nodeChannels.get(preferredNodeName);
 
-                return;
+            if (holder != null) {
+                try {
+                    ch = holder.getOrCreateChannel();
+                } catch (Throwable ignored) {
+                    // Ignore.
+                }
             }
+        }
+
+        if (ch == null) {
+            try {
+                ch = getDefaultChannel();
+            } catch (Throwable ex) {
+                if (failure != null) {
+                    failure.addSuppressed(ex);
 
-            fut.completeExceptionally(ex);
+                    fut.completeExceptionally(failure);
 
-            return;
+                    return;
+                }
+
+                fut.completeExceptionally(ex);
+
+                return;
+            }
         }
 
-        ch
+        final ClientChannel ch0 = ch;
+
+        ch0
                 .serviceAsync(opCode, payloadWriter, payloadReader)
                 .handle((res, err) -> {
                     if (err == null) {
@@ -219,7 +237,7 @@ public final class ReliableChannel implements AutoCloseable {
 
                         try {
                             // Will try to reinit channels if topology changed.
-                            onChannelFailure(ch);
+                            onChannelFailure(ch0);
                         } catch (Throwable ex) {
                             fut.completeExceptionally(ex);
 
@@ -233,7 +251,7 @@ public final class ReliableChannel implements AutoCloseable {
                         }
 
                         if (shouldRetry(opCode, attempt, connectionErr)) {
-                            handleServiceAsync(fut, opCode, payloadWriter, payloadReader, failure0, attempt + 1);
+                            handleServiceAsync(fut, opCode, payloadWriter, payloadReader, null, failure0, attempt + 1);
 
                             return null;
                         }