This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-16771
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-16771 by this push:
     new 349d27280 Init secondary channels in the background
349d27280 is described below

commit 349d272806ec724ee9f2610b5a748e27e5146085
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Thu Apr 14 11:20:09 2022 +0300

    Init secondary channels in the background
---
 .../org/apache/ignite/network/ClusterNode.java     |  2 +-
 .../ignite/internal/client/ClientChannel.java      |  7 ++++
 .../ignite/internal/client/ReliableChannel.java    | 38 +++++++++++++++++++---
 .../ignite/internal/client/TcpClientChannel.java   |  6 ++++
 4 files changed, 47 insertions(+), 6 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java 
b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
index da10f0c94..e267d292e 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -57,7 +57,7 @@ public class ClusterNode implements Serializable {
     }
 
     /**
-     * Returns the unique name of this node in a cluster. Doesn't change 
between restarts.
+     * Returns the unique name (consistent id) of this node in a cluster. 
Doesn't change between restarts.
      *
      * @return Unique name of the member in a cluster.
      */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index 9c2e28e83..b22ea91f0 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -50,4 +50,11 @@ public interface ClientChannel extends AutoCloseable {
      * @return {@code True} channel is closed.
      */
     public boolean closed();
+
+    /**
+     * Returns protocol context.
+     *
+     * @return Protocol context.
+     */
+    public ProtocolContext protocolContext();
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index c0b2792cf..e726da325 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -63,7 +64,7 @@ public final class ReliableChannel implements AutoCloseable {
     private final IgniteClientConfiguration clientCfg;
 
     /** Node channels. */
-    private final Map<UUID, ClientChannelHolder> nodeChannels = new 
ConcurrentHashMap<>();
+    private final Map<String, ClientChannelHolder> nodeChannels = new 
ConcurrentHashMap<>();
 
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
@@ -453,8 +454,7 @@ public final class ReliableChannel implements AutoCloseable 
{
     }
 
     /**
-     * Establishing connections to servers. If partition awareness feature is 
enabled connections are created for every configured server.
-     * Otherwise, only default channel is connected.
+     * Init channel holders, establish connection to default channel.
      */
     CompletableFuture<Void> channelsInitAsync() {
         // Do not establish connections if interrupted.
@@ -462,9 +462,12 @@ public final class ReliableChannel implements 
AutoCloseable {
             return CompletableFuture.completedFuture(null);
         }
 
-        // Apply no-op function. Establish default channel connection.
+        // Establish default channel connection.
         getDefaultChannel();
 
+        // Establish secondary connections in the background.
+        initAllChannelsAsync();
+
         // TODO: Async startup IGNITE-15357.
         return CompletableFuture.completedFuture(null);
     }
@@ -530,6 +533,29 @@ public final class ReliableChannel implements 
AutoCloseable {
         return plc.shouldRetry(ctx);
     }
 
+    /**
+     * Asynchronously try to establish a connection to all configured servers.
+     */
+    private void initAllChannelsAsync() {
+        ForkJoinPool.commonPool().submit(
+                () -> {
+                    List<ClientChannelHolder> holders = channels;
+
+                    for (ClientChannelHolder hld : holders) {
+                        if (closed)
+                            return; // New reinit task scheduled or channel is 
closed.
+
+                        try {
+                            hld.getOrCreateChannel(true);
+                        }
+                        catch (Exception ignore) {
+                            // No-op.
+                        }
+                    }
+                }
+        );
+    }
+
     /**
      * Channels holder.
      */
@@ -542,7 +568,7 @@ public final class ReliableChannel implements AutoCloseable 
{
         private volatile ClientChannel ch;
 
         /** ID of the last server node that channel is or was connected to. */
-        private volatile UUID serverNodeId;
+        private volatile String serverNodeId;
 
         /** Address that holder is bind to (chCfg.addr) is not in use now. So 
close the holder. */
         private volatile boolean close;
@@ -615,6 +641,8 @@ public final class ReliableChannel implements AutoCloseable 
{
                     }
 
                     ch = chFactory.apply(chCfg, connMgr);
+
+                    
nodeChannels.put(ch.protocolContext().clusterNode().name(), this);
                 }
             }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 60d46d6b4..7aca1c436 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -321,6 +321,12 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         return closed.get();
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public ProtocolContext protocolContext() {
+        return protocolCtx;
+    }
+
     private static void validateConfiguration(ClientChannelConfiguration cfg) {
         String error = null;
 

Reply via email to