This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ec496400 IGNITE-16823 .NET: Add cluster awareness to Compute (#771)
3ec496400 is described below

commit 3ec4964003feafbf8f066dc4acac1f095c55f86c
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Thu Apr 14 10:12:08 2022 +0300

    IGNITE-16823 .NET: Add cluster awareness to Compute (#771)
    
    * Include node id and name into handshake response.
    * Establish connections to all known endpoints in background.
    * During Compute calls, match target node name against active connections and send request directly to the correct node when possible.
---
 .../handler/ClientInboundMessageHandler.java       |  10 +-
 .../compute/ClientComputeExecuteRequest.java       |  13 +-
 .../ignite/internal/client/ProtocolContext.java    |  31 ++++-
 .../ignite/internal/client/TcpClientChannel.java   |  27 ++--
 .../internal/client/compute/ClientCompute.java     |   4 -
 .../internal/client/io/ClientConnection.java       |   8 ++
 .../client/io/netty/NettyClientConnection.java     |   7 +
 .../ignite/client/TestClientHandlerModule.java     |   8 +-
 .../java/org/apache/ignite/client/TestServer.java  |   8 +-
 .../platforms/dotnet/Apache.Ignite.Tests.ruleset   |   3 +
 .../Compute/ComputeClusterAwarenessTests.cs        | 108 +++++++++++++++
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |  12 ++
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |  47 ++++++-
 .../RawSocketConnectionTests.cs                    |   2 +-
 .../dotnet/Apache.Ignite.Tests/TestUtils.cs        |  25 ++++
 .../dotnet/Apache.Ignite/IIgniteClient.cs          |   8 ++
 .../platforms/dotnet/Apache.Ignite/IgniteClient.cs |   4 +-
 .../Apache.Ignite/Internal/ClientFailoverSocket.cs | 153 +++++++++++++++++++--
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  |  41 +++---
 .../Apache.Ignite/Internal/Compute/Compute.cs      |  43 ++++--
 .../{Table/Schema.cs => ConnectionContext.cs}      |  23 ++--
 .../Apache.Ignite/Internal/IgniteClientInternal.cs |   5 +
 .../dotnet/Apache.Ignite/Internal/Table/Schema.cs  |   7 +-
 23 files changed, 500 insertions(+), 97 deletions(-)

diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 72194073c..7656432c7 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
@@ -189,11 +190,16 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
 
             // Response.
             ProtocolVersion.LATEST_VER.pack(packer);
-
             packer.packInt(ClientErrorCode.SUCCESS);
+
+            packer.packLong(configuration.idleTimeout());
+
+            ClusterNode localMember = 
clusterService.topologyService().localMember();
+            packer.packString(localMember.id());
+            packer.packString(localMember.name());
+
             packer.packBinaryHeader(0); // Features.
             packer.packMapHeader(0); // Extensions.
-            packer.packLong(configuration.idleTimeout());
 
             write(packer, ctx);
         } catch (Throwable t) {
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 78c4ff5f6..926e3821b 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -24,9 +24,8 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
-import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -47,9 +46,15 @@ public class ClientComputeExecuteRequest {
             ClientMessagePacker out,
             IgniteCompute compute,
             ClusterService cluster) {
-        var node = in.tryUnpackNil()
+        var nodeName = in.tryUnpackNil() ? null : in.unpackString();
+
+        var node = nodeName == null
                 ? cluster.topologyService().localMember()
-                : new ClusterNode(in.unpackString(), in.unpackString(), new 
NetworkAddress(in.unpackString(), in.unpackInt()));
+                : cluster.topologyService().getByConsistentId(nodeName);
+
+        if (node == null) {
+            throw new IgniteException("Specified node is not present in the cluster: " + nodeName);
+        }
 
         String jobClassName = in.unpackString();
 
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
index 70a14188b..bacb0e1b4 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.client;
 
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.Set;
 import 
org.apache.ignite.client.IgniteClientFeatureNotSupportedByServerException;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
+import org.apache.ignite.network.ClusterNode;
 
 /**
  * Protocol Context.
@@ -29,22 +32,31 @@ public class ProtocolContext {
     private final ProtocolVersion ver;
 
     /** Features. */
-    private final EnumSet<ProtocolBitmaskFeature> features;
+    private final Set<ProtocolBitmaskFeature> features;
 
     /** Server idle timeout. */
     private final long serverIdleTimeout;
 
+    /** Cluster node. */
+    private final ClusterNode clusterNode;
+
     /**
      * Constructor.
      *
      * @param ver Protocol version.
      * @param features Supported features.
      * @param serverIdleTimeout Server idle timeout.
+     * @param clusterNode Cluster node.
      */
-    public ProtocolContext(ProtocolVersion ver, 
EnumSet<ProtocolBitmaskFeature> features, long serverIdleTimeout) {
+    public ProtocolContext(
+            ProtocolVersion ver,
+            EnumSet<ProtocolBitmaskFeature> features,
+            long serverIdleTimeout,
+            ClusterNode clusterNode) {
         this.ver = ver;
-        this.features = features != null ? features : 
EnumSet.noneOf(ProtocolBitmaskFeature.class);
+        this.features = Collections.unmodifiableSet(features != null ? 
features : EnumSet.noneOf(ProtocolBitmaskFeature.class));
         this.serverIdleTimeout = serverIdleTimeout;
+        this.clusterNode = clusterNode;
     }
 
     /**
@@ -74,7 +86,7 @@ public class ProtocolContext {
      *
      * @return Supported features.
      */
-    public EnumSet<ProtocolBitmaskFeature> features() {
+    public Set<ProtocolBitmaskFeature> features() {
         return features;
     }
 
@@ -92,7 +104,16 @@ public class ProtocolContext {
      *
      * @return Server idle timeout.
      */
-    public long getServerIdleTimeout() {
+    public long serverIdleTimeout() {
         return serverIdleTimeout;
     }
+
+    /**
+     * Returns cluster node.
+     *
+     * @return Cluster node.
+     */
+    public ClusterNode clusterNode() {
+        return clusterNode;
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 6effbb7d4..60d46d6b4 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -49,6 +49,8 @@ import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ServerMessageType;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -368,17 +370,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         write(req).syncUninterruptibly();
     }
 
-    /**
-     * Returns protocol context for a version.
-     *
-     * @param ver Protocol version.
-     * @param serverIdleTimeout Server idle timeout.
-     * @return Protocol context for a version.
-     */
-    private ProtocolContext protocolContextFromVersion(ProtocolVersion ver, 
long serverIdleTimeout) {
-        return new ProtocolContext(ver, 
ProtocolBitmaskFeature.allFeaturesAsEnumSet(), serverIdleTimeout);
-    }
-
     /** Receive and handle handshake response. */
     private void handshakeRes(ClientMessageUnpacker unpacker, ProtocolVersion 
proposedVer)
             throws IgniteClientConnectionException, 
IgniteClientAuthenticationException {
@@ -394,7 +385,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
                 if (errCode == ClientErrorCode.AUTH_FAILED) {
                     throw new IgniteClientAuthenticationException(msg);
                 } else if (proposedVer.equals(srvVer)) {
-                    throw new IgniteClientException("Client protocol error: unexpected server response.");
+                    throw new IgniteClientException("Client protocol error: unexpected server response '" + msg + "'.");
                 } else if (!supportedVers.contains(srvVer)) {
                     throw new IgniteClientException(String.format(
                             "Protocol version mismatch: client %s / server %s. 
Server details: %s",
@@ -409,15 +400,19 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
                 throw new IgniteClientConnectionException(msg);
             }
 
+            var serverIdleTimeout = unpacker.unpackLong();
+            var clusterNodeId = unpacker.unpackString();
+            var clusterNodeName = unpacker.unpackString();
+            var addr = sock.remoteAddress();
+            var clusterNode = new ClusterNode(clusterNodeId, clusterNodeName, 
new NetworkAddress(addr.getHostName(), addr.getPort()));
+
             var featuresLen = unpacker.unpackBinaryHeader();
             unpacker.skipValues(featuresLen);
 
             var extensionsLen = unpacker.unpackMapHeader();
             unpacker.skipValues(extensionsLen);
 
-            var serverIdleTimeout = unpacker.unpackLong();
-
-            protocolCtx = protocolContextFromVersion(srvVer, 
serverIdleTimeout);
+            protocolCtx = new ProtocolContext(srvVer, 
ProtocolBitmaskFeature.allFeaturesAsEnumSet(), serverIdleTimeout, clusterNode);
         }
     }
 
@@ -453,7 +448,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
      * @return Resolved interval.
      */
     private long getHeartbeatInterval(long configuredInterval) {
-        long serverIdleTimeoutMs = protocolCtx.getServerIdleTimeout();
+        long serverIdleTimeoutMs = protocolCtx.serverIdleTimeout();
 
         if (serverIdleTimeoutMs <= 0) {
             return configuredInterval;
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 5aaa95d07..b5fb1df8d 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -131,11 +131,7 @@ public class ClientCompute implements IgniteCompute {
     private <R> CompletableFuture<R> executeOnOneNode(ClusterNode node, String 
jobClassName, Object[] args) {
         return ch.serviceAsync(ClientOp.COMPUTE_EXECUTE, w -> {
             // TODO: Cluster awareness (IGNITE-16771): if the specified node matches existing connection, send nil.
-            w.out().packString(node.id());
             w.out().packString(node.name());
-            w.out().packString(node.address().host());
-            w.out().packInt(node.address().port());
-
             w.out().packString(jobClassName);
             w.out().packObjectArray(args);
         }, r -> (R) r.in().unpackObjectWithType());
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
index 42b80b9ad..70bc4115b 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.io;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
+import java.net.InetSocketAddress;
 import org.apache.ignite.lang.IgniteException;
 
 /**
@@ -40,6 +41,13 @@ public interface ClientConnection extends AutoCloseable {
      */
     ByteBuf getBuffer();
 
+    /**
+     * Gets the remote address.
+     *
+     * @return Remote address.
+     */
+    InetSocketAddress remoteAddress();
+
     /**
      * Closes the connection.
      */
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
index a0541cbad..a746517b1 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.AttributeKey;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import org.apache.ignite.internal.client.io.ClientConnection;
 import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
 import org.apache.ignite.internal.client.io.ClientMessageHandler;
@@ -71,6 +72,12 @@ public class NettyClientConnection implements ClientConnection {
         return channel.alloc().buffer();
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public InetSocketAddress remoteAddress() {
+        return (InetSocketAddress) channel.remoteAddress();
+    }
+
     /** {@inheritDoc} */
     @Override
     public void close() {
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index 79e85c8f9..d4d2e662d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.client;
 
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 
 import io.netty.bootstrap.ServerBootstrap;
@@ -43,6 +44,7 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
 
 /**
  * Client handler module for tests.
@@ -139,6 +141,10 @@ public class TestClientHandlerModule implements IgniteComponent {
 
         ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap();
 
+        ClusterService clusterService = mock(ClusterService.class, 
RETURNS_DEEP_STUBS);
+        
Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id");
+        
Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id");
+
         bootstrap.childHandler(new ChannelInitializer<>() {
                     @Override
                     protected void initChannel(Channel ch) {
@@ -151,7 +157,7 @@ public class TestClientHandlerModule implements IgniteComponent {
                                         mock(QueryProcessor.class),
                                         configuration,
                                         mock(IgniteCompute.class),
-                                        mock(ClusterService.class)));
+                                        clusterService));
                     }
                 })
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
configuration.connectTimeout());
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 7fa63f240..7f8b8f791 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.client;
 
 import static 
org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 
 import java.net.InetSocketAddress;
@@ -37,6 +38,7 @@ import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
+import org.mockito.Mockito;
 
 /**
  * Test server.
@@ -98,6 +100,10 @@ public class TestServer implements AutoCloseable {
 
         bootstrapFactory.start();
 
+        ClusterService clusterService = mock(ClusterService.class, 
RETURNS_DEEP_STUBS);
+        
Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id");
+        
Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id");
+
         module = shouldDropConnection != null
                 ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory, 
shouldDropConnection)
                 : new ClientHandlerModule(
@@ -106,7 +112,7 @@ public class TestServer implements AutoCloseable {
                         ignite.transactions(),
                         cfg,
                         mock(IgniteCompute.class),
-                        mock(ClusterService.class),
+                        clusterService,
                         bootstrapFactory
                 );
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset b/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset
index 4015c5020..a95763d3d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset
@@ -52,5 +52,8 @@
 
     <!-- UnusedParameters -->
     <Rule Id="CA1801" Action="None" />
+
+    <!-- Validate parameters. -->
+    <Rule Id="CA1062" Action="None" />
   </Rules>
 </RuleSet>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
new file mode 100644
index 000000000..8c798f7bc
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Compute
+{
+    using System.Linq;
+    using System.Threading.Tasks;
+    using Internal.Proto;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests compute cluster awareness: client requests can be sent to correct server nodes when a direct connection is available.
+    /// </summary>
+    public class ComputeClusterAwarenessTests
+    {
+        [Test]
+        public async Task 
TestClientSendsComputeJobToTargetNodeWhenDirectConnectionExists()
+        {
+            using var server1 = new FakeServer(nodeName: "s1");
+            using var server2 = new FakeServer(nodeName: "s2");
+            using var server3 = new FakeServer(nodeName: "s3");
+
+            var clientCfg = new IgniteClientConfiguration
+            {
+                Endpoints = { server1.Node.Address.ToString(), 
server2.Node.Address.ToString(), server3.Node.Address.ToString() }
+            };
+
+            using var client = await IgniteClient.StartAsync(clientCfg);
+
+            // ReSharper disable once AccessToDisposedClosure
+            TestUtils.WaitForCondition(() => client.GetConnections().Count == 
3);
+
+            var res2 = await client.Compute.ExecuteAsync<string>(nodes: new[] 
{ server2.Node }, jobClassName: string.Empty);
+            var res3 = await client.Compute.ExecuteAsync<string>(nodes: new[] 
{ server3.Node }, jobClassName: string.Empty);
+
+            Assert.AreEqual("s2", res2);
+            Assert.AreEqual("s3", res3);
+
+            Assert.AreEqual(ClientOp.ComputeExecute, 
server2.ClientOps.Single());
+            Assert.AreEqual(ClientOp.ComputeExecute, 
server3.ClientOps.Single());
+
+            Assert.IsEmpty(server1.ClientOps);
+        }
+
+        [Test]
+        public async Task 
TestClientSendsComputeJobToDefaultNodeWhenDirectConnectionToTargetDoesNotExist()
+        {
+            using var server1 = new FakeServer(nodeName: "s1");
+            using var server2 = new FakeServer(nodeName: "s2");
+            using var server3 = new FakeServer(nodeName: "s3");
+
+            using var client = await server1.ConnectClientAsync();
+
+            var res2 = await client.Compute.ExecuteAsync<string>(nodes: new[] 
{ server2.Node }, jobClassName: string.Empty);
+            var res3 = await client.Compute.ExecuteAsync<string>(nodes: new[] 
{ server3.Node }, jobClassName: string.Empty);
+
+            Assert.AreEqual("s1", res2);
+            Assert.AreEqual("s1", res3);
+            Assert.AreEqual(new[] { ClientOp.ComputeExecute, 
ClientOp.ComputeExecute }, server1.ClientOps);
+
+            Assert.IsEmpty(server2.ClientOps);
+            Assert.IsEmpty(server3.ClientOps);
+
+            Assert.AreEqual(server1.Node, client.GetConnections().Single());
+        }
+
+        [Test]
+        public async Task TestClientRetriesComputeJobOnPrimaryAndDefaultNodes()
+        {
+            using var server1 = new FakeServer(shouldDropConnection: cnt => 
cnt % 2 == 0, nodeName: "s1");
+            using var server2 = new FakeServer(shouldDropConnection: cnt => 
cnt % 2 == 0, nodeName: "s2");
+
+            var clientCfg = new IgniteClientConfiguration
+            {
+                Endpoints = { server1.Node.Address.ToString(), 
server2.Node.Address.ToString() },
+                RetryPolicy = new RetryLimitPolicy { RetryLimit = 2 }
+            };
+
+            using var client = await IgniteClient.StartAsync(clientCfg);
+
+            // ReSharper disable once AccessToDisposedClosure
+            TestUtils.WaitForCondition(() => client.GetConnections().Count == 
2);
+
+            for (int i = 0; i < 100; i++)
+            {
+                var node = i % 2 == 0 ? server1.Node : server2.Node;
+
+                var res = await client.Compute.ExecuteAsync<string>(nodes: 
new[] { node }, jobClassName: string.Empty);
+
+                Assert.AreEqual(node.Name, res);
+            }
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 843e70ebd..35b1d38f5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Tests.Compute
     using System.Net;
     using System.Threading.Tasks;
     using Ignite.Compute;
+    using Internal.Network;
     using Network;
     using NUnit.Framework;
 
@@ -130,6 +131,17 @@ namespace Apache.Ignite.Tests.Compute
             Assert.AreEqual("class org.apache.ignite.tx.TransactionException: Custom job error", ex!.Message);
         }
 
+        [Test]
+        public void TestUnknownNodeThrows()
+        {
+            var unknownNode = new ClusterNode("x", "y", new 
IPEndPoint(IPAddress.Loopback, 0));
+
+            var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
+                await Client.Compute.ExecuteAsync<string>(new[] { unknownNode 
}, EchoJob, "unused"));
+
+            Assert.AreEqual("Specified node is not present in the cluster: y", ex!.Message);
+        }
+
         // TODO: Support all types (IGNITE-15431).
         [Test]
         public async Task TestAllSupportedArgTypes()
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 8df6aa6e3..54e57b7c8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -19,12 +19,17 @@ namespace Apache.Ignite.Tests
 {
     using System;
     using System.Buffers;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Linq;
     using System.Net;
     using System.Net.Sockets;
     using System.Threading;
     using System.Threading.Tasks;
+    using Internal.Network;
     using Internal.Proto;
     using MessagePack;
+    using Network;
 
     /// <summary>
     /// Fake Ignite server for test purposes.
@@ -41,7 +46,9 @@ namespace Apache.Ignite.Tests
 
         private readonly Func<int, bool> _shouldDropConnection;
 
-        public FakeServer(Func<int, bool>? shouldDropConnection = null)
+        private readonly ConcurrentQueue<ClientOp> _ops = new();
+
+        public FakeServer(Func<int, bool>? shouldDropConnection = null, string 
nodeName = "fake-server")
         {
             _shouldDropConnection = shouldDropConnection ?? (_ => false);
             _listener = new Socket(IPAddress.Loopback.AddressFamily, 
SocketType.Stream, ProtocolType.Tcp);
@@ -49,9 +56,15 @@ namespace Apache.Ignite.Tests
             _listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
             _listener.Listen(backlog: 1);
 
+            Node = new ClusterNode("id-" + nodeName, nodeName, 
(IPEndPoint)_listener.LocalEndPoint);
+
             Task.Run(ListenLoop);
         }
 
+        public IClusterNode Node { get; }
+
+        internal IList<ClientOp> ClientOps => _ops.ToList();
+
         public async Task<IIgniteClient> 
ConnectClientAsync(IgniteClientConfiguration? cfg = null)
         {
             var port = ((IPEndPoint)_listener.LocalEndPoint).Port;
@@ -110,8 +123,20 @@ namespace Apache.Ignite.Tests
 
                 // Write handshake response.
                 handler.Send(ProtoCommon.MagicBytes);
-                handler.Send(new byte[] { 0, 0, 0, 8 }); // Size.
-                handler.Send(new byte[] { 3, 0, 0, 0, 196, 0, 128, 0 });
+
+                var handshakeBufferWriter = new ArrayBufferWriter<byte>();
+                var handshakeWriter = new 
MessagePackWriter(handshakeBufferWriter);
+                handshakeWriter.Write(0); // Idle timeout.
+                handshakeWriter.Write(Node.Id); // Node id.
+                handshakeWriter.Write(Node.Name); // Node name (consistent id).
+                handshakeWriter.WriteBinHeader(0); // Features.
+                handshakeWriter.WriteMapHeader(0); // Extensions.
+                handshakeWriter.Flush();
+
+                handler.Send(new byte[] { 0, 0, 0, (byte)(4 + 
handshakeBufferWriter.WrittenCount) }); // Size.
+                handler.Send(new byte[] { 3, 0, 0, 0 }); // Version and 
success flag.
+
+                handler.Send(handshakeBufferWriter.WrittenSpan);
 
                 while (!_cts.IsCancellationRequested)
                 {
@@ -127,6 +152,8 @@ namespace Apache.Ignite.Tests
                     var opCode = (ClientOp)msg[0];
                     var requestId = msg[1];
 
+                    _ops.Enqueue(opCode);
+
                     if (opCode == ClientOp.TablesGet)
                     {
                         handler.Send(new byte[] { 0, 0, 0, 4 }); // Size.
@@ -189,6 +216,20 @@ namespace Apache.Ignite.Tests
                         continue;
                     }
 
+                    if (opCode == ClientOp.ComputeExecute)
+                    {
+                        var arrayBufferWriter = new ArrayBufferWriter<byte>();
+                        var writer = new MessagePackWriter(arrayBufferWriter);
+                        writer.Write(Node.Name);
+                        writer.Flush();
+
+                        handler.Send(new byte[] { 0, 0, 0, (byte)(4 + 
arrayBufferWriter.WrittenCount) }); // Size.
+                        handler.Send(new byte[] { 0, requestId, 0, 
(byte)ClientDataType.String });
+                        handler.Send(arrayBufferWriter.WrittenSpan);
+
+                        continue;
+                    }
+
                     // Fake error message for any other op code.
                     handler.Send(new byte[] { 0, 0, 0, 8 }); // Size.
                     handler.Send(new byte[] { 0, requestId, 1, 160 | 4, 
(byte)Err[0], (byte)Err[1], (byte)Err[2], (byte)Err[3] });
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
index 222d55a81..e819b162b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
@@ -83,7 +83,7 @@ namespace Apache.Ignite.Tests
 
             var str = Encoding.UTF8.GetString(msg);
 
-            Assert.AreEqual(10, msgSize, str);
+            Assert.AreEqual(110, msgSize, str);
 
             // Protocol version.
             Assert.AreEqual(3, msg[0]);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
index 5a49866ab..fa35d387a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
@@ -18,9 +18,12 @@
 namespace Apache.Ignite.Tests
 {
     using System;
+    using System.Diagnostics;
     using System.IO;
     using System.Reflection;
     using System.Runtime.InteropServices;
+    using System.Threading;
+    using NUnit.Framework;
 
     public static class TestUtils
     {
@@ -30,6 +33,28 @@ namespace Apache.Ignite.Tests
 
         public static bool IsWindows => 
RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
 
+        /// <summary>
+        /// Polls the condition every 50 ms until it holds; fails the current test when the timeout elapses first.
+        /// </summary>
+        /// <param name="condition">Condition to wait for.</param>
+        /// <param name="timeoutMs">Maximum time to wait, in milliseconds.</param>
+        public static void WaitForCondition(Func<bool> condition, int timeoutMs = 1000)
+        {
+            var sw = Stopwatch.StartNew();
+
+            // Evaluate the condition before testing the deadline, so it is re-checked after the final sleep.
+            while (!condition())
+            {
+                if (sw.ElapsedMilliseconds >= timeoutMs)
+                {
+                    Assert.Fail("Condition not reached after " + sw.Elapsed);
+                    return; // Do not keep polling if Assert.Fail returns (e.g. inside Assert.Multiple).
+                }
+
+                Thread.Sleep(50);
+            }
+        }
+
         private static string GetSolutionDir()
         {
             var dir = 
Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
diff --git a/modules/platforms/dotnet/Apache.Ignite/IIgniteClient.cs 
b/modules/platforms/dotnet/Apache.Ignite/IIgniteClient.cs
index 83b302083..2ef60afff 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IIgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IIgniteClient.cs
@@ -18,6 +18,8 @@
 namespace Apache.Ignite
 {
     using System;
+    using System.Collections.Generic;
+    using Network;
 
     /// <summary>
     /// Ignite client.
@@ -30,5 +32,11 @@ namespace Apache.Ignite
         /// Gets the configuration.
         /// </summary>
         IgniteClientConfiguration Configuration { get; }
+
+        /// <summary>
+        /// Gets the cluster nodes that this client has active connections to.
+        /// </summary>
+        /// <returns>A list of connected cluster nodes.</returns>
+        IList<IClusterNode> GetConnections();
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs 
b/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
index 5a4819146..80f642c8e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
@@ -35,9 +35,7 @@ namespace Apache.Ignite
         {
             IgniteArgumentCheck.NotNull(configuration, nameof(configuration));
 
-            var socket = new ClientFailoverSocket(configuration);
-
-            await socket.ConnectAsync().ConfigureAwait(false);
+            var socket = await 
ClientFailoverSocket.ConnectAsync(configuration).ConfigureAwait(false);
 
             return new IgniteClientInternal(socket);
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index fa5d2d16a..6519e9f80 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Internal
 {
     using System;
+    using System.Collections.Concurrent;
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
@@ -42,7 +43,10 @@ namespace Apache.Ignite.Internal
         private readonly IIgniteLogger? _logger;
 
         /** Endpoints with corresponding hosts - from configuration. */
-        private readonly IReadOnlyList<SocketEndpoint> _endPoints;
+        private readonly IReadOnlyList<SocketEndpoint> _endpoints;
+
+        /** Cluster node unique name to endpoint map. */
+        private readonly ConcurrentDictionary<string, SocketEndpoint> 
_endpointsMap = new();
 
         /** <see cref="_socket"/> lock. */
         [SuppressMessage(
@@ -51,7 +55,7 @@ namespace Apache.Ignite.Internal
             Justification = "WaitHandle is not used in SemaphoreSlim, no need 
to dispose.")]
         private readonly SemaphoreSlim _socketLock = new(1);
 
-        /** Primary socket. */
+        /** Primary socket. Guarded by <see cref="_socketLock"/>. */
         private ClientSocket? _socket;
 
         /** Disposed flag. */
@@ -61,7 +65,7 @@ namespace Apache.Ignite.Internal
         /// Initializes a new instance of the <see 
cref="ClientFailoverSocket"/> class.
         /// </summary>
         /// <param name="configuration">Client configuration.</param>
-        public ClientFailoverSocket(IgniteClientConfiguration configuration)
+        private ClientFailoverSocket(IgniteClientConfiguration configuration)
         {
             if (configuration.Endpoints.Count == 0)
             {
@@ -71,7 +75,7 @@ namespace Apache.Ignite.Internal
             }
 
             _logger = configuration.Logger.GetLogger(GetType());
-            _endPoints = GetIpEndPoints(configuration).ToList();
+            _endpoints = GetIpEndPoints(configuration).ToList();
 
             Configuration = new(configuration); // Defensive copy.
         }
@@ -84,10 +88,19 @@ namespace Apache.Ignite.Internal
         /// <summary>
         /// Connects the socket.
         /// </summary>
+        /// <param name="configuration">Client configuration.</param>
         /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-        public async Task ConnectAsync()
+        public static async Task<ClientFailoverSocket> 
ConnectAsync(IgniteClientConfiguration configuration)
         {
-            await GetSocketAsync().ConfigureAwait(false);
+            var socket = new ClientFailoverSocket(configuration);
+
+            await socket.GetSocketAsync().ConfigureAwait(false);
+
+            // Because this call is not awaited, execution of the current 
method continues before the call is completed.
+            // Secondary connections are established in the background.
+            _ = socket.ConnectAllSockets();
+
+            return socket;
         }
 
         /// <summary>
@@ -119,6 +132,59 @@ namespace Apache.Ignite.Internal
             }
         }
 
+        /// <summary>
+        /// Looks up the endpoint that is mapped to the given unique cluster node name.
+        /// </summary>
+        /// <param name="clusterNodeName">Cluster node name.</param>
+        /// <returns>
+        /// The mapped endpoint, or null when no endpoint is mapped to this name.
+        /// </returns>
+        public SocketEndpoint? GetEndpoint(string clusterNodeName) =>
+            _endpointsMap.TryGetValue(clusterNodeName, out var endpoint) ? endpoint : null;
+
+        /// <summary>
+        /// Sends a request to the specified endpoint and awaits the response, connecting first when the endpoint has no usable socket.
+        /// </summary>
+        /// <param name="endpoint">Endpoint.</param>
+        /// <param name="clientOp">Client op code.</param>
+        /// <param name="request">Request data.</param>
+        /// <returns>Response data; null when the operation failed and <c>HandleOpError</c> returned true for the error.</returns>
+        public async Task<PooledBuffer?> TryDoOutInOpAsync(SocketEndpoint endpoint, ClientOp clientOp, PooledArrayBufferWriter? request)
+        {
+            try
+            {
+                if (endpoint.Socket is not { IsDisposed: false } socket)
+                {
+                    // No socket yet, or the previous one was disposed: (re)connect while holding the socket lock.
+                    await _socketLock.WaitAsync().ConfigureAwait(false);
+
+                    try
+                    {
+                        socket = await ConnectAsync(endpoint).ConfigureAwait(false);
+                    }
+                    finally
+                    {
+                        _socketLock.Release();
+                    }
+                }
+
+                return await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                // Fresh retry state on every call: this method makes a single attempt.
+                var attemptCount = 0;
+                List<Exception>? collectedErrors = null;
+
+                if (!HandleOpError(ex, clientOp, ref attemptCount, ref collectedErrors))
+                {
+                    throw;
+                }
+
+                return null;
+            }
+        }
+
         /// <inheritdoc/>
         public void Dispose()
         {
@@ -133,7 +199,10 @@ namespace Apache.Ignite.Internal
 
                 _disposed = true;
 
-                _socket?.Dispose();
+                foreach (var endpoint in _endpoints)
+                {
+                    endpoint.Socket?.Dispose();
+                }
             }
             finally
             {
@@ -171,6 +240,57 @@ namespace Apache.Ignite.Internal
             }
         }
 
+        /// <summary>
+        /// Gets connection contexts of the endpoints whose socket is open; disposed sockets are excluded.
+        /// </summary>
+        /// <returns>Active connections.</returns>
+        public IEnumerable<ConnectionContext> GetConnections() =>
+            _endpoints.Select(e => e.Socket)
+                .Where(socket => socket is { IsDisposed: false })
+                .Select(socket => socket!.ConnectionContext)
+                .ToList();
+
+        /// <summary>
+        /// Connects every endpoint that has no open socket yet; failures are logged and otherwise ignored.
+        /// </summary>
+        [SuppressMessage(
+            "Microsoft.Design",
+            "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "Secondary connection errors can be ignored.")]
+        private async Task ConnectAllSockets()
+        {
+            if (_endpoints.Count == 1)
+            {
+                return;
+            }
+
+            await _socketLock.WaitAsync().ConfigureAwait(false);
+
+            try
+            {
+                // This runs in the background: if Dispose already ran, sockets opened now would never be closed.
+                if (_disposed)
+                {
+                    return;
+                }
+
+                _logger?.Debug("Establishing secondary connections...");
+
+                // Task.WhenAll enumerates the query once, which starts all connection attempts.
+                var tasks = _endpoints.Where(e => e.Socket is not { IsDisposed: false }).Select(e => ConnectAsync(e));
+
+                await Task.WhenAll(tasks).ConfigureAwait(false);
+            }
+            catch (Exception e)
+            {
+                _logger?.Warn("Error while trying to establish secondary connections: " + e.Message, e);
+            }
+            finally
+            {
+                _socketLock.Release();
+            }
+        }
+
         /// <summary>
         /// Throws if disposed.
         /// </summary>
@@ -190,10 +310,10 @@ namespace Apache.Ignite.Internal
             List<Exception>? errors = null;
             var startIdx = (int) Interlocked.Increment(ref _endPointIndex);
 
-            for (var i = 0; i < _endPoints.Count; i++)
+            for (var i = 0; i < _endpoints.Count; i++)
             {
-                var idx = (startIdx + i) % _endPoints.Count;
-                var endPoint = _endPoints[idx];
+                var idx = (startIdx + i) % _endpoints.Count;
+                var endPoint = _endpoints[idx];
 
                 if (endPoint.Socket is { IsDisposed: false })
                 {
@@ -219,11 +339,18 @@ namespace Apache.Ignite.Internal
         /// <summary>
         /// Connects to the given endpoint.
         /// </summary>
-        private async Task<ClientSocket> ConnectAsync(SocketEndpoint endPoint)
+        private async Task<ClientSocket> ConnectAsync(SocketEndpoint endpoint)
         {
-            var socket = await ClientSocket.ConnectAsync(endPoint.EndPoint, 
Configuration).ConfigureAwait(false);
+            if (endpoint.Socket?.IsDisposed == false)
+            {
+                return endpoint.Socket;
+            }
+
+            var socket = await ClientSocket.ConnectAsync(endpoint.EndPoint, 
Configuration).ConfigureAwait(false);
+
+            endpoint.Socket = socket;
 
-            endPoint.Socket = socket;
+            _endpointsMap[socket.ConnectionContext.ClusterNode.Name] = 
endpoint;
 
             return socket;
         }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index f8366d2f6..bdee9816f 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -30,6 +30,7 @@ namespace Apache.Ignite.Internal
     using Buffers;
     using Log;
     using MessagePack;
+    using Network;
     using Proto;
 
     /// <summary>
@@ -38,8 +39,6 @@ namespace Apache.Ignite.Internal
     // ReSharper disable SuggestBaseTypeForParameter (NetworkStream has more 
efficient read/write methods).
     internal sealed class ClientSocket : IDisposable
     {
-        private record ConnectionContext(ClientProtocolVersion Version, 
TimeSpan IdleTimeout);
-
         /** General-purpose client type code. */
         private const byte ClientType = 2;
 
@@ -95,13 +94,14 @@ namespace Apache.Ignite.Internal
         /// </summary>
         /// <param name="stream">Network stream.</param>
         /// <param name="configuration">Configuration.</param>
-        /// <param name="context">Connection context.</param>
-        private ClientSocket(NetworkStream stream, IgniteClientConfiguration 
configuration, ConnectionContext context)
+        /// <param name="connectionContext">Connection context.</param>
+        private ClientSocket(NetworkStream stream, IgniteClientConfiguration 
configuration, ConnectionContext connectionContext)
         {
             _stream = stream;
+            ConnectionContext = connectionContext;
             _logger = configuration.Logger.GetLogger(GetType());
 
-            _heartbeatInterval = 
GetHeartbeatInterval(configuration.HeartbeatInterval, context.IdleTimeout, 
_logger);
+            _heartbeatInterval = 
GetHeartbeatInterval(configuration.HeartbeatInterval, 
connectionContext.IdleTimeout, _logger);
 
             // ReSharper disable once AsyncVoidLambda (timer callback)
             _heartbeatTimer = new Timer(
@@ -112,9 +112,7 @@ namespace Apache.Ignite.Internal
 
             // Because this call is not awaited, execution of the current 
method continues before the call is completed.
             // Receive loop runs in the background and should not be awaited.
-#pragma warning disable 4014
-            RunReceiveLoop(_disposeTokenSource.Token);
-#pragma warning restore 4014
+            _ = RunReceiveLoop(_disposeTokenSource.Token);
         }
 
         /// <summary>
@@ -122,6 +120,11 @@ namespace Apache.Ignite.Internal
         /// </summary>
         public bool IsDisposed => _disposeTokenSource.IsCancellationRequested;
 
+        /// <summary>
+        /// Gets the connection context.
+        /// </summary>
+        public ConnectionContext ConnectionContext { get; }
+
         /// <summary>
         /// Connects the socket to the specified endpoint and performs 
handshake.
         /// </summary>
@@ -132,7 +135,7 @@ namespace Apache.Ignite.Internal
             "Microsoft.Reliability",
             "CA2000:Dispose objects before losing scope",
             Justification = "NetworkStream is returned from this method in the 
socket.")]
-        public static async Task<ClientSocket> ConnectAsync(EndPoint endPoint, 
IgniteClientConfiguration configuration)
+        public static async Task<ClientSocket> ConnectAsync(IPEndPoint 
endPoint, IgniteClientConfiguration configuration)
         {
             var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
             {
@@ -148,7 +151,7 @@ namespace Apache.Ignite.Internal
 
                 var stream = new NetworkStream(socket, ownsSocket: true);
 
-                var context = await 
HandshakeAsync(stream).ConfigureAwait(false);
+                var context = await HandshakeAsync(stream, 
endPoint).ConfigureAwait(false);
                 logger?.Debug($"Handshake succeeded. Server protocol version: 
{context.Version}, idle timeout: {context.IdleTimeout}");
 
                 return new ClientSocket(stream, configuration, context);
@@ -222,7 +225,8 @@ namespace Apache.Ignite.Internal
         /// Performs the handshake exchange.
         /// </summary>
         /// <param name="stream">Network stream.</param>
-        private static async Task<ConnectionContext> 
HandshakeAsync(NetworkStream stream)
+        /// <param name="endPoint">Endpoint.</param>
+        private static async Task<ConnectionContext> 
HandshakeAsync(NetworkStream stream, IPEndPoint endPoint)
         {
             await 
stream.WriteAsync(ProtoCommon.MagicBytes).ConfigureAwait(false);
             await WriteHandshakeAsync(stream, 
CurrentProtocolVersion).ConfigureAwait(false);
@@ -232,7 +236,7 @@ namespace Apache.Ignite.Internal
             await CheckMagicBytesAsync(stream).ConfigureAwait(false);
 
             using var response = await ReadResponseAsync(stream, new byte[4], 
CancellationToken.None).ConfigureAwait(false);
-            return ReadHandshakeResponse(response.GetReader());
+            return ReadHandshakeResponse(response.GetReader(), endPoint);
         }
 
         private static async ValueTask CheckMagicBytesAsync(NetworkStream 
stream)
@@ -258,7 +262,7 @@ namespace Apache.Ignite.Internal
             }
         }
 
-        private static ConnectionContext 
ReadHandshakeResponse(MessagePackReader reader)
+        private static ConnectionContext 
ReadHandshakeResponse(MessagePackReader reader, IPEndPoint endPoint)
         {
             var serverVer = new ClientProtocolVersion(reader.ReadInt16(), 
reader.ReadInt16(), reader.ReadInt16());
 
@@ -274,12 +278,17 @@ namespace Apache.Ignite.Internal
                 throw exception;
             }
 
+            var idleTimeoutMs = reader.ReadInt64();
+            var clusterNodeId = reader.ReadString();
+            var clusterNodeName = reader.ReadString();
+
             reader.Skip(); // Features.
             reader.Skip(); // Extensions.
 
-            var idleTimeoutMs = reader.ReadInt64();
-
-            return new ConnectionContext(serverVer, 
TimeSpan.FromMilliseconds(idleTimeoutMs));
+            return new ConnectionContext(
+                serverVer,
+                TimeSpan.FromMilliseconds(idleTimeoutMs),
+                new ClusterNode(clusterNodeId, clusterNodeName, endPoint));
         }
 
         private static IgniteClientException? ReadError(ref MessagePackReader 
reader)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index f80de5b80..348c05c4d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -89,21 +89,42 @@ namespace Apache.Ignite.Internal.Compute
         {
             IgniteArgumentCheck.NotNull(node, nameof(node));
 
-            using var writer = new PooledArrayBufferWriter();
-            Write();
+            // Try direct connection to the specified node.
+            if (_socket.GetEndpoint(node.Name) is { } endpoint)
+            {
+                using var writerWithoutNode = new PooledArrayBufferWriter();
+                Write(writerWithoutNode, writeNode: false);
+
+                using var res1 = await _socket.TryDoOutInOpAsync(endpoint, 
ClientOp.ComputeExecute, writerWithoutNode)
+                    .ConfigureAwait(false);
+
+                // Result is null when there was a connection issue, but retry 
policy allows another try.
+                if (res1 != null)
+                {
+                    return Read(res1.Value);
+                }
+            }
+
+            // When direct connection is not available, use default connection 
and pass target node info to the server.
+            using var writerWithNode = new PooledArrayBufferWriter();
+            Write(writerWithNode, writeNode: true);
 
-            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.ComputeExecute, writer).ConfigureAwait(false);
+            using var res2 = await 
_socket.DoOutInOpAsync(ClientOp.ComputeExecute, 
writerWithNode).ConfigureAwait(false);
 
-            return Read();
+            return Read(res2);
 
-            void Write()
+            void Write(PooledArrayBufferWriter writer, bool writeNode)
             {
                 var w = writer.GetMessageWriter();
 
-                w.Write(node.Id);
-                w.Write(node.Name);
-                w.Write(node.Address.Address.ToString());
-                w.Write(node.Address.Port);
+                if (writeNode)
+                {
+                    w.Write(node.Name);
+                }
+                else
+                {
+                    w.WriteNil();
+                }
 
                 w.Write(jobClassName);
                 w.WriteObjectArrayWithTypes(args);
@@ -111,9 +132,9 @@ namespace Apache.Ignite.Internal.Compute
                 w.Flush();
             }
 
-            T Read()
+            static T Read(in PooledBuffer buf)
             {
-                var reader = resBuf.GetReader();
+                var reader = buf.GetReader();
 
                 return (T)reader.ReadObjectWithType()!;
             }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
similarity index 64%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
copy to modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
index 54c0df714..505eacb2c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
@@ -15,22 +15,19 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Table
-{
-    using System.Collections.Generic;
-
-    // XMLDoc check fails on older SDKs: 
https://github.com/dotnet/roslyn/issues/44571.
+// XMLDoc check fails on older SDKs: 
https://github.com/dotnet/roslyn/issues/44571.
 #pragma warning disable CS1572
 #pragma warning disable CS1573
+namespace Apache.Ignite.Internal
+{
+    using System;
+    using Ignite.Network;
 
     /// <summary>
-    /// Schema.
+    /// Socket connection context.
     /// </summary>
-    /// <param name="Version">Version.</param>
-    /// <param name="KeyColumnCount">Key column count.</param>
-    /// <param name="Columns">Columns in schema order.</param>
-    internal record Schema(
-        int Version,
-        int KeyColumnCount,
-        IReadOnlyList<Column> Columns);
+    /// <param name="Version">Protocol version.</param>
+    /// <param name="IdleTimeout">Server idle timeout.</param>
+    /// <param name="ClusterNode">Cluster node.</param>
+    internal record ConnectionContext(ClientProtocolVersion Version, TimeSpan 
IdleTimeout, IClusterNode ClusterNode);
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index 22fd69abf..20dededa1 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Internal
 {
     using System.Collections.Generic;
+    using System.Linq;
     using System.Net;
     using System.Threading.Tasks;
     using Ignite.Compute;
@@ -86,6 +87,10 @@ namespace Apache.Ignite.Internal
             }
         }
 
+        /// <inheritdoc/>
+        public IList<IClusterNode> GetConnections() =>
+            new List<IClusterNode>(_socket.GetConnections().Select(context => context.ClusterNode));
+
         /// <inheritdoc/>
         public void Dispose()
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
index 54c0df714..836e32cd9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
+// XMLDoc check fails on older SDKs: 
https://github.com/dotnet/roslyn/issues/44571.
+#pragma warning disable CS1572
+#pragma warning disable CS1573
 namespace Apache.Ignite.Internal.Table
 {
     using System.Collections.Generic;
 
-    // XMLDoc check fails on older SDKs: 
https://github.com/dotnet/roslyn/issues/44571.
-#pragma warning disable CS1572
-#pragma warning disable CS1573
-
     /// <summary>
     /// Schema.
     /// </summary>

Reply via email to