This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch ignite-2.16
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.16 by this push:
     new dedcf99c1c9 IGNITE-20656 Java thin: Service awareness (#11003)
dedcf99c1c9 is described below

commit dedcf99c1c9974adcfcfe24e42033deef44fc18b
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Nov 9 21:28:18 2023 +0300

    IGNITE-20656 Java thin: Service awareness (#11003)
    
    Co-authored-by: Pavel Tupitsyn <[email protected]>
    (cherry picked from commit 7bcf23279d99fb7d24b6f758ef3fe8ca58f45139)
---
 docs/_docs/services/services.adoc                  |  16 +
 docs/_docs/thin-clients/java-thin-client.adoc      |   9 +-
 .../apache/ignite/util/GridCommandHandlerTest.java |   4 +-
 .../client/thin/ClientCacheAffinityContext.java    |   7 +
 .../internal/client/thin/ClientOperation.java      |   3 +
 .../internal/client/thin/ClientServicesImpl.java   | 123 +++-
 .../client/thin/ProtocolBitmaskFeature.java        |   5 +-
 .../internal/client/thin/ReliableChannel.java      |  32 +-
 .../internal/client/thin/TcpIgniteClient.java      |   9 +-
 .../platform/client/ClientBitmaskFeature.java      |   5 +-
 .../platform/client/ClientConnectionContext.java   |   4 -
 .../platform/client/ClientMessageParser.java       |   7 +
 .../service/ClientServiceMappingsResponse.java     |  55 ++
 .../service/ClientServiceTopologyRequest.java      |  63 +++
 .../processors/service/IgniteServiceProcessor.java |   2 +-
 .../processors/service/ServiceDeploymentTask.java  |   2 +-
 .../org/apache/ignite/client/ReliabilityTest.java  |   2 +-
 .../internal/client/thin/ServiceAwarenessTest.java | 622 +++++++++++++++++++++
 .../util/future/GridFutureAdapterSelfTest.java     |   2 +-
 .../org/apache/ignite/client/ClientTestSuite.java  |   2 +
 20 files changed, 951 insertions(+), 23 deletions(-)

diff --git a/docs/_docs/services/services.adoc 
b/docs/_docs/services/services.adoc
index ce72603e908..7568b9d6c46 100644
--- a/docs/_docs/services/services.adoc
+++ b/docs/_docs/services/services.adoc
@@ -234,6 +234,22 @@ tab:C++[]
 //== Accessing Services from Compute Tasks
 // TODO the @ServiceResource annotation
 
+== Service Awareness [[service_awareness]]
+For link:../thin-clients/java-thin-client.adoc#java_thin_client[Java Thin 
Client] you can activate Service Awareness.
+To do that, enable 
link:../thin-clients/java-thin-client.adoc#partition_awareness[Partition 
Awareness].
+
+Without Service Awareness, the invocation requests are sent to a random node. 
If it has no service
+instance deployed, the request is redirected to a different node. This 
additional network hop adds overhead.
+
+With Service Awareness, the thin client knows where service instances are 
deployed and sends the request to the correct node.
+
+[NOTE]
+====
+The service topology is updated asynchronously starting with the first service 
invocation.
+Thus, some invocation redirects are still possible.
+====
+
+
 == Un-deploying Services
 
 To undeploy a service, use the `IgniteServices.cancel(serviceName)` or 
`IgniteServices.cancelAll()` methods.
diff --git a/docs/_docs/thin-clients/java-thin-client.adoc 
b/docs/_docs/thin-clients/java-thin-client.adoc
index 5f955d35615..5a744d545c4 100644
--- a/docs/_docs/thin-clients/java-thin-client.adoc
+++ b/docs/_docs/thin-clients/java-thin-client.adoc
@@ -12,7 +12,7 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-= Java Thin Client
+= Java Thin Client [[java_thin_client]]
 
 :sourceCodeFile: {javaCodeDir}/JavaThinClient.java
 == Overview
@@ -75,7 +75,7 @@ include::{sourceCodeFile}[tag=connect-to-many-nodes,indent=0]
 
 Note that the code above provides a failover mechanism in case of server node 
failures. Refer to the <<Handling Node Failures>> section for more information.
 
-== Partition Awareness
+== Partition Awareness [[partition_awareness]]
 
 include::includes/partition-awareness.adoc[]
 
@@ -116,6 +116,11 @@ The code snippet shows how an example implementation might 
look like if you want
 
 Also, you can check a 
link:https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/client/ClientKubernetesPutGetExample.java#L50[real
 example] of the interface implementation. `ThinClientKubernetesAddressFinder` 
is created to handle scalable Kubernetes environment.
 
+[NOTE]
+====
+Partition Awareness also enables 
link:../services/services.adoc#service_awareness[Service Awareness].
+====
+
 == Using Key-Value API
 
 The Java thin client supports most of the key-value operations available in 
the thick client.
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 6a40bfb738e..f3869dbac24 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -2053,14 +2053,14 @@ public class GridCommandHandlerTest extends 
GridCommandHandlerClusterPerMethodAb
         String testOutStr = testOut.toString();
 
         // Ignite instase 1 can be logged only in arguments list.
