This is an automated email from the ASF dual-hosted git repository.
petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ab2355b539c IGNITE-28369 Fixed service became unavailable if nodes
leave the cluster (#13034)
ab2355b539c is described below
commit ab2355b539c9584561b290452be94184da7f0296
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sat May 9 09:41:21 2026 +0300
IGNITE-28369 Fixed service became unavailable if nodes leave the cluster
(#13034)
---
.../processors/service/GridServiceProxy.java | 20 ++-
.../processors/service/IgniteServiceProcessor.java | 14 +-
.../service/ServiceClusterDeploymentResult.java | 28 ++-
.../service/ServiceDeploymentActions.java | 11 +-
.../processors/service/ServiceDeploymentTask.java | 62 ++++++-
.../internal/processors/service/ServiceInfo.java | 25 ++-
.../processors/service/ServiceTopology.java | 88 ++++++++++
.../IgniteDiscoverySpiInternalListener.java | 4 +-
.../processors/service/ServiceInfoSelfTest.java | 4 +-
.../service/ServiceRedeploymentOnNodeLeftTest.java | 189 +++++++++++++++++++++
.../junits/IgniteConfigVariationsAbstractTest.java | 7 +-
.../testsuites/IgniteServiceGridTestSuite.java | 4 +-
12 files changed, 408 insertions(+), 48 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 80ee4f0d600..98484e6b340 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -276,7 +276,7 @@ public class GridServiceProxy<T> implements Serializable {
}
if (waitTimeout > 0 && U.currentTimeMillis() - startTime >=
waitTimeout)
- throw new IgniteException("Service acquire timeout was
reached, stopping. [timeout=" + waitTimeout + "]");
+ throw new IgniteException("Service acquire timeout was
reached, stopping [timeout=" + waitTimeout + "]");
}
}
finally {
@@ -397,7 +397,9 @@ public class GridServiceProxy<T> implements Serializable {
if (snapshot.size() == 1) {
UUID nodeId = snapshot.keySet().iterator().next();
- return prj.node(nodeId);
+ ClusterNode node = getAliveNode(nodeId);
+
+ return prj.predicate().apply(node) ? node : null;
}
Collection<ClusterNode> nodes = prj.nodes();
@@ -419,7 +421,7 @@ public class GridServiceProxy<T> implements Serializable {
for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
if (i++ >= idx) {
if (e.getValue() > 0)
- return ctx.discovery().node(e.getKey());
+ return getAliveNode(e.getKey());
}
}
@@ -428,7 +430,7 @@ public class GridServiceProxy<T> implements Serializable {
// Circle back.
for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
if (e.getValue() > 0)
- return ctx.discovery().node(e.getKey());
+ return getAliveNode(e.getKey());
if (i++ == idx)
return null;
@@ -462,6 +464,16 @@ public class GridServiceProxy<T> implements Serializable {
return proxy;
}
+ /** Resolves the node by ID via discovery; throws if it is not found. */
+ public ClusterNode getAliveNode(UUID nodeId) throws
ClusterTopologyCheckedException {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new ClusterTopologyCheckedException("The node holding the
service left the cluster [nodeId=" + nodeId + ']');
+
+ return node;
+ }
+
/**
* @param mtd Method to invoke.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index ed9153b4969..86fa2f2cea9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -966,7 +966,7 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
if (timeout == 0 && desc == null)
return null;
- if (desc != null && desc.topologyInitialized())
+ if (desc != null && !desc.serviceTopology().isTransitional())
return desc.topologySnapshot();
long wait = 0;
@@ -1566,7 +1566,7 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
*
* @param fullTops Deployment topologies.
*/
- void updateServicesTopologies(@NotNull final Map<IgniteUuid, Map<UUID,
Integer>> fullTops) {
+ void updateServicesTopologies(@NotNull final Map<IgniteUuid,
ServiceTopology> fullTops) {
if (!enterBusy())
return;
@@ -1936,7 +1936,7 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
* @param msg Message.
*/
private void
processServicesFullDeployments(ServiceClusterDeploymentResultBatch msg) {
- final Map<IgniteUuid, Map<UUID, Integer>> fullTops = new HashMap<>();
+ final Map<IgniteUuid, ServiceTopology> fullTops = new HashMap<>();
final Map<IgniteUuid, Collection<Throwable>> fullErrors = new
HashMap<>();
for (ServiceClusterDeploymentResult depRes : msg.results()) {
@@ -1959,7 +1959,7 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
if (!errors.isEmpty())
fullErrors.computeIfAbsent(srvcId, e -> new
ArrayList<>()).addAll(errors);
- fullTops.put(srvcId, top);
+ fullTops.put(srvcId, new ServiceTopology(top,
depRes.isServiceTopologyTransitional()));
}
synchronized (servicesTopsUpdateMux) {
@@ -1990,14 +1990,12 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
* @param services Services info to update.
* @param tops Deployment topologies.
*/
- private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services,
- Map<IgniteUuid, Map<UUID, Integer>> tops) {
-
+ private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services,
Map<IgniteUuid, ServiceTopology> tops) {
tops.forEach((srvcId, top) -> {
ServiceInfo desc = services.get(srvcId);
if (desc != null)
- desc.topologySnapshot(top);
+ desc.updateServiceTopology(top);
});
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
index 655f70dfd45..2c8290513a0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
@@ -43,6 +43,14 @@ public class ServiceClusterDeploymentResult implements
Message {
@GridToStringInclude
Map<UUID, ServiceSingleNodeDeploymentResult> results;
+ /**
+ * Whether topology is transitional. Nodes may leave the cluster while the
service topology is being recalculated.
+ * In this case, the resulting service topology may be incomplete. We
consider the mentioned service topology
+ * transitional and expect it to be recalculated soon.
+ */
+ @Order(2)
+ boolean isSvcTopTransitional;
+
/** Default constructor for {@link MessageFactory}. */
public ServiceClusterDeploymentResult() {
}
@@ -51,8 +59,10 @@ public class ServiceClusterDeploymentResult implements
Message {
* @param srvcId Service id.
* @param results Deployments results.
*/
- public ServiceClusterDeploymentResult(@NotNull IgniteUuid srvcId,
- @NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results) {
+ public ServiceClusterDeploymentResult(
+ @NotNull IgniteUuid srvcId,
+ @NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results
+ ) {
this.srvcId = srvcId;
this.results = results;
}
@@ -71,6 +81,20 @@ public class ServiceClusterDeploymentResult implements
Message {
return Collections.unmodifiableMap(results);
}
+ /** @return {@code true} if the topology was marked as transitional. */
+ public boolean isServiceTopologyTransitional() {
+ return isSvcTopTransitional;
+ }
+
+ /**
+ * Marks topology as transitional. Nodes may leave the cluster while the
service topology is being recalculated.
+ * In this case, the resulting service topology may be incomplete. We
consider the mentioned service topology
+ * transitional and expect it to be recalculated soon.
+ */
+ public void markServiceTopologyTransitional() {
+ isSvcTopTransitional = true;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ServiceClusterDeploymentResult.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
index e42e3a1a6e2..5a15a419cf2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -43,7 +42,7 @@ public class ServiceDeploymentActions {
private Map<IgniteUuid, ServiceInfo> servicesToUndeploy;
/** Services deployment topologies. */
- private Map<IgniteUuid, Map<UUID, Integer>> depTops;
+ private Map<IgniteUuid, ServiceTopology> depTops;
/** Services deployment errors. */
private Map<IgniteUuid, Collection<Throwable>> depErrors;
@@ -118,15 +117,15 @@ public class ServiceDeploymentActions {
/**
* @return Deployment topologies.
*/
- @NotNull public Map<IgniteUuid, Map<UUID, Integer>> deploymentTopologies()
{
+ @NotNull public Map<IgniteUuid, ServiceTopology> deploymentTopologies() {
return depTops != null ? depTops : Collections.emptyMap();
}
/**
* @param depTops Deployment topologies.
*/
- public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID,
Integer>> depTops) {
- this.depTops = Collections.unmodifiableMap(new HashMap<>(depTops));
+ public void deploymentTopologies(@NotNull Map<IgniteUuid, ServiceTopology>
depTops) {
+ this.depTops = Collections.unmodifiableMap(depTops);
}
/**
@@ -140,6 +139,6 @@ public class ServiceDeploymentActions {
* @param depErrors Deployment errors.
*/
public void deploymentErrors(@NotNull Map<IgniteUuid,
Collection<Throwable>> depErrors) {
- this.depErrors = Collections.unmodifiableMap(new HashMap<>(depErrors));
+ this.depErrors = Collections.unmodifiableMap(depErrors);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
index ede3217390b..ad654af7ab5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.service;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -79,6 +80,9 @@ class ServiceDeploymentTask {
@GridToStringInclude
private final Set<UUID> remaining = new HashSet<>();
+ /** Nodes that did not respond with single message because they left the
cluster during distributed process. */
+ private final Set<UUID> failedToReply = new HashSet<>();
+
/** Added in deployment queue flag. */
private final AtomicBoolean addedInQueue = new AtomicBoolean(false);
@@ -219,8 +223,13 @@ class ServiceDeploymentTask {
if (evtType == EVT_NODE_LEFT || evtType ==
EVT_NODE_FAILED) {
deployedServices.forEach((srvcId, desc) -> {
- if
(desc.topologySnapshot().containsKey(evtNode.id()) ||
- (desc.cacheName() != null &&
!evtNode.isClient())) // If affinity service
+ ServiceTopology top = desc.serviceTopology();
+
+ if (
+ top.isTransitional() ||
+ top.containsNode(evtNode.id()) ||
+ desc.cacheName() != null &&
!evtNode.isClient() // If affinity service
+ )
toDeploy.put(srvcId, desc);
});
}
@@ -342,8 +351,13 @@ class ServiceDeploymentTask {
try {
for (ClusterNode node : ctx.discovery().nodes(topVer)) {
- if (ctx.discovery().alive(node) &&
!singleDepsMsgs.containsKey(node.id()))
+ if (singleDepsMsgs.containsKey(node.id()))
+ continue;
+
+ if (ctx.discovery().alive(node))
remaining.add(node.id());
+ else
+ failedToReply.add(node.id());
}
}
catch (Exception e) {
@@ -462,7 +476,7 @@ class ServiceDeploymentTask {
assert depResults != null : "Services deployment actions
should be attached.";
- final Map<IgniteUuid, Map<UUID, Integer>> fullTops =
depResults.deploymentTopologies();
+ final Map<IgniteUuid, ServiceTopology> fullTops =
depResults.deploymentTopologies();
final Map<IgniteUuid, Collection<Throwable>> fullErrors =
depResults.deploymentErrors();
depActions.deploymentTopologies(fullTops);
@@ -473,7 +487,7 @@ class ServiceDeploymentTask {
final Map<IgniteUuid, ServiceInfo> services =
srvcProc.deployedServices();
fullTops.forEach((srvcId, top) -> {
- Integer expCnt = top.getOrDefault(ctx.localNodeId(),
0);
+ Integer expCnt =
top.snapshot().getOrDefault(ctx.localNodeId(), 0);
if (expCnt < srvcProc.localInstancesCount(srvcId)) {
// Undeploy exceed instances
ServiceInfo desc = services.get(srvcId);
@@ -483,7 +497,7 @@ class ServiceDeploymentTask {
ServiceConfiguration cfg = desc.configuration();
try {
- srvcProc.redeploy(srvcId, cfg, top);
+ srvcProc.redeploy(srvcId, cfg, top.snapshot());
}
catch (IgniteCheckedException e) {
log.error("Error occured during cancel exceed
service instances: " +
@@ -649,15 +663,41 @@ class ServiceDeploymentTask {
final Collection<ServiceClusterDeploymentResult> fullResults = new
ArrayList<>();
+ Set<IgniteUuid> transitionalSrvcTops = collectTransitionalTopologies();
+
singleResults.forEach((srvcId, dep) -> {
ServiceClusterDeploymentResult res = new
ServiceClusterDeploymentResult(srvcId, dep);
+ if (transitionalSrvcTops.contains(srvcId))
+ res.markServiceTopologyTransitional();
+
fullResults.add(res);
});
return fullResults;
}
+ /**
+ * Nodes may leave the cluster while the service topology is being
recalculated. In this case, the resulting service
+ * topology may be incomplete. We consider the mentioned service topology
transitional and expect it to be recalculated
+ * soon.
+ */
+ private Set<IgniteUuid> collectTransitionalTopologies() {
+ if (failedToReply.isEmpty())
+ return Collections.emptySet();
+
+ Set<IgniteUuid> res = new HashSet<>();
+
+ for (UUID nodeId : failedToReply) {
+ expDeps.forEach((srvcId, top) -> {
+ if (top.containsKey(nodeId))
+ res.add(srvcId);
+ });
+ }
+
+ return res;
+ }
+
/**
* Handles a node leaves topology.
*
@@ -688,10 +728,14 @@ class ServiceDeploymentTask {
synchronized (initCrdMux) {
boolean rmvd = remaining.remove(nodeId);
- if (rmvd && remaining.isEmpty()) {
- singleDepsMsgs.remove(nodeId);
+ if (rmvd) {
+ failedToReply.add(nodeId);
- onAllReceived();
+ if (remaining.isEmpty()) {
+ singleDepsMsgs.remove(nodeId);
+
+ onAllReceived();
+ }
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
index 1d750d13e11..59b1f0903f8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.service;
-import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteException;
@@ -31,6 +30,8 @@ import org.apache.ignite.services.ServiceDescriptor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.internal.processors.service.ServiceTopology.EMPTY;
+
/**
* Service's information container.
*/
@@ -55,7 +56,7 @@ public class ServiceInfo implements ServiceDescriptor {
/** Topology snapshot. */
@GridToStringInclude
- private volatile Map<UUID, Integer> top;
+ private volatile ServiceTopology top = EMPTY;
/** Service class. */
private transient volatile Class<? extends Service> srvcCls;
@@ -97,10 +98,15 @@ public class ServiceInfo implements ServiceDescriptor {
*
* @param top Topology snapshot.
*/
- public void topologySnapshot(@NotNull Map<UUID, Integer> top) {
+ public void updateServiceTopology(@NotNull ServiceTopology top) {
this.top = top;
}
+ /** @return Service Topology. */
+ public ServiceTopology serviceTopology() {
+ return top;
+ }
+
/**
* Returns service's configuration.
*
@@ -118,7 +124,7 @@ public class ServiceInfo implements ServiceDescriptor {
}
/**
- * Rerurns services id.
+ * Returns services id.
*
* @return Service id.
*/
@@ -186,16 +192,7 @@ public class ServiceInfo implements ServiceDescriptor {
/** {@inheritDoc} */
@Override public Map<UUID, Integer> topologySnapshot() {
- return top == null ? Collections.emptyMap() :
Collections.unmodifiableMap(top);
- }
-
- /**
- * Whether service topology was initialized.
- *
- * @return {@code True} if service topology was initialized.
- */
- public boolean topologyInitialized() {
- return top != null;
+ return top.snapshot();
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceTopology.java
new file mode 100644
index 00000000000..a4e9b9c229f
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceTopology.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** Service topology snapshot together with its transitional flag. */
+public class ServiceTopology implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Empty service topology instance; it is marked as transitional. */
+ public static final ServiceTopology EMPTY = new ServiceTopology();
+
+ /** Topology snapshot keyed by node ID. */
+ @GridToStringInclude
+ private final Map<UUID, Integer> snapshot;
+
+ /**
+ * Whether topology is transitional. Nodes may leave the cluster while the
service topology is being recalculated.
+ * In this case, the resulting service topology may be incomplete. We
consider the mentioned service topology
+ * transitional and expect it to be recalculated soon.
+ */
+ @GridToStringInclude
+ private final boolean isTransitional;
+
+ /** Creates the empty topology: no nodes, transitional. */
+ private ServiceTopology() {
+ snapshot = Collections.emptyMap();
+ isTransitional = true;
+ }
+
+ /** @param topSnapshot Snapshot of a non-transitional topology. */
+ public ServiceTopology(Map<UUID, Integer> topSnapshot) {
+ this(topSnapshot, false);
+ }
+
+ /**
+ * @param snapshot Service topology snapshot.
+ * @param isTransitional Whether topology is transitional. Nodes may leave
the cluster while the service topology is being recalculated.
+ * In this case, the resulting service topology may be incomplete. We
consider the mentioned service topology
+ * transitional and expect it to be recalculated soon.
+ */
+ public ServiceTopology(Map<UUID, Integer> snapshot, boolean
isTransitional) {
+ this.snapshot = Collections.unmodifiableMap(snapshot);
+ this.isTransitional = isTransitional;
+ }
+
+ /** @return Unmodifiable topology snapshot. */
+ public Map<UUID, Integer> snapshot() {
+ return snapshot;
+ }
+
+ /** @return {@code true} if this topology is transitional. */
+ public boolean isTransitional() {
+ return isTransitional;
+ }
+
+ /** @return {@code true} if the snapshot has an entry for the node. */
+ public boolean containsNode(UUID nodeId) {
+ return snapshot.containsKey(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ServiceTopology.class, this);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
index 6b25e9ff5d9..54365c2b510 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
@@ -30,7 +30,9 @@ public interface IgniteDiscoverySpiInternalListener {
* @param locNode Local node.
* @param log Log.
*/
- public void beforeJoin(ClusterNode locNode, IgniteLogger log);
+ public default void beforeJoin(ClusterNode locNode, IgniteLogger log) {
+ // No-op.
+ }
/**
* @param locNode Local node.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java
index a67980f89d2..208cda4072f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java
@@ -89,14 +89,14 @@ public class ServiceInfoSelfTest {
* Tests {@link ServiceInfo#topologySnapshot()}.
*/
@Test
- public void testTopologySnapshotEquality() {
+ public void testServiceTopologyEquality() {
assertEquals(new HashMap<>(), sut.topologySnapshot());
HashMap<UUID, Integer> top = new HashMap<>();
top.put(nodeId, 5);
- sut.topologySnapshot(top);
+ sut.updateServiceTopology(new ServiceTopology(top));
assertEquals(top, sut.topologySnapshot());
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceRedeploymentOnNodeLeftTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceRedeploymentOnNodeLeftTest.java
new file mode 100644
index 00000000000..bc9a5db21f1
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceRedeploymentOnNodeLeftTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+
+/** Tests that a service stays available when its holder nodes leave. */
+public class ServiceRedeploymentOnNodeLeftTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+ cfg.setDiscoverySpi(new TestTcpDiscoverySpi()
+
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** Service holder nodes 2 and 1 leave one after another. */
+ @Test
+ public void testReassignedServiceRedeployment() throws Exception {
+ startGrids(4);
+
+ assignServiceToNode(2);
+
+ grid(0).services().deploy(new ServiceConfiguration()
+ .setName("service")
+ .setService(new TestService())
+ .setTotalCount(1)
+ .setNodeFilter(new TestNodeFilter()));
+
+ assertEquals("test", grid(3).services().serviceProxy("service",
Supplier.class, false, 5_000).get());
+
+ assignServiceToNode(1);
+
+ spi(grid(1)).blockMessages((n, msg) -> msg instanceof
ServiceSingleNodeDeploymentResultBatch);
+
+ interceptDiscoveryMessage(0,
ServiceClusterDeploymentResultBatch.class, () -> {
+ assignServiceToNode(3);
+
+ return true; // Proceed with message sending.
+ });
+
+ stopGrid(2);
+
+ spi(grid(1)).waitForBlocked();
+
+ stopGrid(1);
+
+ assertEquals("test", grid(3).services().serviceProxy("service",
Supplier.class, false, 5_000).get());
+ }
+
+ /** Same, but node 0's cluster result batch is blocked and node 0 stops. */
+ @Test
+ public void testCoordinatorChangeReassignedServiceRedeployment() throws
Exception {
+ startGrids(5);
+
+ assignServiceToNode(2);
+
+ grid(0).services().deploy(new ServiceConfiguration()
+ .setName("service")
+ .setService(new TestService())
+ .setTotalCount(1)
+ .setNodeFilter(new TestNodeFilter()));
+
+ assertEquals("test", grid(3).services().serviceProxy("service",
Supplier.class, false, 5_000).get());
+
+ assignServiceToNode(1);
+
+ spi(grid(1)).blockMessages((n, msg) -> msg instanceof
ServiceSingleNodeDeploymentResultBatch);
+
+ CountDownLatch allSingleMsgReceivedOnCoordinatorLatch = new
CountDownLatch(1);
+
+ interceptDiscoveryMessage(0,
ServiceClusterDeploymentResultBatch.class, () -> {
+ assignServiceToNode(4);
+
+ allSingleMsgReceivedOnCoordinatorLatch.countDown();
+
+ return false; // Block message sending.
+ });
+
+ stopGrid(2);
+
+ spi(grid(1)).waitForBlocked();
+
+ stopGrid(1);
+
+
assertTrue(allSingleMsgReceivedOnCoordinatorLatch.await(getTestTimeout(),
TimeUnit.MILLISECONDS));
+
+ stopGrid(0);
+
+ assertEquals("test", grid(3).services().serviceProxy("service",
Supplier.class, false, 5_000).get());
+ }
+
+ /** Runs action on matching message, lets it pass. Unused in this class. */
+ private void invokeOnDiscoveryMessage(int nodeIdx, Class<?> msgCls,
Runnable action) {
+ interceptDiscoveryMessage(nodeIdx, msgCls, () -> {
+ action.run();
+
+ return true;
+ });
+ }
+
+ /** One-shot hook: interceptor result tells whether to send the message. */
+ private void interceptDiscoveryMessage(int nodeIdx, Class<?> msgCls,
Supplier<Boolean> interceptor) {
+ TestTcpDiscoverySpi discoSpi =
(TestTcpDiscoverySpi)grid(nodeIdx).configuration().getDiscoverySpi();
+
+ discoSpi.setInternalListener(new IgniteDiscoverySpiInternalListener() {
+ @Override public boolean beforeSendCustomEvent(DiscoverySpi spi,
IgniteLogger log, DiscoverySpiCustomMessage msg) {
+ if (msgCls.isAssignableFrom(msg.getClass())) {
+ try {
+ return interceptor.get();
+ }
+ finally {
+ discoSpi.setInternalListener(null);
+ }
+ }
+
+ return true;
+ }
+ });
+ }
+
+ /** Service reassignment is not deterministic - it uses random to choose
service holder among nodes. */
+ private void assignServiceToNode(int idx) {
+ TestNodeFilter.serviceHolderNodeId = grid(idx).context().localNodeId();
+ }
+
+ /** Accepts only the node whose ID equals {@link #serviceHolderNodeId}. */
+ private static class TestNodeFilter implements
IgnitePredicate<ClusterNode> {
+ /** ID of the only node accepted by the filter. */
+ static UUID serviceHolderNodeId;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return node.id().equals(serviceHolderNodeId);
+ }
+ }
+
+ /** Service whose {@link #get()} returns {@code "test"}. */
+ private static class TestService implements Supplier<String>, Service {
+ /** {@inheritDoc} */
+ @Override public String get() {
+ return "test";
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
index 44599dc4a30..f7357852e46 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
@@ -385,7 +385,12 @@ public abstract class IgniteConfigVariationsAbstractTest
extends GridCommonAbstr
private VariationsTestsConfig dummyCfg() {
return new VariationsTestsConfig(
new ConfigVariationsFactory(null, new int[] {0},
ConfigVariations.cacheBasicSet(), new int[] {0}),
- "Dummy config", false, null, 1, false);
+ "Dummy config",
+ true,
+ null,
+ 1,
+ false
+ );
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
index 514efaa7681..034c8ff5c86 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
@@ -62,6 +62,7 @@ import
org.apache.ignite.internal.processors.service.ServiceHotRedeploymentViaDe
import org.apache.ignite.internal.processors.service.ServiceInfoSelfTest;
import
org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
import
org.apache.ignite.internal.processors.service.ServiceReassignmentFunctionSelfTest;
+import
org.apache.ignite.internal.processors.service.ServiceRedeploymentOnNodeLeftTest;
import
org.apache.ignite.internal.processors.service.SystemCacheNotConfiguredTest;
import org.apache.ignite.services.ServiceThreadPoolSelfTest;
import org.apache.ignite.tools.junit.JUnitTeamcityReporter;
@@ -122,7 +123,8 @@ import org.junit.runners.Suite;
GridServiceClusterReadOnlyModeTest.class,
IgniteServiceCallContextTest.class,
GridServiceMetricsTest.class,
- IgniteServiceCallInterceptorTest.class
+ IgniteServiceCallInterceptorTest.class,
+ ServiceRedeploymentOnNodeLeftTest.class,
})
public class IgniteServiceGridTestSuite {
/** */