This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch ignite-2.16
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.16 by this push:
new dedcf99c1c9 IGNITE-20656 Java thin: Service awareness (#11003)
dedcf99c1c9 is described below
commit dedcf99c1c9974adcfcfe24e42033deef44fc18b
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Nov 9 21:28:18 2023 +0300
IGNITE-20656 Java thin: Service awareness (#11003)
Co-authored-by: Pavel Tupitsyn <[email protected]>
(cherry picked from commit 7bcf23279d99fb7d24b6f758ef3fe8ca58f45139)
---
docs/_docs/services/services.adoc | 16 +
docs/_docs/thin-clients/java-thin-client.adoc | 9 +-
.../apache/ignite/util/GridCommandHandlerTest.java | 4 +-
.../client/thin/ClientCacheAffinityContext.java | 7 +
.../internal/client/thin/ClientOperation.java | 3 +
.../internal/client/thin/ClientServicesImpl.java | 123 +++-
.../client/thin/ProtocolBitmaskFeature.java | 5 +-
.../internal/client/thin/ReliableChannel.java | 32 +-
.../internal/client/thin/TcpIgniteClient.java | 9 +-
.../platform/client/ClientBitmaskFeature.java | 5 +-
.../platform/client/ClientConnectionContext.java | 4 -
.../platform/client/ClientMessageParser.java | 7 +
.../service/ClientServiceMappingsResponse.java | 55 ++
.../service/ClientServiceTopologyRequest.java | 63 +++
.../processors/service/IgniteServiceProcessor.java | 2 +-
.../processors/service/ServiceDeploymentTask.java | 2 +-
.../org/apache/ignite/client/ReliabilityTest.java | 2 +-
.../internal/client/thin/ServiceAwarenessTest.java | 622 +++++++++++++++++++++
.../util/future/GridFutureAdapterSelfTest.java | 2 +-
.../org/apache/ignite/client/ClientTestSuite.java | 2 +
20 files changed, 951 insertions(+), 23 deletions(-)
diff --git a/docs/_docs/services/services.adoc
b/docs/_docs/services/services.adoc
index ce72603e908..7568b9d6c46 100644
--- a/docs/_docs/services/services.adoc
+++ b/docs/_docs/services/services.adoc
@@ -234,6 +234,22 @@ tab:C++[]
//== Accessing Services from Compute Tasks
// TODO the @ServiceResource annotation
+== Service Awareness [[service_awareness]]
+For link:../thin-clients/java-thin-client.adoc#java_thin_client[Java Thin
Client] you can activate Service Awareness.
+To do that, enable
link:../thin-clients/java-thin-client.adoc#partition_awareness[Partition
Awareness].
+
+Without Service Awareness, the invocation requests are sent to a random node.
If it has no service
+instance deployed, the request is redirected to a different node. This
additional network hop adds overhead.
+
+With Service Awareness, the thin client knows where service instances are
deployed and sends the request to the correct node.
+
+[NOTE]
+====
+The service topology is updated asynchronously starting with the first service
invocation.
+Thus, some invocation redirects are still possible.
+====
+
+
== Un-deploying Services
To undeploy a service, use the `IgniteServices.cancel(serviceName)` or
`IgniteServices.cancelAll()` methods.
diff --git a/docs/_docs/thin-clients/java-thin-client.adoc
b/docs/_docs/thin-clients/java-thin-client.adoc
index 5f955d35615..5a744d545c4 100644
--- a/docs/_docs/thin-clients/java-thin-client.adoc
+++ b/docs/_docs/thin-clients/java-thin-client.adoc
@@ -12,7 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-= Java Thin Client
+= Java Thin Client [[java_thin_client]]
:sourceCodeFile: {javaCodeDir}/JavaThinClient.java
== Overview
@@ -75,7 +75,7 @@ include::{sourceCodeFile}[tag=connect-to-many-nodes,indent=0]
Note that the code above provides a failover mechanism in case of server node
failures. Refer to the <<Handling Node Failures>> section for more information.
-== Partition Awareness
+== Partition Awareness [[partition_awareness]]
include::includes/partition-awareness.adoc[]
@@ -116,6 +116,11 @@ The code snippet shows how an example implementation might
look like if you want
Also, you can check a
link:https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/client/ClientKubernetesPutGetExample.java#L50[real
example] of the interface implementation. `ThinClientKubernetesAddressFinder`
is created to handle scalable Kubernetes environment.
+[NOTE]
+====
+Partition Awareness also enables
link:../services/services.adoc#service_awareness[Service Awareness].
+====
+
== Using Key-Value API
The Java thin client supports most of the key-value operations available in
the thick client.
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 6a40bfb738e..f3869dbac24 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -2053,14 +2053,14 @@ public class GridCommandHandlerTest extends
GridCommandHandlerClusterPerMethodAb
String testOutStr = testOut.toString();
// Ignite instase 1 can be logged only in arguments list.
- boolean isInstanse1Found = Arrays.stream(testOutStr.split("\n"))
+ boolean isInstance1Found = Arrays.stream(testOutStr.split("\n"))
.filter(s -> s.contains("Arguments:"))
.noneMatch(s ->
s.contains(getTestIgniteInstanceName() + "1"));
assertTrue(testOutStr, testOutStr.contains("Node not found for
consistent ID:"));
if (commandHandler.equals(CLI_CMD_HND))
- assertFalse(testOutStr, isInstanse1Found);
+ assertFalse(testOutStr, isInstance1Found);
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index 577a2c5f476..bcd681796da 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -329,6 +329,13 @@ public class ClientCacheAffinityContext {
public Iterable<UUID> nodes() {
return Collections.unmodifiableCollection(nodes);
}
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion version() {
+ return topVer;
+ }
}
/** Holder of a mapper factory and cacheName. */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index a683154a235..c97ae47928a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -216,6 +216,9 @@ public enum ClientOperation {
/** Get service descriptors. */
SERVICE_GET_DESCRIPTOR(7002),
+ /** Get service topology. */
+ SERVICE_GET_TOPOLOGY(7003),
+
/** Get or create an AtomicLong by name. */
ATOMIC_LONG_CREATE(9000),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
index f9fcb37d1fe..869c8db8883 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
@@ -23,8 +23,13 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.client.ClientClusterGroup;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
@@ -32,9 +37,11 @@ import org.apache.ignite.client.ClientServiceDescriptor;
import org.apache.ignite.client.ClientServices;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.service.ServiceCallContextImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.platform.PlatformServiceMethod;
import org.apache.ignite.platform.PlatformType;
import org.apache.ignite.services.ServiceCallContext;
@@ -45,6 +52,9 @@ import org.jetbrains.annotations.Nullable;
* Implementation of {@link ClientServices}.
*/
class ClientServicesImpl implements ClientServices {
+ /** Max service topology update period in milliseconds. */
+ static final int SRV_TOP_UPDATE_PERIOD = 10_000;
+
/** Channel. */
private final ReliableChannel ch;
@@ -57,13 +67,23 @@ class ClientServicesImpl implements ClientServices {
/** Cluster group. */
private final ClientClusterGroupImpl grp;
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Services topology. Empty if partition awareness is not enabled.
*/
+ private final Map<String, ServiceTopology> servicesTopologies;
+
/** Constructor. */
- ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh,
ClientClusterGroupImpl grp) {
+ ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh,
ClientClusterGroupImpl grp, IgniteLogger log) {
this.ch = ch;
this.marsh = marsh;
this.grp = grp;
utils = new ClientUtils(marsh);
+
+ this.log = log;
+
+ servicesTopologies = ch.partitionAwarenessEnabled ? new
ConcurrentHashMap<>() : Collections.emptyMap();
}
/** {@inheritDoc} */
@@ -165,7 +185,91 @@ class ClientServicesImpl implements ClientServices {
ClientServices withClusterGroup(ClientClusterGroupImpl grp) {
A.notNull(grp, "grp");
- return new ClientServicesImpl(ch, marsh, grp);
+ return new ClientServicesImpl(ch, marsh, grp, log);
+ }
+
+ /**
+ * Keeps topology of certain service and its update progress meta.
+ */
+ private final class ServiceTopology {
+ /** The service name. */
+ private final String srvcName;
+
+ /** If {@code true}, topology update of current service is in
progress. */
+ private final AtomicBoolean updateInProgress = new AtomicBoolean();
+
+ /** Time of the last received topology. */
+ private volatile long lastUpdateRequestTime;
+
+ /** UUIDs of the nodes with at least one service instance. */
+ private volatile List<UUID> nodes = Collections.emptyList();
+
+ /** Cluster topology version for which the current service topology was
last received. */
+ private volatile AffinityTopologyVersion lastAffTop;
+
+ /** */
+ private ServiceTopology(String name) {
+ srvcName = name;
+ }
+
+ /**
+ * @return {@code True} if update of the service topology is required.
{@code False} otherwise.
+ */
+ private boolean isUpdateRequired(AffinityTopologyVersion curAffTop) {
+ return lastAffTop == null || curAffTop.topologyVersion() >
lastAffTop.topologyVersion()
+ || U.nanosToMillis(System.nanoTime() - lastUpdateRequestTime)
>= SRV_TOP_UPDATE_PERIOD;
+ }
+
+ /**
+ * Asynchronously updates the service topology.
+ */
+ private void updateTopologyAsync() {
+ AffinityTopologyVersion curAffTop =
ch.affinityContext().lastTopology().version();
+
+ if (!updateInProgress.compareAndSet(false, true))
+ return;
+
+ ch.serviceAsync(
+ ClientOperation.SERVICE_GET_TOPOLOGY,
+ req -> utils.writeObject(req.out(), srvcName),
+ resp -> {
+ int cnt = resp.in().readInt();
+
+ List<UUID> res = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; ++i)
+ res.add(new UUID(resp.in().readLong(),
resp.in().readLong()));
+
+ return res;
+ }).whenComplete((nodes, err) -> {
+ if (err == null) {
+ this.nodes = nodes;
+ lastAffTop = curAffTop;
+ lastUpdateRequestTime = System.nanoTime();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Topology of service '" + srvcName + "'
has been updated. The " +
+ "service instance nodes: " + nodes);
+ }
+ }
+ else
+ log.error("Failed to update topology of the service '"
+ srvcName + "'.", err);
+
+ updateInProgress.set(false);
+ });
+ }
+
+ /**
+ * Provides last known service topology and asynchronously updates it
if required.
+ *
+ * @return Last known service topology.
+ */
+ public List<UUID> getAndUpdate() {
+ if
(isUpdateRequired(ch.affinityContext().lastTopology().version()))
+ updateTopologyAsync();
+
+ return nodes;
+ }
}
/**
@@ -215,7 +319,8 @@ class ClientServicesImpl implements ClientServices {
return ch.service(ClientOperation.SERVICE_INVOKE,
req -> writeServiceInvokeRequest(req, nodeIds, method,
args),
- res -> utils.readObject(res.in(), false,
method.getReturnType())
+ res -> utils.readObject(res.in(), false,
method.getReturnType()),
+ serviceTopology()
);
}
catch (ClientError e) {
@@ -223,6 +328,18 @@ class ClientServicesImpl implements ClientServices {
}
}
+ /**
+ * @return Actual known service topology or empty list if: service
topology is not enabled, not supported or
+ * not received yet.
+ */
+ private List<UUID> serviceTopology() {
+ if (!ch.partitionAwarenessEnabled
+ || !ch.applyOnDefaultChannel(c ->
c.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.SERVICE_TOPOLOGY),
null))
+ return Collections.emptyList();
+
+ return servicesTopologies.computeIfAbsent(name,
ServiceTopology::new).getAndUpdate();
+ }
+
/**
* @param ch Payload output channel.
* @param nodeIds Node IDs.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index 751f66dbc27..7676fcc323c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -74,7 +74,10 @@ public enum ProtocolBitmaskFeature {
INDEX_QUERY(14),
/** IndexQuery limit. */
- INDEX_QUERY_LIMIT(15);
+ INDEX_QUERY_LIMIT(15),
+
+ /** Service topology. */
+ SERVICE_TOPOLOGY(16);
/** */
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 2bcf6b116c2..f88a90e654e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.thin;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -74,7 +75,7 @@ final class ReliableChannel implements AutoCloseable {
private volatile int curChIdx = -1;
/** Partition awareness enabled. */
- private final boolean partitionAwarenessEnabled;
+ final boolean partitionAwarenessEnabled;
/** Cache partition awareness context. */
private final ClientCacheAffinityContext affinityCtx;
@@ -182,7 +183,32 @@ final class ReliableChannel implements AutoCloseable {
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
- return applyOnDefaultChannel(channel -> channel.service(op,
payloadWriter, payloadReader), op);
+ return service(op, payloadWriter, payloadReader,
Collections.emptyList());
+ }
+
+ /**
+ * Send request to one of the passed nodes and handle response.
+ *
+ * @throws ClientException Thrown by {@code payloadWriter} or {@code
payloadReader}.
+ * @throws ClientAuthenticationException When user name or password is
invalid.
+ * @throws ClientAuthorizationException When user has no permission to
perform operation.
+ * @throws ClientProtocolError When failed to handshake with server.
+ * @throws ClientServerError When failed to process request on server.
+ */
+ public <T> T service(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader,
+ List<UUID> targetNodes
+ ) throws ClientException, ClientError {
+ if (F.isEmpty(targetNodes))
+ return applyOnDefaultChannel(channel -> channel.service(op,
payloadWriter, payloadReader), op);
+
+ return applyOnNodeChannelWithFallback(
+
targetNodes.get(ThreadLocalRandom.current().nextInt(targetNodes.size())),
+ channel -> channel.service(op, payloadWriter, payloadReader),
+ op
+ );
}
/**
@@ -943,7 +969,7 @@ final class ReliableChannel implements AutoCloseable {
/** Channel. */
private volatile ClientChannel ch;
- /** ID of the last server node that {@link ch} is or was connected to.
*/
+ /** ID of the last server node that {@link #ch} is or was connected
to. */
private volatile UUID serverNodeId;
/** Address that holder is bind to (chCfg.addr) is not in use now. So
close the holder. */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index f15293089fb..a4d9922143f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -107,6 +107,9 @@ public class TcpIgniteClient implements IgniteClient {
/** Serializer/deserializer. */
private final ClientUtils serDes;
+ /** Logger. */
+ private final IgniteLogger log;
+
/**
* Private constructor. Use {@link
TcpIgniteClient#start(ClientConfiguration)} to create an instance of
* {@code TcpIgniteClient}.
@@ -122,6 +125,8 @@ public class TcpIgniteClient implements IgniteClient {
BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, ClientChannel> chFactory,
ClientConfiguration cfg
) throws ClientException {
+ log = NullLogger.whenNull(cfg.getLogger());
+
final ClientBinaryMetadataHandler metadataHnd = new
ClientBinaryMetadataHandler();
ClientMarshallerContext marshCtx = new ClientMarshallerContext();
@@ -159,7 +164,7 @@ public class TcpIgniteClient implements IgniteClient {
compute = new ClientComputeImpl(ch, marsh,
cluster.defaultClusterGroup());
- services = new ClientServicesImpl(ch, marsh,
cluster.defaultClusterGroup());
+ services = new ClientServicesImpl(ch, marsh,
cluster.defaultClusterGroup(), log);
lsnrsRegistry = new ClientCacheEntryListenersRegistry();
}
@@ -475,8 +480,6 @@ public class TcpIgniteClient implements IgniteClient {
if (clusterCfg == null)
return;
- IgniteLogger log = NullLogger.whenNull(cfg.getLogger());
-
if (log.isDebugEnabled())
log.debug("Cluster binary configuration retrieved: " + clusterCfg);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index e858bdbb8db..b0f673ac5ee 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -72,7 +72,10 @@ public enum ClientBitmaskFeature implements
ThinProtocolFeature {
INDEX_QUERY(14),
/** IndexQuery limit. */
- INDEX_QUERY_LIMIT(15);
+ INDEX_QUERY_LIMIT(15),
+
+ /** Service topology. */
+ SERVICE_TOPOLOGY(16);
/** */
private static final EnumSet<ClientBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
index eb765bdf5ed..13568187266 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
@@ -78,10 +78,6 @@ public class ClientConnectionContext extends
ClientListenerAbstractConnectionCon
/** Default version. */
public static final ClientListenerProtocolVersion DEFAULT_VER = VER_1_7_0;
- /** Default protocol context. */
- public static final ClientProtocolContext DEFAULT_PROTOCOL_CONTEXT =
- new ClientProtocolContext(DEFAULT_VER,
ClientBitmaskFeature.allFeaturesAsEnumSet());
-
/** Supported versions. */
private static final Collection<ClientListenerProtocolVersion>
SUPPORTED_VERS = Arrays.asList(
VER_1_7_0,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 22cc257e660..ab3fe4c1608 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -106,6 +106,7 @@ import
org.apache.ignite.internal.processors.platform.client.datastructures.Clie
import
org.apache.ignite.internal.processors.platform.client.service.ClientServiceGetDescriptorRequest;
import
org.apache.ignite.internal.processors.platform.client.service.ClientServiceGetDescriptorsRequest;
import
org.apache.ignite.internal.processors.platform.client.service.ClientServiceInvokeRequest;
+import
org.apache.ignite.internal.processors.platform.client.service.ClientServiceTopologyRequest;
import
org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerAddDataRequest;
import
org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerStartRequest;
import
org.apache.ignite.internal.processors.platform.client.tx.ClientTxEndRequest;
@@ -393,6 +394,9 @@ public class ClientMessageParser implements
ClientListenerMessageParser {
/** IgniteSet.iterator page. */
private static final short OP_SET_ITERATOR_GET_PAGE = 9023;
+ /** Get service topology. */
+ private static final short OP_SERVICE_GET_TOPOLOGY = 7003;
+
/** Marshaller. */
private final GridBinaryMarshaller marsh;
@@ -697,6 +701,9 @@ public class ClientMessageParser implements
ClientListenerMessageParser {
case OP_SET_ITERATOR_GET_PAGE:
return new ClientIgniteSetIteratorGetPageRequest(reader);
+
+ case OP_SERVICE_GET_TOPOLOGY:
+ return new ClientServiceTopologyRequest(reader);
}
return new ClientRawRequest(reader.readLong(),
ClientStatus.INVALID_OP_CODE,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceMappingsResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceMappingsResponse.java
new file mode 100644
index 00000000000..6be07fb886a
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceMappingsResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.service;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.client.thin.ClientUtils;
+import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.jetbrains.annotations.Nullable;
+
+/** Service topology response. */
+public class ClientServiceMappingsResponse extends ClientResponse {
+ /** Service instance nodes. */
+ @Nullable private final Collection<UUID> svcsNodes;
+
+ /**
+ * @param reqId Request id.
+ * @param svcsNodes Services instance nodes.
+ */
+ public ClientServiceMappingsResponse(long reqId, @Nullable
Collection<UUID> svcsNodes) {
+ super(reqId);
+
+ this.svcsNodes = svcsNodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void encode(ClientConnectionContext ctx,
BinaryRawWriterEx writer) {
+ super.encode(ctx, writer);
+
+ ClientUtils.collection(
+ svcsNodes,
+ writer.out(),
+ (out, uuid) -> {
+ out.writeLong(uuid.getMostSignificantBits());
+ out.writeLong(uuid.getLeastSignificantBits());
+ });
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceTopologyRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceTopologyRequest.java
new file mode 100644
index 00000000000..560c79c0560
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceTopologyRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryRawReader;
+import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import
org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Request topology of certain service.
+ */
+public class ClientServiceTopologyRequest extends ClientRequest {
+ /** The service name. */
+ private final String name;
+
+ /**
+ * Creates the service topology request.
+ *
+ * @param reader Reader to read the {@link #name} from.
+ */
+ public ClientServiceTopologyRequest(BinaryRawReader reader) {
+ super(reader);
+
+ name = reader.readString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ Map<UUID, Integer> srvcTop;
+
+ try {
+ srvcTop = ctx.kernalContext().service().serviceTopology(name, 0);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteClientException(ClientStatus.FAILED, "Failed to
get topology for service '" + name + "'.", e);
+ }
+
+ return new ClientServiceMappingsResponse(requestId(),
F.isEmpty(srvcTop) ? Collections.emptyList() : srvcTop.keySet());
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index 598c3e97df0..dcceab41d9d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -936,7 +936,7 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
* @return Service topology.
* @throws IgniteCheckedException On error.
*/
- public Map<UUID, Integer> serviceTopology(String name, long timeout)
throws IgniteCheckedException {
+ @Nullable public Map<UUID, Integer> serviceTopology(String name, long
timeout) throws IgniteCheckedException {
assert timeout >= 0;
long startTime = U.currentTimeMillis();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
index 37828dd2474..d644f90c281 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -132,7 +132,7 @@ class ServiceDeploymentTask {
this.depId = depId;
this.ctx = ctx;
- srvcProc = (IgniteServiceProcessor)ctx.service();
+ srvcProc = ctx.service();
log = ctx.log(getClass());
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 5a014365a15..b7ea340f36c 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -380,7 +380,7 @@ public class ReliabilityTest extends AbstractThinClientTest
{
String nullOpsNames =
nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
- long expectedNullCnt = 21;
+ long expectedNullCnt = 22;
String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding
new codes, update ClientOperationType too. Missing ops: "
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
new file mode 100644
index 00000000000..94fba7293f3
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.client.ClientAuthenticationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridJobExecuteRequest;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.service.GridServiceProxy;
+import
org.apache.ignite.internal.processors.service.ServiceClusterDeploymentResultBatch;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.apache.logging.log4j.Level;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static
org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Checks the service awareness feature of the thin client.
+ */
+public class ServiceAwarenessTest extends AbstractThinClientTest {
+ /** Node-filter service name. */
+ private static final String SRV_NAME = "node_filtered_svc";
+
+ /** Number of grids at the test start. */
+ private static final int GRIDS = 4;
+
+ /** Number of node instances with the initial service deployment. */
+ private static final int INIT_SRVC_NODES_CNT = 2;
+
+ /** */
+ protected boolean partitionAwareness = true;
+
+ /** */
+ private static ListeningTestLogger clientLogLsnr;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDiscoverySpi(new TestBlockingDiscoverySpi());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected ClientConfiguration getClientConfiguration() {
+ ClientConfiguration ccfg = super.getClientConfiguration();
+
+ ccfg.setLogger(clientLogLsnr);
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ clientLogLsnr = new ListeningTestLogger(log);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 45_000L;
+ }
+
+ /** */
+ private static ServiceConfiguration serviceCfg() {
+ // Service is deployed on nodes with the name index equal to 1, 2 or
>= GRIDS.
+ return new ServiceConfiguration()
+ .setName(SRV_NAME)
+ .setService(new ServicesTest.TestService())
+ .setMaxPerNodeCount(1)
+ .setNodeFilter(new TestNodeFilter());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isClientPartitionAwarenessEnabled() {
+ return partitionAwareness;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ ((GridTestLog4jLogger)log).setLevel(Level.INFO);
+
+ stopAllGrids();
+
+ partitionAwareness = true;
+
+ clientLogLsnr.clearListeners();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGrids(GRIDS);
+
+ grid(1).services().deploy(serviceCfg());
+ }
+
+ /** */
+ @Test
+ public void testDelayedServiceRedeploy() throws Exception {
+ TestBlockingDiscoverySpi testDisco =
((TestBlockingDiscoverySpi)grid(0).configuration().getDiscoverySpi());
+
+ // Service topology on the client.
+ Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+
+ addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+
+ AtomicBoolean svcRunFlag = new AtomicBoolean(true);
+
+ try (IgniteClient client = startClient()) {
+ ServicesTest.TestServiceInterface svc =
client.services().serviceProxy(SRV_NAME,
ServicesTest.TestServiceInterface.class);
+
+ runAsync(() -> {
+ while (svcRunFlag.get())
+ svc.testMethod();
+ });
+
+ waitForCondition(() -> srvcTopOnClient.size() ==
INIT_SRVC_NODES_CNT
+ && srvcTopOnClient.contains(grid(1).localNode().id())
+ && srvcTopOnClient.contains(grid(2).localNode().id()),
+ getTestTimeout());
+
+ // Delays service redeployment and the service topology update on
the server side.
+ testDisco.toBlock.add(ServiceClusterDeploymentResultBatch.class);
+
+ startGrid(GRIDS);
+
+ waitForCondition(() -> testDisco.blocked.size() == 1,
getTestTimeout());
+
+ // Ensure all the nodes have started but the service topology
hasn't updated yet.
+ for (Ignite ig : G.allGrids()) {
+ assertEquals(ig.cluster().nodes().size(), GRIDS + 1);
+
+ // Ensure there are still INIT_SRVC_NODES_CNT nodes with
the service instance.
+
assertEquals(((IgniteEx)ig).context().service().serviceTopology(SRV_NAME,
0).size(),
+ INIT_SRVC_NODES_CNT);
+ }
+
+ // Ensure the client's topology is not updated.
+ assertTrue(srvcTopOnClient.size() == INIT_SRVC_NODES_CNT
+ && !srvcTopOnClient.contains(grid(GRIDS).localNode().id()));
+
+ testDisco.release();
+
+ // Ensure the service topology has been updated to 3 instances per
cluster.
+ for (Ignite ig : G.allGrids()) {
+ waitForCondition(
+ () -> {
+ try {
+ return
((IgniteEx)ig).context().service().serviceTopology(SRV_NAME, 0).size() == 3;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ },
+ getTestTimeout()
+ );
+ }
+
+ waitForCondition(() -> srvcTopOnClient.size() == 3 &&
srvcTopOnClient.contains(grid(1).localNode().id())
+ && srvcTopOnClient.contains(grid(2).localNode().id())
+ && srvcTopOnClient.contains(grid(GRIDS).localNode().id()),
getTestTimeout());
+ }
+ finally {
+ svcRunFlag.set(false);
+ }
+ }
+
+ /**
+ * Tests several nodes come while one thread is used to call the service.
+ */
+ @Test
+ public void testNodesJoinSingleThreaded() throws Exception {
+ doTestClusterTopChangesWhileServiceCalling(3, true, false);
+ }
+
+ /**
+ * Tests several nodes come while several threads are used to call the
service.
+ */
+ @Test
+ public void testNodesJoinMultiThreaded() throws Exception {
+ doTestClusterTopChangesWhileServiceCalling(3, true, true);
+ }
+
+ /**
+ * Tests several nodes leave while one thread is used to call the service.
+ */
+ @Test
+ public void testNodesLeaveSingleThreaded() throws Exception {
+ doTestClusterTopChangesWhileServiceCalling(3, false, false);
+ }
+
+ /**
+ * Tests several nodes leave while several threads are used to call the
service.
+ */
+ @Test
+ public void testNodesLeaveMultiThreaded() throws Exception {
+ doTestClusterTopChangesWhileServiceCalling(3, false, true);
+ }
+
+ /**
+ * Tests change of the minor cluster topology version doesn't trigger the
service topology update.
+ */
+ @Test
+ public void testMinorTopologyVersionDoesntAffect() throws Exception {
+ try (IgniteClient client = startClient()) {
+ ServicesTest.TestServiceInterface svc =
client.services().serviceProxy(SRV_NAME,
ServicesTest.TestServiceInterface.class);
+
+ Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+
+ addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+
+ ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+ while (srvcTopOnClient.isEmpty())
+ svc.testMethod();
+
+ // Time of the last topology update.
+ long time = System.nanoTime();
+
+ srvcTopOnClient.clear();
+
+ AffinityTopologyVersion prevTopVersion =
grid(0).context().discovery().topologyVersionEx();
+
+ grid(0).createCache("testCache");
+
+ awaitPartitionMapExchange();
+
+ AffinityTopologyVersion newTopVersion =
grid(0).context().discovery().topologyVersionEx();
+
+ assertTrue(newTopVersion.topologyVersion() ==
prevTopVersion.topologyVersion()
+ && newTopVersion.minorTopologyVersion() >
prevTopVersion.minorTopologyVersion());
+
+ while (srvcTopOnClient.isEmpty())
+ svc.testMethod();
+
+ // Update only by the timeout.
+ assertTrue(U.nanosToMillis(System.nanoTime() - time) >
ClientServicesImpl.SRV_TOP_UPDATE_PERIOD / 2);
+ }
+ }
+
+ /**
+ * Tests the service topology update with a gap of service invocation
during forced service redeployment.
+ */
+ @Test
+ public void testForcedServiceRedeployWhileClientIsIdle() {
+ try (IgniteClient client = startClient()) {
+ ServicesTest.TestServiceInterface svc =
client.services().serviceProxy(SRV_NAME,
ServicesTest.TestServiceInterface.class);
+
+ Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+
+ addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+
+ ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+ while (srvcTopOnClient.isEmpty())
+ svc.testMethod();
+
+ ((GridTestLog4jLogger)log).setLevel(Level.INFO);
+
+ assertTrue(srvcTopOnClient.size() == INIT_SRVC_NODES_CNT
+ && srvcTopOnClient.contains(grid(1).localNode().id())
+ && srvcTopOnClient.contains(grid(2).localNode().id()));
+
+ long prevTopVersion =
grid(0).context().discovery().topologyVersion();
+
+ grid(1).services().cancel(SRV_NAME);
+
+ srvcTopOnClient.clear();
+
+ grid(1).services().deploy(serviceCfg().setNodeFilter(null));
+
+ assertEquals(prevTopVersion,
grid(0).context().discovery().topologyVersion());
+
+ ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+ while (srvcTopOnClient.isEmpty())
+ svc.testMethod();
+
+ assertEquals(GRIDS, srvcTopOnClient.size());
+
+ for (Ignite ig : G.allGrids())
+
assertTrue(srvcTopOnClient.contains(ig.cluster().localNode().id()));
+ }
+ }
+
+ /** */
+ private void doTestClusterTopChangesWhileServiceCalling(
+ int nodesCnt,
+ boolean addNodes,
+ boolean multiThreaded)
+ throws Exception {
+ assert nodesCnt > 0;
+
+ Set<UUID> newNodesUUIDs = new GridConcurrentHashSet<>();
+
+ // Start additional nodes to stop them.
+ if (!addNodes) {
+ startGridsMultiThreaded(GRIDS, nodesCnt);
+
+ for (int i = GRIDS; i < GRIDS + nodesCnt; ++i)
+ newNodesUUIDs.add(grid(i).localNode().id());
+ }
+
+ // Service topology on the clients.
+ Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+
+ addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+
+ AtomicBoolean changeClusterTop = new AtomicBoolean();
+ AtomicBoolean stopFlag = new AtomicBoolean();
+
+ try (IgniteClient client = startClient()) {
+ ServicesTest.TestServiceInterface svc =
client.services().serviceProxy(SRV_NAME,
ServicesTest.TestServiceInterface.class);
+
+ ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+ IgniteInternalFuture<?> runFut = runMultiThreadedAsync(() -> {
+ do {
+ try {
+ svc.testMethod();
+ }
+ catch (ClientException e) {
+ String m = e.getMessage();
+
+ // TODO: IGNITE-20802 : Exception should not occur.
+ // Client doesn't retry service invocation if the
redirected-to service instance node leaves cluster.
+ if (addNodes || (!m.contains("Node has left grid") &&
!m.contains("Failed to send job due to node failure"))
+ || newNodesUUIDs.stream().noneMatch(nid ->
m.contains(nid.toString())))
+ throw e;
+ }
+ }
+ while (!stopFlag.get());
+ }, multiThreaded ? 4 : 1, "ServiceTestLoader");
+
+ while (!stopFlag.get()) {
+ // Wait until the initial topology is received.
+ if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT
: INIT_SRVC_NODES_CNT + nodesCnt)
+ && changeClusterTop.compareAndSet(false, true)) {
+ srvcTopOnClient.clear();
+
+ for (int i = 0; i < nodesCnt; ++i) {
+ int nodeIdx = GRIDS + i;
+
+ runAsync(() -> {
+ try {
+ if (addNodes)
+
newNodesUUIDs.add(startGrid(nodeIdx).localNode().id());
+ else
+ stopGrid(nodeIdx);
+ }
+ catch (Exception e) {
+ log.error("Unable to start or stop test
grid.", e);
+
+ stopFlag.set(true);
+ }
+ });
+ }
+ }
+
+ // Stop once the new expected service topology is received.
+ if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT
+ nodesCnt : INIT_SRVC_NODES_CNT))
+ stopFlag.set(true);
+
+ Thread.sleep(10);
+ }
+
+ runFut.get();
+ }
+
+ // The initial nodes must always persist in the service topology.
+ assertTrue(srvcTopOnClient.contains(grid(1).localNode().id())
+ && srvcTopOnClient.contains(grid(2).localNode().id()));
+
+ assertEquals(addNodes ? nodesCnt : 0,
newNodesUUIDs.stream().filter(srvcTopOnClient::contains).count());
+ }
+
+ /**
+ * Tests client uses service awareness when partitionAwareness is enabled.
+ */
+ @Test
+ public void testServiceAwarenessEnabled() {
+ // Counters of the invocation redirects.
+ AtomicInteger redirectCnt = new AtomicInteger();
+
+ // Service topology received by the client.
+ Set<UUID> top = new GridConcurrentHashSet<>();
+
+ // Requested server nodes with a service invocation.
+ Collection<UUID> requestedServers = new GridConcurrentHashSet<>();
+
+ addSrvcTopUpdateClientLogLsnr(uuids -> {
+ // Reset counters on the first topology update.
+ if (top.isEmpty())
+ redirectCnt.set(0);
+
+ top.addAll(uuids);
+ });
+
+ // Listener of the service remote call (the redirection).
+ G.allGrids().forEach(g ->
((IgniteEx)g).context().io().addMessageListener(GridTopic.TOPIC_JOB, new
GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc)
{
+ if (msg instanceof GridJobExecuteRequest
+ &&
((GridJobExecuteRequest)msg).getTaskClassName().contains(GridServiceProxy.class.getName()))
+ redirectCnt.incrementAndGet();
+ }
+ }));
+
+ partitionAwareness = false;
+
+ ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
+
+ try (IgniteClient client = startClient(requestedServers)) {
+ ServicesTest.TestServiceInterface svc =
client.services().serviceProxy(SRV_NAME,
ServicesTest.TestServiceInterface.class);
+
+ for (int i = 0; i < 100; ++i)
+ svc.testMethod();
+ }
+
+ // Check no service awareness: continuous redirections.
+ assertEquals(100, redirectCnt.get());
+
+ // Ensure that client received no service topology update.
+ assertTrue(top.isEmpty());
+
+ assertTrue(requestedServers.size() == 1 &&
requestedServers.contains(grid(0).localNode().id()));
+
+ partitionAwareness = true;
+
+ try (IgniteClient client = startClient(requestedServers)) {
+ ServicesTest.TestServiceInterface svc =
client.services().serviceProxy(SRV_NAME,
ServicesTest.TestServiceInterface.class);
+
+ // We assume that the topology will be received and used for the
further requests.
+ for (int i = 0; i < 1000; ++i)
+ svc.testMethod();
+
+ redirectCnt.set(0);
+ requestedServers.clear();
+
+ for (int i = 0; i < 1000; ++i)
+ svc.testMethod();
+ }
+
+ // Check the received topology.
+ assertTrue(top.size() == INIT_SRVC_NODES_CNT &&
top.contains(grid(1).localNode().id())
+ && top.contains(grid(2).localNode().id()));
+
+ // Ensure that only the target nodes were requested after the topology
was received.
+ assertEquals(top, requestedServers);
+
+ // Ensure there were no redirected service calls any more.
+ assertEquals(0, redirectCnt.get());
+ }
+
+ /** */
+ private static void addSrvcTopUpdateClientLogLsnr(Consumer<Set<UUID>>
srvTopConsumer) {
+ clientLogLsnr.registerListener(s -> {
+ if (s.contains("Topology of service '" + SRV_NAME + "' has been
updated. The service instance nodes: ")) {
+ String nodes = s.substring(s.indexOf(": [") + 3);
+
+ nodes = nodes.substring(0, nodes.length() - 1);
+
+ srvTopConsumer.accept(Arrays.stream(nodes.split(",
")).map(UUID::fromString).collect(Collectors.toSet()));
+ }
+ });
+ }
+
+ /** */
+ private IgniteClient startClient() {
+ return new TcpIgniteClient((cfg, hnd) -> new TestTcpChannel(cfg, hnd,
null),
+ getClientConfiguration(grid(0)));
+ }
+
+ /** */
+ private IgniteClient startClient(@Nullable Collection<UUID>
requestedServerNodes) {
+ return new TcpIgniteClient((cfg, hnd) -> new TestTcpChannel(cfg, hnd,
requestedServerNodes),
+ getClientConfiguration(grid(0)));
+ }
+
+ /**
+ * Accepts nodes with the name index equal to 1, 2 or >= GRIDS.
+ */
+ private static final class TestNodeFilter implements
IgnitePredicate<ClusterNode> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ String nodeName = node.attribute("org.apache.ignite.ignite.name");
+
+ if (F.isEmpty(nodeName))
+ return false;
+
+ int nodeIdx = -1;
+
+ try {
+ nodeIdx =
Integer.parseInt(nodeName.substring(nodeName.length() - 1));
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+
+ return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= GRIDS;
+ }
+ }
+
+ /** */
+ private static final class TestBlockingDiscoverySpi extends
TcpDiscoverySpi {
+ /** */
+ private final Set<Class<? extends DiscoveryCustomMessage>> toBlock =
new HashSet<>();
+
+ /** */
+ private final List<CustomMessageWrapper> blocked = new
CopyOnWriteArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
+ if (msg instanceof CustomMessageWrapper
+ && toBlock.stream().anyMatch(mt ->
mt.isAssignableFrom(((CustomMessageWrapper)msg).delegate().getClass()))) {
+ blocked.add((CustomMessageWrapper)msg);
+
+ return;
+ }
+
+ super.sendCustomEvent(msg);
+ }
+
+ /** */
+ public void release() {
+ toBlock.clear();
+
+ blocked.forEach(this::sendCustomEvent);
+ }
+ }
+
+ /**
+ * A client connection channel able to register the server nodes requested
to call a service.
+ */
+ private static final class TestTcpChannel extends TcpClientChannel {
+ /** */
+ private final @Nullable Collection<UUID> requestedServerNodes;
+
+ /** Ctor. */
+ private TestTcpChannel(
+ ClientChannelConfiguration cfg,
+ ClientConnectionMultiplexer connMgr,
+ @Nullable Collection<UUID> requestedServerNodes)
+ throws ClientConnectionException, ClientAuthenticationException,
ClientProtocolError {
+ super(cfg, connMgr);
+
+ this.requestedServerNodes = requestedServerNodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T service(ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader) throws
ClientException {
+ UUID srvNodeId = serverNodeId();
+
+ if (op == ClientOperation.SERVICE_INVOKE && requestedServerNodes
!= null && srvNodeId != null)
+ requestedServerNodes.add(srvNodeId);
+
+ return super.service(op, payloadWriter, payloadReader);
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
index 3069a56313f..f9510ae8f36 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
@@ -444,7 +444,7 @@ public class GridFutureAdapterSelfTest extends
GridCommonAbstractTest {
}
/**
- * Completes the future exceptionally, if {@code res} is the instanse of
{@link Throwable}.
+ * Completes the future exceptionally, if {@code res} is the instance of
{@link Throwable}.
*
* @param fut Future.
* @param res Result.
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 04d9e2b76d1..588715be857 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.client.thin.InactiveClusterCacheRequestTest;
import org.apache.ignite.internal.client.thin.MetadataRegistrationTest;
import
org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
import org.apache.ignite.internal.client.thin.ReliableChannelTest;
+import org.apache.ignite.internal.client.thin.ServiceAwarenessTest;
import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests;
import org.apache.ignite.internal.client.thin.ServicesTest;
import org.apache.ignite.internal.client.thin.ThinClientEnpointsDiscoveryTest;
@@ -68,6 +69,7 @@ import org.junit.runners.Suite;
ClusterGroupTest.class,
ServicesTest.class,
ServicesBinaryArraysTests.class,
+ ServiceAwarenessTest.class,
CacheEntryListenersTest.class,
ThinClientPartitionAwarenessStableTopologyTest.class,
ThinClientPartitionAwarenessUnstableTopologyTest.class,