-        boolean isInstanse1Found = Arrays.stream(testOutStr.split("\n"))
+        boolean isInstance1Found = Arrays.stream(testOutStr.split("\n"))
                                         .filter(s -> s.contains("Arguments:"))
                                         .noneMatch(s -> 
s.contains(getTestIgniteInstanceName() + "1"));
 
         assertTrue(testOutStr, testOutStr.contains("Node not found for 
consistent ID:"));
 
         if (commandHandler.equals(CLI_CMD_HND))
-            assertFalse(testOutStr, isInstanse1Found);
+            assertFalse(testOutStr, isInstance1Found);
     }
 
     /** */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index 577a2c5f476..bcd681796da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -329,6 +329,13 @@ public class ClientCacheAffinityContext {
         public Iterable<UUID> nodes() {
             return Collections.unmodifiableCollection(nodes);
         }
+
+        /**
+         * @return Topology version.
+         */
+        public AffinityTopologyVersion version() {
+            return topVer;
+        }
     }
 
     /** Holder of a mapper factory and cacheName. */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index a683154a235..c97ae47928a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -216,6 +216,9 @@ public enum ClientOperation {
     /** Get service descriptors. */
     SERVICE_GET_DESCRIPTOR(7002),
 
+    /** Get service topology. */
+    SERVICE_GET_TOPOLOGY(7003),
+
     /** Get or create an AtomicLong by name. */
     ATOMIC_LONG_CREATE(9000),
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
index f9fcb37d1fe..869c8db8883 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
@@ -23,8 +23,13 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.client.ClientClusterGroup;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
@@ -32,9 +37,11 @@ import org.apache.ignite.client.ClientServiceDescriptor;
 import org.apache.ignite.client.ClientServices;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.service.ServiceCallContextImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.platform.PlatformServiceMethod;
 import org.apache.ignite.platform.PlatformType;
 import org.apache.ignite.services.ServiceCallContext;
@@ -45,6 +52,9 @@ import org.jetbrains.annotations.Nullable;
  * Implementation of {@link ClientServices}.
  */
 class ClientServicesImpl implements ClientServices {
+    /** Max service topology update period in milliseconds. */
+    static final int SRV_TOP_UPDATE_PERIOD = 10_000;
+
     /** Channel. */
     private final ReliableChannel ch;
 
@@ -57,13 +67,23 @@ class ClientServicesImpl implements ClientServices {
     /** Cluster group. */
     private final ClientClusterGroupImpl grp;
 
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Services topology. Empty map if partition awareness is not enabled. 
*/
+    private final Map<String, ServiceTopology> servicesTopologies;
+
     /** Constructor. */
-    ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, 
ClientClusterGroupImpl grp) {
+    ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, 
ClientClusterGroupImpl grp, IgniteLogger log) {
         this.ch = ch;
         this.marsh = marsh;
         this.grp = grp;
 
         utils = new ClientUtils(marsh);
+
+        this.log = log;
+
+        servicesTopologies = ch.partitionAwarenessEnabled ? new 
ConcurrentHashMap<>() : Collections.emptyMap();
     }
 
     /** {@inheritDoc} */
@@ -165,7 +185,91 @@ class ClientServicesImpl implements ClientServices {
     ClientServices withClusterGroup(ClientClusterGroupImpl grp) {
         A.notNull(grp, "grp");
 
-        return new ClientServicesImpl(ch, marsh, grp);
+        return new ClientServicesImpl(ch, marsh, grp, log);
+    }
+
+    /**
+     * Keeps the topology of a particular service and the state of its update progress.
+     */
+    private final class ServiceTopology {
+        /** The service name. */
+        private final String srvcName;
+
+        /** If {@code true}, topology update of current service is in 
progress. */
+        private final AtomicBoolean updateInProgress = new AtomicBoolean();
+
+        /** {@link System#nanoTime()} timestamp of the last successfully received topology. */
+        private volatile long lastUpdateRequestTime;
+
+        /** UUIDs of the nodes with at least one service instance. */
+        private volatile List<UUID> nodes = Collections.emptyList();
+
+        /** Cluster topology version captured when the last successful service topology 
update was requested. */
+        private volatile AffinityTopologyVersion lastAffTop;
+
+        /** */
+        private ServiceTopology(String name) {
+            srvcName = name;
+        }
+
+        /**
+         * @return {@code True} if update of the service topology is required. 
{@code False} otherwise.
+         */
+        private boolean isUpdateRequired(AffinityTopologyVersion curAffTop) {
+            return lastAffTop == null || curAffTop.topologyVersion() > 
lastAffTop.topologyVersion()
+                || U.nanosToMillis(System.nanoTime() - lastUpdateRequestTime) 
>= SRV_TOP_UPDATE_PERIOD;
+        }
+
+        /**
+         * Asynchronously updates the service topology.
+         */
+        private void updateTopologyAsync() {
+            AffinityTopologyVersion curAffTop = 
ch.affinityContext().lastTopology().version();
+
+            if (!updateInProgress.compareAndSet(false, true))
+                return;
+
+            ch.serviceAsync(
+                ClientOperation.SERVICE_GET_TOPOLOGY,
+                req -> utils.writeObject(req.out(), srvcName),
+                resp -> {
+                    int cnt = resp.in().readInt();
+
+                    List<UUID> res = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; ++i)
+                        res.add(new UUID(resp.in().readLong(), 
resp.in().readLong()));
+
+                    return res;
+                }).whenComplete((nodes, err) -> {
+                    if (err == null) {
+                        this.nodes = nodes;
+                        lastAffTop = curAffTop;
+                        lastUpdateRequestTime = System.nanoTime();
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topology of service '" + srvcName + "' 
has been updated. The " +
+                                "service instance nodes: " + nodes);
+                        }
+                    }
+                    else
+                        log.error("Failed to update topology of the service '" 
+ srvcName + "'.", err);
+
+                    updateInProgress.set(false);
+                });
+        }
+
+        /**
+         * Provides last known service topology and asynchronously updates it 
if required.
+         *
+         * @return Last known service topology.
+         */
+        public List<UUID> getAndUpdate() {
+            if 
(isUpdateRequired(ch.affinityContext().lastTopology().version()))
+                updateTopologyAsync();
+
+            return nodes;
+        }
     }
 
     /**
@@ -215,7 +319,8 @@ class ClientServicesImpl implements ClientServices {
 
                 return ch.service(ClientOperation.SERVICE_INVOKE,
                     req -> writeServiceInvokeRequest(req, nodeIds, method, 
args),
-                    res -> utils.readObject(res.in(), false, 
method.getReturnType())
+                    res -> utils.readObject(res.in(), false, 
method.getReturnType()),
+                    serviceTopology()
                 );
             }
             catch (ClientError e) {
@@ -223,6 +328,18 @@ class ClientServicesImpl implements ClientServices {
             }
         }
 
+        /**
+         * @return Actual known service topology or empty list if: service 
topology is not enabled, not supported or
+         * not received yet.
+         */
+        private List<UUID> serviceTopology() {
+            if (!ch.partitionAwarenessEnabled
+                || !ch.applyOnDefaultChannel(c -> 
c.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.SERVICE_TOPOLOGY), 
null))
+                return Collections.emptyList();
+
+            return servicesTopologies.computeIfAbsent(name, 
ServiceTopology::new).getAndUpdate();
+        }
+
         /**
          * @param ch Payload output channel.
          * @param nodeIds Node IDs.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index 751f66dbc27..7676fcc323c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -74,7 +74,10 @@ public enum ProtocolBitmaskFeature {
     INDEX_QUERY(14),
 
     /** IndexQuery limit. */
