This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-16771
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-16771 by this push: new 349d27280 Init secondary channels in the background 349d27280 is described below commit 349d272806ec724ee9f2610b5a748e27e5146085 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Thu Apr 14 11:20:09 2022 +0300 Init secondary channels in the background --- .../org/apache/ignite/network/ClusterNode.java | 2 +- .../ignite/internal/client/ClientChannel.java | 7 ++++ .../ignite/internal/client/ReliableChannel.java | 38 +++++++++++++++++++--- .../ignite/internal/client/TcpClientChannel.java | 6 ++++ 4 files changed, 47 insertions(+), 6 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java index da10f0c94..e267d292e 100644 --- a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java +++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java @@ -57,7 +57,7 @@ public class ClusterNode implements Serializable { } /** - * Returns the unique name of this node in a cluster. Doesn't change between restarts. + * Returns the unique name (consistent id) of this node in a cluster. Doesn't change between restarts. * * @return Unique name of the member in a cluster. */ diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java index 9c2e28e83..b22ea91f0 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java @@ -50,4 +50,11 @@ public interface ClientChannel extends AutoCloseable { * @return {@code True} channel is closed. */ public boolean closed(); + + /** + * Returns protocol context. + * + * @return Protocol context. 
+ */ + public ProtocolContext protocolContext(); } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java index c0b2792cf..e726da325 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -63,7 +64,7 @@ public final class ReliableChannel implements AutoCloseable { private final IgniteClientConfiguration clientCfg; /** Node channels. */ - private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>(); + private final Map<String, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>(); /** Channels reinit was scheduled. */ private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean(); @@ -453,8 +454,7 @@ public final class ReliableChannel implements AutoCloseable { } /** - * Establishing connections to servers. If partition awareness feature is enabled connections are created for every configured server. - * Otherwise, only default channel is connected. + * Init channel holders, establish connection to default channel. */ CompletableFuture<Void> channelsInitAsync() { // Do not establish connections if interrupted. @@ -462,9 +462,12 @@ public final class ReliableChannel implements AutoCloseable { return CompletableFuture.completedFuture(null); } - // Apply no-op function. Establish default channel connection. + // Establish default channel connection. 
getDefaultChannel(); + // Establish secondary connections in the background. + initAllChannelsAsync(); + // TODO: Async startup IGNITE-15357. return CompletableFuture.completedFuture(null); } @@ -530,6 +533,29 @@ public final class ReliableChannel implements AutoCloseable { return plc.shouldRetry(ctx); } + /** + * Asynchronously try to establish a connection to all configured servers. + */ + private void initAllChannelsAsync() { + ForkJoinPool.commonPool().submit( + () -> { + List<ClientChannelHolder> holders = channels; + + for (ClientChannelHolder hld : holders) { + if (closed) + return; // New reinit task scheduled or channel is closed. + + try { + hld.getOrCreateChannel(true); + } + catch (Exception ignore) { + // No-op. + } + } + } + ); + } + /** * Channels holder. */ @@ -542,7 +568,7 @@ public final class ReliableChannel implements AutoCloseable { private volatile ClientChannel ch; /** ID of the last server node that channel is or was connected to. */ - private volatile UUID serverNodeId; + private volatile String serverNodeId; /** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder. 
*/ private volatile boolean close; @@ -615,6 +641,8 @@ public final class ReliableChannel implements AutoCloseable { } ch = chFactory.apply(chCfg, connMgr); + + nodeChannels.put(ch.protocolContext().clusterNode().name(), this); } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java index 60d46d6b4..7aca1c436 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java @@ -321,6 +321,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon return closed.get(); } + /** {@inheritDoc} */ + @Override + public ProtocolContext protocolContext() { + return protocolCtx; + } + private static void validateConfiguration(ClientChannelConfiguration cfg) { String error = null;