-    INDEX_QUERY_LIMIT(15);
+    INDEX_QUERY_LIMIT(15),
+
+    /** Service topology. */
+    SERVICE_TOPOLOGY(16);
 
     /** */
     private static final EnumSet<ProtocolBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 2bcf6b116c2..f88a90e654e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.thin;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -74,7 +75,7 @@ final class ReliableChannel implements AutoCloseable {
     private volatile int curChIdx = -1;
 
     /** Partition awareness enabled. */
-    private final boolean partitionAwarenessEnabled;
+    final boolean partitionAwarenessEnabled;
 
     /** Cache partition awareness context. */
     private final ClientCacheAffinityContext affinityCtx;
@@ -182,7 +183,32 @@ final class ReliableChannel implements AutoCloseable {
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException, ClientError {
-        return applyOnDefaultChannel(channel -> channel.service(op, 
payloadWriter, payloadReader), op);
+        return service(op, payloadWriter, payloadReader, 
Collections.emptyList());
+    }
+
+    /**
+     * Send request to one of the passed nodes and handle response.
+     *
+     * @throws ClientException Thrown by {@code payloadWriter} or {@code 
payloadReader}.
+     * @throws ClientAuthenticationException When user name or password is 
invalid.
+     * @throws ClientAuthorizationException When user has no permission to 
perform operation.
+     * @throws ClientProtocolError When failed to handshake with server.
+     * @throws ClientServerError When failed to process request on server.
+     */
+    public <T> T service(
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader,
+        List<UUID> targetNodes
+    ) throws ClientException, ClientError {
+        if (F.isEmpty(targetNodes))
+            return applyOnDefaultChannel(channel -> channel.service(op, 
payloadWriter, payloadReader), op);
+
+        return applyOnNodeChannelWithFallback(
+            
targetNodes.get(ThreadLocalRandom.current().nextInt(targetNodes.size())),
+            channel -> channel.service(op, payloadWriter, payloadReader),
+            op
+        );
     }
 
     /**
@@ -943,7 +969,7 @@ final class ReliableChannel implements AutoCloseable {
         /** Channel. */
         private volatile ClientChannel ch;
 
-        /** ID of the last server node that {@link ch} is or was connected to. 
*/
+        /** ID of the last server node that {@link #ch} is or was connected 
to. */
         private volatile UUID serverNodeId;
 
         /** Address that holder is bind to (chCfg.addr) is not in use now. So 
close the holder. */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index f15293089fb..a4d9922143f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -107,6 +107,9 @@ public class TcpIgniteClient implements IgniteClient {
     /** Serializer/deserializer. */
     private final ClientUtils serDes;
 
+    /** Logger. */
+    private final IgniteLogger log;
+
     /**
      * Private constructor. Use {@link 
TcpIgniteClient#start(ClientConfiguration)} to create an instance of
      * {@code TcpIgniteClient}.
@@ -122,6 +125,8 @@ public class TcpIgniteClient implements IgniteClient {
             BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, ClientChannel> chFactory,
             ClientConfiguration cfg
     ) throws ClientException {
+        log = NullLogger.whenNull(cfg.getLogger());
+
         final ClientBinaryMetadataHandler metadataHnd = new 
ClientBinaryMetadataHandler();
 
         ClientMarshallerContext marshCtx = new ClientMarshallerContext();
@@ -159,7 +164,7 @@ public class TcpIgniteClient implements IgniteClient {
 
             compute = new ClientComputeImpl(ch, marsh, 
cluster.defaultClusterGroup());
 
-            services = new ClientServicesImpl(ch, marsh, 
cluster.defaultClusterGroup());
+            services = new ClientServicesImpl(ch, marsh, 
cluster.defaultClusterGroup(), log);
 
             lsnrsRegistry = new ClientCacheEntryListenersRegistry();
         }
@@ -475,8 +480,6 @@ public class TcpIgniteClient implements IgniteClient {
         if (clusterCfg == null)
             return;
 
-        IgniteLogger log = NullLogger.whenNull(cfg.getLogger());
-
         if (log.isDebugEnabled())
             log.debug("Cluster binary configuration retrieved: " + clusterCfg);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index e858bdbb8db..b0f673ac5ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -72,7 +72,10 @@ public enum ClientBitmaskFeature implements 
ThinProtocolFeature {
     INDEX_QUERY(14),
 
     /** IndexQuery limit. */
-    INDEX_QUERY_LIMIT(15);
+    INDEX_QUERY_LIMIT(15),
+
+    /** Service topology. */
+    SERVICE_TOPOLOGY(16);
 
     /** */
     private static final EnumSet<ClientBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
index eb765bdf5ed..13568187266 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
@@ -78,10 +78,6 @@ public class ClientConnectionContext extends 
ClientListenerAbstractConnectionCon
     /** Default version. */
     public static final ClientListenerProtocolVersion DEFAULT_VER = VER_1_7_0;
 
-    /** Default protocol context. */
-    public static final ClientProtocolContext DEFAULT_PROTOCOL_CONTEXT =
-        new ClientProtocolContext(DEFAULT_VER, 
ClientBitmaskFeature.allFeaturesAsEnumSet());
-
     /** Supported versions. */
     private static final Collection<ClientListenerProtocolVersion> 
SUPPORTED_VERS = Arrays.asList(
         VER_1_7_0,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 22cc257e660..ab3fe4c1608 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -106,6 +106,7 @@ import 
org.apache.ignite.internal.processors.platform.client.datastructures.Clie
 import 
org.apache.ignite.internal.processors.platform.client.service.ClientServiceGetDescriptorRequest;
 import 
org.apache.ignite.internal.processors.platform.client.service.ClientServiceGetDescriptorsRequest;
 import 
org.apache.ignite.internal.processors.platform.client.service.ClientServiceInvokeRequest;
+import 
org.apache.ignite.internal.processors.platform.client.service.ClientServiceTopologyRequest;
 import 
org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerAddDataRequest;
 import 
org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerStartRequest;
 import 
org.apache.ignite.internal.processors.platform.client.tx.ClientTxEndRequest;
@@ -393,6 +394,9 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
     /** IgniteSet.iterator page. */
     private static final short OP_SET_ITERATOR_GET_PAGE = 9023;
 
+    /** Get service topology. */
+    private static final short OP_SERVICE_GET_TOPOLOGY = 7003;
+
     /** Marshaller. */
     private final GridBinaryMarshaller marsh;
 
@@ -697,6 +701,9 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
 
             case OP_SET_ITERATOR_GET_PAGE:
                 return new ClientIgniteSetIteratorGetPageRequest(reader);
+
+            case OP_SERVICE_GET_TOPOLOGY:
+                return new ClientServiceTopologyRequest(reader);
         }
 
         return new ClientRawRequest(reader.readLong(), 
ClientStatus.INVALID_OP_CODE,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceMappingsResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceMappingsResponse.java
new file mode 100644
index 00000000000..6be07fb886a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceMappingsResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.service;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.client.thin.ClientUtils;
+import 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.jetbrains.annotations.Nullable;
+
+/** Service topology response. */
+public class ClientServiceMappingsResponse extends ClientResponse {
+    /** Service instance nodes. */
+    @Nullable private final Collection<UUID> svcsNodes;
+
+    /**
+     * @param reqId Request id.
+     * @param svcsNodes Services instance nodes.
+     */
+    public ClientServiceMappingsResponse(long reqId, @Nullable 
Collection<UUID> svcsNodes) {
+        super(reqId);
+
+        this.svcsNodes = svcsNodes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void encode(ClientConnectionContext ctx, 
BinaryRawWriterEx writer) {
+        super.encode(ctx, writer);
+
+        ClientUtils.collection(
+            svcsNodes,
+            writer.out(),
+            (out, uuid) -> {
+                out.writeLong(uuid.getMostSignificantBits());
+                out.writeLong(uuid.getLeastSignificantBits());
+            });
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceTopologyRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceTopologyRequest.java
new file mode 100644
index 00000000000..560c79c0560
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceTopologyRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryRawReader;
+import 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import 
org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Requests the topology of a particular service.
+ */
+public class ClientServiceTopologyRequest extends ClientRequest {
+    /** The service name. */
+    private final String name;
+
+    /**
+     * Creates the service topology request.
+     *
+     * @param reader Reader to read the {@link #name} from.
+     */
+    public ClientServiceTopologyRequest(BinaryRawReader reader) {
+        super(reader);
+
+        name = reader.readString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientResponse process(ClientConnectionContext ctx) {
+        Map<UUID, Integer> srvcTop;
+
+        try {
+            srvcTop = ctx.kernalContext().service().serviceTopology(name, 0);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteClientException(ClientStatus.FAILED, "Failed to 
get topology for service '" + name + "'.", e);
+        }
+
+        return new ClientServiceMappingsResponse(requestId(), 
F.isEmpty(srvcTop) ? Collections.emptyList() : srvcTop.keySet());
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index 598c3e97df0..dcceab41d9d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -936,7 +936,7 @@ public class IgniteServiceProcessor extends 
GridProcessorAdapter implements Igni
      * @return Service topology.
      * @throws IgniteCheckedException On error.
      */
-    public Map<UUID, Integer> serviceTopology(String name, long timeout) 
throws IgniteCheckedException {
+    @Nullable public Map<UUID, Integer> serviceTopology(String name, long 
timeout) throws IgniteCheckedException {
         assert timeout >= 0;
 
         long startTime = U.currentTimeMillis();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
index 37828dd2474..d644f90c281 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -132,7 +132,7 @@ class ServiceDeploymentTask {
         this.depId = depId;
         this.ctx = ctx;
 
-        srvcProc = (IgniteServiceProcessor)ctx.service();
+        srvcProc = ctx.service();
         log = ctx.log(getClass());
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java 
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 5a014365a15..b7ea340f36c 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -380,7 +380,7 @@ public class ReliabilityTest extends AbstractThinClientTest 
{
 
         String nullOpsNames = 
nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
 
-        long expectedNullCnt = 21;
+        long expectedNullCnt = 22;
 
         String msg = nullOps.size()
                 + " operation codes do not have public equivalent. When adding 
new codes, update ClientOperationType too. Missing ops: "
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
new file mode 100644
index 00000000000..94fba7293f3
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.client.ClientAuthenticationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridJobExecuteRequest;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.service.GridServiceProxy;
+import 
org.apache.ignite.internal.processors.service.ServiceClusterDeploymentResultBatch;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.apache.logging.log4j.Level;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static 
org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Checks the service awareness feature of the thin client.
+ */
+public class ServiceAwarenessTest extends AbstractThinClientTest {
+    /** Node-filter service name. */
+    private static final String SRV_NAME = "node_filtered_svc";
+
+    /** Number of grids at the test start. */
+    private static final int GRIDS = 4;
+
+    /** Number of node instances with the initial service deployment. */
+    private static final int INIT_SRVC_NODES_CNT = 2;
+
+    /** */
+    protected boolean partitionAwareness = true;
+
+    /** */
+    private static ListeningTestLogger clientLogLsnr;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new TestBlockingDiscoverySpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected ClientConfiguration getClientConfiguration() {
+        ClientConfiguration ccfg = super.getClientConfiguration();
+
+        ccfg.setLogger(clientLogLsnr);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        clientLogLsnr = new ListeningTestLogger(log);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 45_000L;
+    }
+
+    /** */
+    private static ServiceConfiguration serviceCfg() {
+        // Service is deployed on nodes with the name index equal to 1, 2 or 
>= GRIDS.
+        return new ServiceConfiguration()
+            .setName(SRV_NAME)
+            .setService(new ServicesTest.TestService())
+            .setMaxPerNodeCount(1)
+            .setNodeFilter(new TestNodeFilter());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isClientPartitionAwarenessEnabled() {
+        return partitionAwareness;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        ((GridTestLog4jLogger)log).setLevel(Level.INFO);
+
+        stopAllGrids();
+
+        partitionAwareness = true;
+
+        clientLogLsnr.clearListeners();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGrids(GRIDS);
+
+        grid(1).services().deploy(serviceCfg());
+    }
+
+    /** */
+    @Test
+    public void testDelayedServiceRedeploy() throws Exception {
+        TestBlockingDiscoverySpi testDisco = 
((TestBlockingDiscoverySpi)grid(0).configuration().getDiscoverySpi());
+
+        // Service topology on the client.
+        Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+
+        addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+
+        AtomicBoolean svcRunFlag = new AtomicBoolean(true);
+
+        try (IgniteClient client = startClient()) {
+            ServicesTest.TestServiceInterface svc = 
client.services().serviceProxy(SRV_NAME, 
ServicesTest.TestServiceInterface.class);
+
+            runAsync(() -> {
+                while (svcRunFlag.get())
+                    svc.testMethod();
+            });
+
+            waitForCondition(() -> srvcTopOnClient.size() == 
INIT_SRVC_NODES_CNT
+                && srvcTopOnClient.contains(grid(1).localNode().id())
+                && srvcTopOnClient.contains(grid(2).localNode().id()),
+                getTestTimeout());
+
+            // Delays service redeployment and the service topology update on 
the server side.
+            testDisco.toBlock.add(ServiceClusterDeploymentResultBatch.class);
+
+            startGrid(GRIDS);
+
+            waitForCondition(() -> testDisco.blocked.size() == 1, 
getTestTimeout());
+
+            // Ensure all the nodes have started but the service topology 
hasn't updated yet.
+            for (Ignite ig : G.allGrids()) {
+                assertEquals(ig.cluster().nodes().size(), GRIDS + 1);
+
+                // Ensure there are still INIT_SRVC_NODES_CNT nodes with 
the service instance.
+                
assertEquals(((IgniteEx)ig).context().service().serviceTopology(SRV_NAME, 
0).size(),
+                    INIT_SRVC_NODES_CNT);
+            }
+
+            // Ensure the client's topology is not updated.
+            assertTrue(srvcTopOnClient.size() == INIT_SRVC_NODES_CNT
+                && !srvcTopOnClient.contains(grid(GRIDS).localNode().id()));
+
+            testDisco.release();
+
+            // Ensure the service topology has been updated to 3 instances per 
cluster.
+            for (Ignite ig : G.allGrids()) {
+                waitForCondition(
+                    () -> {
+                        try {
+                            return 
((IgniteEx)ig).context().service().serviceTopology(SRV_NAME, 0).size() == 3;
+                        }
+                        catch (Exception e) {
+                            return false;
+                        }
+                    },
+                    getTestTimeout()
+                );
+            }
+
+            waitForCondition(() -> srvcTopOnClient.size() == 3 && 
srvcTopOnClient.contains(grid(1).localNode().id())
+                && srvcTopOnClient.contains(grid(2).localNode().id())
+                && srvcTopOnClient.contains(grid(GRIDS).localNode().id()), 
getTestTimeout());
+        }
+        finally {
+            svcRunFlag.set(false);
+        }
+    }
+
+    /**
+     * Tests several nodes come while one thread is used to call the service.
+     */
+    @Test
+    public void testNodesJoinSingleThreaded() throws Exception {
+        doTestClusterTopChangesWhileServiceCalling(3, true, false);
+    }
+
+    /**
+     * Tests several nodes come while several threads are used to call the 
service.
+     */
+    @Test
+    public void testNodesJoinMultiThreaded() throws Exception {
+        doTestClusterTopChangesWhileServiceCalling(3, true, true);
+    }
+
+    /**
+     * Tests several nodes leave while one thread is used to call the service.
+     */
+    @Test
+    public void testNodesLeaveSingleThreaded() throws Exception {
+        doTestClusterTopChangesWhileServiceCalling(3, false, false);
+    }
+
+    /**
+     * Tests several nodes leave while several threads are used to call the 
service.
+     */
+    @Test
+    public void testNodesLeaveMultiThreaded() throws Exception {
+        doTestClusterTopChangesWhileServiceCalling(3, false, true);
+    }
+
+    /**
+     * Tests change of the minor cluster topology version doesn't trigger the 
service topology update.
+     */
+    @Test
+    public void testMinorTopologyVersionDoesntAffect() throws Exception {
+        try (IgniteClient client = startClient()) {
+            ServicesTest.TestServiceInterface svc = 
client.services().serviceProxy(SRV_NAME, 
ServicesTest.TestServiceInterface.class);
+
+            Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+
+            addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+
+            ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+            while (srvcTopOnClient.isEmpty())
+                svc.testMethod();
+
+            // Last time of the topology update.
+            long time = System.nanoTime();
+
+            srvcTopOnClient.clear();
+
+            AffinityTopologyVersion prevTopVersion = 
grid(0).context().discovery().topologyVersionEx();
+
+            grid(0).createCache("testCache");
+
+            awaitPartitionMapExchange();
+
+            AffinityTopologyVersion newTopVersion = 
grid(0).context().discovery().topologyVersionEx();
+
+            assertTrue(newTopVersion.topologyVersion() == 
prevTopVersion.topologyVersion()
+                && newTopVersion.minorTopologyVersion() > 
prevTopVersion.minorTopologyVersion());
+
+            while (srvcTopOnClient.isEmpty())
+                svc.testMethod();
+
+            // Update only by the timeout.
+            assertTrue(U.nanosToMillis(System.nanoTime() - time) > 
ClientServicesImpl.SRV_TOP_UPDATE_PERIOD / 2);
+        }
+    }
+
+    /**
+     * Tests the service topology update with a gap of service invocation 
during forced service redeployment.
+     */
+    @Test
+    public void testForcedServiceRedeployWhileClientIsIdle() {
+        try (IgniteClient client = startClient()) {
+            ServicesTest.TestServiceInterface svc = 
client.services().serviceProxy(SRV_NAME, 
ServicesTest.TestServiceInterface.class);
+
+            Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+
+            addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+
+            ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+            while (srvcTopOnClient.isEmpty())
+                svc.testMethod();
+
+            ((GridTestLog4jLogger)log).setLevel(Level.INFO);
+
+            assertTrue(srvcTopOnClient.size() == INIT_SRVC_NODES_CNT
+                && srvcTopOnClient.contains(grid(1).localNode().id())
+                && srvcTopOnClient.contains(grid(2).localNode().id()));
+
+            long prevTopVersion = 
grid(0).context().discovery().topologyVersion();
+
+            grid(1).services().cancel(SRV_NAME);
+
+            srvcTopOnClient.clear();
+
+            grid(1).services().deploy(serviceCfg().setNodeFilter(null));
+
+            assertEquals(prevTopVersion, 
grid(0).context().discovery().topologyVersion());
+
+            ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+            while (srvcTopOnClient.isEmpty())
+                svc.testMethod();
+
+            assertEquals(GRIDS, srvcTopOnClient.size());
+
+            for (Ignite ig : G.allGrids())
+                
assertTrue(srvcTopOnClient.contains(ig.cluster().localNode().id()));
+        }
+    }
+
+    /** */
+    private void doTestClusterTopChangesWhileServiceCalling(
+        int nodesCnt,
+        boolean addNodes,
+        boolean multiThreaded)
+        throws Exception {
+        assert nodesCnt > 0;
+
+        Set<UUID> newNodesUUIDs = new GridConcurrentHashSet<>();
+
+        // Start additional nodes to stop them.
+        if (!addNodes) {
+            startGridsMultiThreaded(GRIDS, nodesCnt);
+
+            for (int i = GRIDS; i < GRIDS + nodesCnt; ++i)
+                newNodesUUIDs.add(grid(i).localNode().id());
+        }
+
+        // Service topology on the clients.
+        Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+
+        addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+
+        AtomicBoolean changeClusterTop = new AtomicBoolean();
+        AtomicBoolean stopFlag = new AtomicBoolean();
+
+        try (IgniteClient client = startClient()) {
+            ServicesTest.TestServiceInterface svc = 
client.services().serviceProxy(SRV_NAME, 
ServicesTest.TestServiceInterface.class);
+
+            ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+            IgniteInternalFuture<?> runFut = runMultiThreadedAsync(() -> {
+                do {
+                    try {
+                        svc.testMethod();
+                    }
+                    catch (ClientException e) {
+                        String m = e.getMessage();
+
+                        // TODO: IGNITE-20802 : Exception should not occur.
+                        // Client doesn't retry service invocation if the 
redirected-to service instance node leaves cluster.
+                        if (addNodes || (!m.contains("Node has left grid") && 
!m.contains("Failed to send job due to node failure"))
+                            || newNodesUUIDs.stream().noneMatch(nid -> 
m.contains(nid.toString())))
+                            throw e;
+                    }
+                }
+                while (!stopFlag.get());
+            }, multiThreaded ? 4 : 1, "ServiceTestLoader");
+
+            while (!stopFlag.get()) {
+                // Wait until the initial topology is received.
+                if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT 
: INIT_SRVC_NODES_CNT + nodesCnt)
+                    && changeClusterTop.compareAndSet(false, true)) {
+                    srvcTopOnClient.clear();
+
+                    for (int i = 0; i < nodesCnt; ++i) {
+                        int nodeIdx = GRIDS + i;
+
+                        runAsync(() -> {
+                            try {
+                                if (addNodes)
+                                    
newNodesUUIDs.add(startGrid(nodeIdx).localNode().id());
+                                else
+                                    stopGrid(nodeIdx);
+                            }
+                            catch (Exception e) {
+                                log.error("Unable to start or stop test 
grid.", e);
+
+                                stopFlag.set(true);
+                            }
+                        });
+                    }
+                }
+
+                // Stop if new expected service topology received.
+                if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT 
+ nodesCnt : INIT_SRVC_NODES_CNT))
+                    stopFlag.set(true);
+
+                Thread.sleep(10);
+            }
+
+            runFut.get();
+        }
+
+        // The initial nodes must always persist in the service topology.
+        assertTrue(srvcTopOnClient.contains(grid(1).localNode().id())
+            && srvcTopOnClient.contains(grid(2).localNode().id()));
+
+        assertEquals(addNodes ? nodesCnt : 0, 
newNodesUUIDs.stream().filter(srvcTopOnClient::contains).count());
+    }
+
+    /**
+     * Tests client uses service awareness when partitionAwareness is enabled.
+     */
+    @Test
+    public void testServiceAwarenessEnabled() {
+        // Counters of the invocation redirects.
+        AtomicInteger redirectCnt = new AtomicInteger();
+
+        // Service topology received by the client.
+        Set<UUID> top = new GridConcurrentHashSet<>();
+
+        // Requested server nodes with a service invocation.
+        Collection<UUID> requestedServers = new GridConcurrentHashSet<>();
+
+        addSrvcTopUpdateClientLogLsnr(uuids -> {
+            // Reset counters on the first topology update.
+            if (top.isEmpty())
+                redirectCnt.set(0);
+
+            top.addAll(uuids);
+        });
+
+        // Listener of the service remote call (the redirection).
+        G.allGrids().forEach(g -> 
((IgniteEx)g).context().io().addMessageListener(GridTopic.TOPIC_JOB, new 
GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) 
{
+                if (msg instanceof GridJobExecuteRequest
+                    && 
((GridJobExecuteRequest)msg).getTaskClassName().contains(GridServiceProxy.class.getName()))
+                    redirectCnt.incrementAndGet();
+            }
+        }));
+
+        partitionAwareness = false;
+
+        ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+        try (IgniteClient client = startClient(requestedServers)) {
+            ServicesTest.TestServiceInterface svc = 
client.services().serviceProxy(SRV_NAME, 
ServicesTest.TestServiceInterface.class);
+
+            for (int i = 0; i < 100; ++i)
+                svc.testMethod();
+        }
+
+        // Check no service awareness: continuous redirections.
+        assertEquals(100, redirectCnt.get());
+
+        // Ensure that client received no service topology update.
+        assertTrue(top.isEmpty());
+
+        assertTrue(requestedServers.size() == 1 && 
requestedServers.contains(grid(0).localNode().id()));
+
+        partitionAwareness = true;
+
+        try (IgniteClient client = startClient(requestedServers)) {
+            ServicesTest.TestServiceInterface svc = 
client.services().serviceProxy(SRV_NAME, 
ServicesTest.TestServiceInterface.class);
+
+            // We assume that the topology will be received and used for the 
further requests.
+            for (int i = 0; i < 1000; ++i)
+                svc.testMethod();
+
+            redirectCnt.set(0);
+            requestedServers.clear();
+
+            for (int i = 0; i < 1000; ++i)
+                svc.testMethod();
+        }
+
+        // Check the received topology.
+        assertTrue(top.size() == INIT_SRVC_NODES_CNT && 
top.contains(grid(1).localNode().id())
+            && top.contains(grid(2).localNode().id()));
+
+        // Ensure that only the target nodes were requested after the topology 
was received.
+        assertEquals(top, requestedServers);
+
+        // Ensure there were no redirected service calls any more.
+        assertEquals(0, redirectCnt.get());
+    }
+
+    /** */
+    private static void addSrvcTopUpdateClientLogLsnr(Consumer<Set<UUID>> 
srvTopConsumer) {
+        clientLogLsnr.registerListener(s -> {
+            if (s.contains("Topology of service '" + SRV_NAME + "' has been 
updated. The service instance nodes: ")) {
+                String nodes = s.substring(s.indexOf(": [") + 3);
+
+                nodes = nodes.substring(0, nodes.length() - 1);
+
+                srvTopConsumer.accept(Arrays.stream(nodes.split(", 
")).map(UUID::fromString).collect(Collectors.toSet()));
+            }
+        });
+    }
+
+    /** */
+    private IgniteClient startClient() {
+        return new TcpIgniteClient((cfg, hnd) -> new TestTcpChannel(cfg, hnd, 
null),
+            getClientConfiguration(grid(0)));
+    }
+
+    /** */
+    private IgniteClient startClient(@Nullable Collection<UUID> 
requestedServerNodes) {
+        return new TcpIgniteClient((cfg, hnd) -> new TestTcpChannel(cfg, hnd, 
requestedServerNodes),
+            getClientConfiguration(grid(0)));
+    }
+
+    /**
+     * Accepts nodes with the name index equal to 1, 2 or >= GRIDS.
+     */
+    private static final class TestNodeFilter implements 
IgnitePredicate<ClusterNode> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            String nodeName = node.attribute("org.apache.ignite.ignite.name");
+
+            if (F.isEmpty(nodeName))
+                return false;
+
+            int nodeIdx = -1;
+
+            try {
+                nodeIdx = 
Integer.parseInt(nodeName.substring(nodeName.length() - 1));
+            }
+            catch (Exception e) {
+                // No-op.
+            }
+
+            return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= GRIDS;
+        }
+    }
+
+    /** */
+    private static final class TestBlockingDiscoverySpi extends 
TcpDiscoverySpi {
+        /** */
+        private final Set<Class<? extends DiscoveryCustomMessage>> toBlock = 
new HashSet<>();
+
+        /** */
+        private final List<CustomMessageWrapper> blocked = new 
CopyOnWriteArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) 
throws IgniteException {
+            if (msg instanceof CustomMessageWrapper
+                && toBlock.stream().anyMatch(mt -> 
mt.isAssignableFrom(((CustomMessageWrapper)msg).delegate().getClass()))) {
+                blocked.add((CustomMessageWrapper)msg);
+
+                return;
+            }
+
+            super.sendCustomEvent(msg);
+        }
+
+        /** */
+        public void release() {
+            toBlock.clear();
+
+            blocked.forEach(this::sendCustomEvent);
+        }
+    }
+
+    /**
+     * A client connection channel able to register the server nodes requested 
to call a service.
+     */
+    private static final class TestTcpChannel extends TcpClientChannel {
+        /** */
+        private final @Nullable Collection<UUID> requestedServerNodes;
+
+        /** Ctor. */
+        private TestTcpChannel(
+            ClientChannelConfiguration cfg,
+            ClientConnectionMultiplexer connMgr,
+            @Nullable Collection<UUID> requestedServerNodes)
+            throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
+            super(cfg, connMgr);
+
+            this.requestedServerNodes = requestedServerNodes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> T service(ClientOperation op, 
Consumer<PayloadOutputChannel> payloadWriter,
+            Function<PayloadInputChannel, T> payloadReader) throws 
ClientException {
+            UUID srvNodeId = serverNodeId();
+
+            if (op == ClientOperation.SERVICE_INVOKE && requestedServerNodes 
!= null && srvNodeId != null)
+                requestedServerNodes.add(srvNodeId);
+
+            return super.service(op, payloadWriter, payloadReader);
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
index 3069a56313f..f9510ae8f36 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
@@ -444,7 +444,7 @@ public class GridFutureAdapterSelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Completes the future exceptionally, if {@code res} is the instanse of 
{@link Throwable}.
+     * Completes the future exceptionally, if {@code res} is the instance of 
{@link Throwable}.
      *
      * @param fut Future.
      * @param res Result.
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 04d9e2b76d1..588715be857 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.client.thin.InactiveClusterCacheRequestTest;
 import org.apache.ignite.internal.client.thin.MetadataRegistrationTest;
 import 
org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
 import org.apache.ignite.internal.client.thin.ReliableChannelTest;
+import org.apache.ignite.internal.client.thin.ServiceAwarenessTest;
 import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests;
 import org.apache.ignite.internal.client.thin.ServicesTest;
 import org.apache.ignite.internal.client.thin.ThinClientEnpointsDiscoveryTest;
@@ -68,6 +69,7 @@ import org.junit.runners.Suite;
     ClusterGroupTest.class,
     ServicesTest.class,
     ServicesBinaryArraysTests.class,
+    ServiceAwarenessTest.class,
     CacheEntryListenersTest.class,
     ThinClientPartitionAwarenessStableTopologyTest.class,
     ThinClientPartitionAwarenessUnstableTopologyTest.class,

Reply via email to