This is an automated email from the ASF dual-hosted git repository.

petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ab2355b539c IGNITE-28369 Fixed service became unavailable if nodes 
leave the cluster (#13034)
ab2355b539c is described below

commit ab2355b539c9584561b290452be94184da7f0296
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sat May 9 09:41:21 2026 +0300

    IGNITE-28369 Fixed service became unavailable if nodes leave the cluster 
(#13034)
---
 .../processors/service/GridServiceProxy.java       |  20 ++-
 .../processors/service/IgniteServiceProcessor.java |  14 +-
 .../service/ServiceClusterDeploymentResult.java    |  28 ++-
 .../service/ServiceDeploymentActions.java          |  11 +-
 .../processors/service/ServiceDeploymentTask.java  |  62 ++++++-
 .../internal/processors/service/ServiceInfo.java   |  25 ++-
 .../processors/service/ServiceTopology.java        |  88 ++++++++++
 .../IgniteDiscoverySpiInternalListener.java        |   4 +-
 .../processors/service/ServiceInfoSelfTest.java    |   4 +-
 .../service/ServiceRedeploymentOnNodeLeftTest.java | 189 +++++++++++++++++++++
 .../junits/IgniteConfigVariationsAbstractTest.java |   7 +-
 .../testsuites/IgniteServiceGridTestSuite.java     |   4 +-
 12 files changed, 408 insertions(+), 48 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 80ee4f0d600..98484e6b340 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -276,7 +276,7 @@ public class GridServiceProxy<T> implements Serializable {
                 }
 
                 if (waitTimeout > 0 && U.currentTimeMillis() - startTime >= 
waitTimeout)
-                    throw new IgniteException("Service acquire timeout was 
reached, stopping. [timeout=" + waitTimeout + "]");
+                    throw new IgniteException("Service acquire timeout was 
reached, stopping [timeout=" + waitTimeout + "]");
             }
         }
         finally {
@@ -397,7 +397,9 @@ public class GridServiceProxy<T> implements Serializable {
         if (snapshot.size() == 1) {
             UUID nodeId = snapshot.keySet().iterator().next();
 
-            return prj.node(nodeId);
+            ClusterNode node = getAliveNode(nodeId);
+
+            return prj.predicate().apply(node) ? node : null;
         }
 
         Collection<ClusterNode> nodes = prj.nodes();
@@ -419,7 +421,7 @@ public class GridServiceProxy<T> implements Serializable {
             for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
                 if (i++ >= idx) {
                     if (e.getValue() > 0)
-                        return ctx.discovery().node(e.getKey());
+                        return getAliveNode(e.getKey());
                 }
             }
 
@@ -428,7 +430,7 @@ public class GridServiceProxy<T> implements Serializable {
             // Circle back.
             for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
                 if (e.getValue() > 0)
-                    return ctx.discovery().node(e.getKey());
+                    return getAliveNode(e.getKey());
 
                 if (i++ == idx)
                     return null;
@@ -462,6 +464,16 @@ public class GridServiceProxy<T> implements Serializable {
         return proxy;
     }
 
+    /** */
+    public ClusterNode getAliveNode(UUID nodeId) throws 
ClusterTopologyCheckedException {
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null)
+            throw new ClusterTopologyCheckedException("The node holding the 
service left the cluster [nodeId=" + nodeId + ']');
+
+        return node;
+    }
+
     /**
      * @param mtd Method to invoke.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index ed9153b4969..86fa2f2cea9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -966,7 +966,7 @@ public class IgniteServiceProcessor extends 
GridProcessorAdapter implements Igni
                 if (timeout == 0 && desc == null)
                     return null;
 
-                if (desc != null && desc.topologyInitialized())
+                if (desc != null && !desc.serviceTopology().isTransitional())
                     return desc.topologySnapshot();
 
                 long wait = 0;
@@ -1566,7 +1566,7 @@ public class IgniteServiceProcessor extends 
GridProcessorAdapter implements Igni
      *
      * @param fullTops Deployment topologies.
      */
-    void updateServicesTopologies(@NotNull final Map<IgniteUuid, Map<UUID, 
Integer>> fullTops) {
+    void updateServicesTopologies(@NotNull final Map<IgniteUuid, 
ServiceTopology> fullTops) {
         if (!enterBusy())
             return;
 
@@ -1936,7 +1936,7 @@ public class IgniteServiceProcessor extends 
GridProcessorAdapter implements Igni
      * @param msg Message.
      */
     private void 
processServicesFullDeployments(ServiceClusterDeploymentResultBatch msg) {
-        final Map<IgniteUuid, Map<UUID, Integer>> fullTops = new HashMap<>();
+        final Map<IgniteUuid, ServiceTopology> fullTops = new HashMap<>();
         final Map<IgniteUuid, Collection<Throwable>> fullErrors = new 
HashMap<>();
 
         for (ServiceClusterDeploymentResult depRes : msg.results()) {
@@ -1959,7 +1959,7 @@ public class IgniteServiceProcessor extends 
GridProcessorAdapter implements Igni
             if (!errors.isEmpty())
                 fullErrors.computeIfAbsent(srvcId, e -> new 
ArrayList<>()).addAll(errors);
 
-            fullTops.put(srvcId, top);
+            fullTops.put(srvcId, new ServiceTopology(top, 
depRes.isServiceTopologyTransitional()));
         }
 
         synchronized (servicesTopsUpdateMux) {
@@ -1990,14 +1990,12 @@ public class IgniteServiceProcessor extends 
GridProcessorAdapter implements Igni
      * @param services Services info to update.
      * @param tops Deployment topologies.
      */
-    private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services,
-        Map<IgniteUuid, Map<UUID, Integer>> tops) {
-
+    private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services, 
Map<IgniteUuid, ServiceTopology> tops) {
         tops.forEach((srvcId, top) -> {
             ServiceInfo desc = services.get(srvcId);
 
             if (desc != null)
-                desc.topologySnapshot(top);
+                desc.updateServiceTopology(top);
         });
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
index 655f70dfd45..2c8290513a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
@@ -43,6 +43,14 @@ public class ServiceClusterDeploymentResult implements 
Message {
     @GridToStringInclude
     Map<UUID, ServiceSingleNodeDeploymentResult> results;
 
+    /**
+     * Whether topology is transitional. Nodes may leave the cluster while the 
service topology is being recalculated.
+     * In this case, the resulting service topology may be incomplete. We 
consider the mentioned service topology
+     * transitional and expect it to be recalculated soon.
+     */
+    @Order(2)
+    boolean isSvcTopTransitional;
+
      /** Default constructor for {@link MessageFactory}. */
     public ServiceClusterDeploymentResult() {
     }
@@ -51,8 +59,10 @@ public class ServiceClusterDeploymentResult implements 
Message {
      * @param srvcId Service id.
      * @param results Deployments results.
      */
-    public ServiceClusterDeploymentResult(@NotNull IgniteUuid srvcId,
-        @NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results) {
+    public ServiceClusterDeploymentResult(
+        @NotNull IgniteUuid srvcId,
+        @NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results
+    ) {
         this.srvcId = srvcId;
         this.results = results;
     }
@@ -71,6 +81,20 @@ public class ServiceClusterDeploymentResult implements 
Message {
         return Collections.unmodifiableMap(results);
     }
 
+    /** */
+    public boolean isServiceTopologyTransitional() {
+        return isSvcTopTransitional;
+    }
+
+    /**
+     * Marks topology as transitional. Nodes may leave the cluster while the 
service topology is being recalculated.
+     * In this case, the resulting service topology may be incomplete. We 
consider the mentioned service topology
+     * transitional and expect it to be recalculated soon.
+     */
+    public void markServiceTopologyTransitional() {
+        isSvcTopTransitional = true;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ServiceClusterDeploymentResult.class, this);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
index e42e3a1a6e2..5a15a419cf2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -43,7 +42,7 @@ public class ServiceDeploymentActions {
     private Map<IgniteUuid, ServiceInfo> servicesToUndeploy;
 
     /** Services deployment topologies. */
-    private Map<IgniteUuid, Map<UUID, Integer>> depTops;
+    private Map<IgniteUuid, ServiceTopology> depTops;
 
     /** Services deployment errors. */
     private Map<IgniteUuid, Collection<Throwable>> depErrors;
@@ -118,15 +117,15 @@ public class ServiceDeploymentActions {
     /**
      * @return Deployment topologies.
      */
-    @NotNull public Map<IgniteUuid, Map<UUID, Integer>> deploymentTopologies() 
{
+    @NotNull public Map<IgniteUuid, ServiceTopology> deploymentTopologies() {
         return depTops != null ? depTops : Collections.emptyMap();
     }
 
     /**
      * @param depTops Deployment topologies.
      */
-    public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, 
Integer>> depTops) {
-        this.depTops = Collections.unmodifiableMap(new HashMap<>(depTops));
+    public void deploymentTopologies(@NotNull Map<IgniteUuid, ServiceTopology> 
depTops) {
+        this.depTops = Collections.unmodifiableMap(depTops);
     }
 
     /**
@@ -140,6 +139,6 @@ public class ServiceDeploymentActions {
      * @param depErrors Deployment errors.
      */
     public void deploymentErrors(@NotNull Map<IgniteUuid, 
Collection<Throwable>> depErrors) {
-        this.depErrors = Collections.unmodifiableMap(new HashMap<>(depErrors));
+        this.depErrors = Collections.unmodifiableMap(depErrors);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
index ede3217390b..ad654af7ab5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.service;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -79,6 +80,9 @@ class ServiceDeploymentTask {
     @GridToStringInclude
     private final Set<UUID> remaining = new HashSet<>();
 
+    /** Nodes that did not reply with a single-node deployment message because they left the 
cluster during the distributed process. */
+    private final Set<UUID> failedToReply = new HashSet<>();
+
     /** Added in deployment queue flag. */
     private final AtomicBoolean addedInQueue = new AtomicBoolean(false);
 
@@ -219,8 +223,13 @@ class ServiceDeploymentTask {
 
                     if (evtType == EVT_NODE_LEFT || evtType == 
EVT_NODE_FAILED) {
                         deployedServices.forEach((srvcId, desc) -> {
-                            if 
(desc.topologySnapshot().containsKey(evtNode.id()) ||
-                                (desc.cacheName() != null && 
!evtNode.isClient())) // If affinity service
+                            ServiceTopology top = desc.serviceTopology();
+
+                            if (
+                                top.isTransitional() ||
+                                top.containsNode(evtNode.id()) ||
+                                desc.cacheName() != null && 
!evtNode.isClient() // If affinity service
+                            )
                                 toDeploy.put(srvcId, desc);
                         });
                     }
@@ -342,8 +351,13 @@ class ServiceDeploymentTask {
 
             try {
                 for (ClusterNode node : ctx.discovery().nodes(topVer)) {
-                    if (ctx.discovery().alive(node) && 
!singleDepsMsgs.containsKey(node.id()))
+                    if (singleDepsMsgs.containsKey(node.id()))
+                        continue;
+
+                    if (ctx.discovery().alive(node))
                         remaining.add(node.id());
+                    else
+                        failedToReply.add(node.id());
                 }
             }
             catch (Exception e) {
@@ -462,7 +476,7 @@ class ServiceDeploymentTask {
 
                     assert depResults != null : "Services deployment actions 
should be attached.";
 
-                    final Map<IgniteUuid, Map<UUID, Integer>> fullTops = 
depResults.deploymentTopologies();
+                    final Map<IgniteUuid, ServiceTopology> fullTops = 
depResults.deploymentTopologies();
                     final Map<IgniteUuid, Collection<Throwable>> fullErrors = 
depResults.deploymentErrors();
 
                     depActions.deploymentTopologies(fullTops);
@@ -473,7 +487,7 @@ class ServiceDeploymentTask {
                     final Map<IgniteUuid, ServiceInfo> services = 
srvcProc.deployedServices();
 
                     fullTops.forEach((srvcId, top) -> {
-                        Integer expCnt = top.getOrDefault(ctx.localNodeId(), 
0);
+                        Integer expCnt = 
top.snapshot().getOrDefault(ctx.localNodeId(), 0);
 
                         if (expCnt < srvcProc.localInstancesCount(srvcId)) { 
// Undeploy exceed instances
                             ServiceInfo desc = services.get(srvcId);
@@ -483,7 +497,7 @@ class ServiceDeploymentTask {
                             ServiceConfiguration cfg = desc.configuration();
 
                             try {
-                                srvcProc.redeploy(srvcId, cfg, top);
+                                srvcProc.redeploy(srvcId, cfg, top.snapshot());
                             }
                             catch (IgniteCheckedException e) {
                                 log.error("Error occured during cancel exceed 
service instances: " +
@@ -649,15 +663,41 @@ class ServiceDeploymentTask {
 
         final Collection<ServiceClusterDeploymentResult> fullResults = new 
ArrayList<>();
 
+        Set<IgniteUuid> transitionalSrvcTops = collectTransitionalTopologies();
+
         singleResults.forEach((srvcId, dep) -> {
             ServiceClusterDeploymentResult res = new 
ServiceClusterDeploymentResult(srvcId, dep);
 
+            if (transitionalSrvcTops.contains(srvcId))
+                res.markServiceTopologyTransitional();
+
             fullResults.add(res);
         });
 
         return fullResults;
     }
 
+    /**
+     * Nodes may leave the cluster while the service topology is being 
recalculated. In this case, the resulting service
+     * topology may be incomplete. We consider the mentioned service topology 
transitional and expect it to be recalculated
+     * soon.
+     */
+    private Set<IgniteUuid> collectTransitionalTopologies() {
+        if (failedToReply.isEmpty())
+            return Collections.emptySet();
+
+        Set<IgniteUuid> res = new HashSet<>();
+
+        for (UUID nodeId : failedToReply) {
+            expDeps.forEach((srvcId, top) -> {
+                if (top.containsKey(nodeId))
+                    res.add(srvcId);
+            });
+        }
+
+        return res;
+    }
+
     /**
      * Handles a node leaves topology.
      *
@@ -688,10 +728,14 @@ class ServiceDeploymentTask {
                 synchronized (initCrdMux) {
                     boolean rmvd = remaining.remove(nodeId);
 
-                    if (rmvd && remaining.isEmpty()) {
-                        singleDepsMsgs.remove(nodeId);
+                    if (rmvd) {
+                        failedToReply.add(nodeId);
 
-                        onAllReceived();
+                        if (remaining.isEmpty()) {
+                            singleDepsMsgs.remove(nodeId);
+
+                            onAllReceived();
+                        }
                     }
                 }
             }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
index 1d750d13e11..59b1f0903f8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.service;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteException;
@@ -31,6 +30,8 @@ import org.apache.ignite.services.ServiceDescriptor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.processors.service.ServiceTopology.EMPTY;
+
 /**
  * Service's information container.
  */
@@ -55,7 +56,7 @@ public class ServiceInfo implements ServiceDescriptor {
 
     /** Topology snapshot. */
     @GridToStringInclude
-    private volatile Map<UUID, Integer> top;
+    private volatile ServiceTopology top = EMPTY;
 
     /** Service class. */
     private transient volatile Class<? extends Service> srvcCls;
@@ -97,10 +98,15 @@ public class ServiceInfo implements ServiceDescriptor {
      *
      * @param top Topology snapshot.
      */
-    public void topologySnapshot(@NotNull Map<UUID, Integer> top) {
+    public void updateServiceTopology(@NotNull ServiceTopology top) {
         this.top = top;
     }
 
+    /** @return Service Topology. */
+    public ServiceTopology serviceTopology() {
+        return top;
+    }
+
     /**
      * Returns service's configuration.
      *
@@ -118,7 +124,7 @@ public class ServiceInfo implements ServiceDescriptor {
     }
 
     /**
-     * Rerurns services id.
+     * Returns services id.
      *
      * @return Service id.
      */
@@ -186,16 +192,7 @@ public class ServiceInfo implements ServiceDescriptor {
 
     /** {@inheritDoc} */
     @Override public Map<UUID, Integer> topologySnapshot() {
-        return top == null ? Collections.emptyMap() : 
Collections.unmodifiableMap(top);
-    }
-
-    /**
-     * Whether service topology was initialized.
-     *
-     * @return {@code True} if service topology was initialized.
-     */
-    public boolean topologyInitialized() {
-        return top != null;
+        return top.snapshot();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceTopology.java
new file mode 100644
index 00000000000..a4e9b9c229f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceTopology.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+public class ServiceTopology implements Serializable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Empty service topology instance. */
+    public static final ServiceTopology EMPTY = new ServiceTopology();
+
+    /** Topology snapshot. */
+    @GridToStringInclude
+    private final Map<UUID, Integer> snapshot;
+
+    /**
+     * Whether topology is transitional. Nodes may leave the cluster while the 
service topology is being recalculated.
+     * In this case, the resulting service topology may be incomplete. We 
consider the mentioned service topology
+     * transitional and expect it to be recalculated soon.
+     */
+    @GridToStringInclude
+    private final boolean isTransitional;
+
+    /** */
+    private ServiceTopology() {
+        snapshot = Collections.emptyMap();
+        isTransitional = true;
+    }
+
+    /** */
+    public ServiceTopology(Map<UUID, Integer> topSnapshot) {
+        this(topSnapshot, false);
+    }
+
+    /**
+     * @param snapshot Service topology snapshot.
+     * @param isTransitional Whether topology is transitional. Nodes may leave 
the cluster while the service topology is being recalculated.
+     * In this case, the resulting service topology may be incomplete. We 
consider the mentioned service topology
+     * transitional and expect it to be recalculated soon.
+     */
+    public ServiceTopology(Map<UUID, Integer> snapshot, boolean 
isTransitional) {
+        this.snapshot = Collections.unmodifiableMap(snapshot);
+        this.isTransitional = isTransitional;
+    }
+
+    /** */
+    public Map<UUID, Integer> snapshot() {
+        return snapshot;
+    }
+
+    /** */
+    public boolean isTransitional() {
+        return isTransitional;
+    }
+
+    /** */
+    public boolean containsNode(UUID nodeId) {
+        return snapshot.containsKey(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceTopology.class, this);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
index 6b25e9ff5d9..54365c2b510 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
@@ -30,7 +30,9 @@ public interface IgniteDiscoverySpiInternalListener {
      * @param locNode Local node.
      * @param log Log.
      */
-    public void beforeJoin(ClusterNode locNode, IgniteLogger log);
+    public default void beforeJoin(ClusterNode locNode, IgniteLogger log) {
+        // No-op.
+    }
 
     /**
      * @param locNode Local node.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java
index a67980f89d2..208cda4072f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java
@@ -89,14 +89,14 @@ public class ServiceInfoSelfTest {
      * Tests {@link ServiceInfo#topologySnapshot()}.
      */
     @Test
-    public void testTopologySnapshotEquality() {
+    public void testServiceTopologyEquality() {
         assertEquals(new HashMap<>(), sut.topologySnapshot());
 
         HashMap<UUID, Integer> top = new HashMap<>();
 
         top.put(nodeId, 5);
 
-        sut.topologySnapshot(top);
+        sut.updateServiceTopology(new ServiceTopology(top));
 
         assertEquals(top, sut.topologySnapshot());
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceRedeploymentOnNodeLeftTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceRedeploymentOnNodeLeftTest.java
new file mode 100644
index 00000000000..bc9a5db21f1
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceRedeploymentOnNodeLeftTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+
+/** */
+public class ServiceRedeploymentOnNodeLeftTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+        cfg.setDiscoverySpi(new TestTcpDiscoverySpi()
+            
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** */
+    @Test
+    public void testReassignedServiceRedeployment() throws Exception {
+        startGrids(4);
+
+        assignServiceToNode(2);
+
+        grid(0).services().deploy(new ServiceConfiguration()
+            .setName("service")
+            .setService(new TestService())
+            .setTotalCount(1)
+            .setNodeFilter(new TestNodeFilter()));
+
+        assertEquals("test", grid(3).services().serviceProxy("service", 
Supplier.class, false, 5_000).get());
+
+        assignServiceToNode(1);
+
+        spi(grid(1)).blockMessages((n, msg) -> msg instanceof 
ServiceSingleNodeDeploymentResultBatch);
+
+        interceptDiscoveryMessage(0, 
ServiceClusterDeploymentResultBatch.class, () -> {
+            assignServiceToNode(3);
+
+            return true; // Proceed with message sending.
+        });
+
+        stopGrid(2);
+
+        spi(grid(1)).waitForBlocked();
+
+        stopGrid(1);
+
+        assertEquals("test", grid(3).services().serviceProxy("service", 
Supplier.class, false, 5_000).get());
+    }
+
+    /** */
+    @Test
+    public void testCoordinatorChangeReassignedServiceRedeployment() throws 
Exception {
+        startGrids(5);
+
+        assignServiceToNode(2);
+
+        grid(0).services().deploy(new ServiceConfiguration()
+            .setName("service")
+            .setService(new TestService())
+            .setTotalCount(1)
+            .setNodeFilter(new TestNodeFilter()));
+
+        assertEquals("test", grid(3).services().serviceProxy("service", 
Supplier.class, false, 5_000).get());
+
+        assignServiceToNode(1);
+
+        spi(grid(1)).blockMessages((n, msg) -> msg instanceof 
ServiceSingleNodeDeploymentResultBatch);
+
+        CountDownLatch allSingleMsgReceivedOnCoordinatorLatch = new 
CountDownLatch(1);
+
+        interceptDiscoveryMessage(0, 
ServiceClusterDeploymentResultBatch.class, () -> {
+            assignServiceToNode(4);
+
+            allSingleMsgReceivedOnCoordinatorLatch.countDown();
+
+            return false; // Block message sending.
+        });
+
+        stopGrid(2);
+
+        spi(grid(1)).waitForBlocked();
+
+        stopGrid(1);
+
+        
assertTrue(allSingleMsgReceivedOnCoordinatorLatch.await(getTestTimeout(), 
TimeUnit.MILLISECONDS));
+
+        stopGrid(0);
+
+        assertEquals("test", grid(3).services().serviceProxy("service", 
Supplier.class, false, 5_000).get());
+    }
+
+    /** */
+    private void invokeOnDiscoveryMessage(int nodeIdx, Class<?> msgCls, 
Runnable action) {
+        interceptDiscoveryMessage(nodeIdx, msgCls, () -> {
+            action.run();
+
+            return true;
+        });
+    }
+
+    /** */
+    private void interceptDiscoveryMessage(int nodeIdx, Class<?> msgCls, 
Supplier<Boolean> interceptor) {
+        TestTcpDiscoverySpi discoSpi = 
(TestTcpDiscoverySpi)grid(nodeIdx).configuration().getDiscoverySpi();
+
+        discoSpi.setInternalListener(new IgniteDiscoverySpiInternalListener() {
+            @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, 
IgniteLogger log, DiscoverySpiCustomMessage msg) {
+                if (msgCls.isAssignableFrom(msg.getClass())) {
+                    try {
+                        return interceptor.get();
+                    }
+                    finally {
+                        discoSpi.setInternalListener(null);
+                    }
+                }
+
+                return true;
+            }
+        });
+    }
+
+    /** Service reassignment is not deterministic - it uses randomness to choose the 
service holder among nodes. */
+    private void assignServiceToNode(int idx) {
+        TestNodeFilter.serviceHolderNodeId = grid(idx).context().localNodeId();
+    }
+
+    /** */
+    private static class TestNodeFilter implements 
IgnitePredicate<ClusterNode> {
+        /** */
+        static UUID serviceHolderNodeId;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return node.id().equals(serviceHolderNodeId);
+        }
+    }
+
+    /** */
+    private static class TestService implements Supplier<String>, Service {
+        /** {@inheritDoc} */
+        @Override public String get() {
+            return "test";
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
index 44599dc4a30..f7357852e46 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
@@ -385,7 +385,12 @@ public abstract class IgniteConfigVariationsAbstractTest 
extends GridCommonAbstr
     private VariationsTestsConfig dummyCfg() {
         return new VariationsTestsConfig(
             new ConfigVariationsFactory(null, new int[] {0}, 
ConfigVariations.cacheBasicSet(), new int[] {0}),
-            "Dummy config", false, null, 1, false);
+            "Dummy config",
+            true,
+            null,
+            1,
+            false
+        );
     }
 
     /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
index 514efaa7681..034c8ff5c86 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
@@ -62,6 +62,7 @@ import 
org.apache.ignite.internal.processors.service.ServiceHotRedeploymentViaDe
 import org.apache.ignite.internal.processors.service.ServiceInfoSelfTest;
 import 
org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
 import 
org.apache.ignite.internal.processors.service.ServiceReassignmentFunctionSelfTest;
+import 
org.apache.ignite.internal.processors.service.ServiceRedeploymentOnNodeLeftTest;
 import 
org.apache.ignite.internal.processors.service.SystemCacheNotConfiguredTest;
 import org.apache.ignite.services.ServiceThreadPoolSelfTest;
 import org.apache.ignite.tools.junit.JUnitTeamcityReporter;
@@ -122,7 +123,8 @@ import org.junit.runners.Suite;
     GridServiceClusterReadOnlyModeTest.class,
     IgniteServiceCallContextTest.class,
     GridServiceMetricsTest.class,
-    IgniteServiceCallInterceptorTest.class
+    IgniteServiceCallInterceptorTest.class,
+    ServiceRedeploymentOnNodeLeftTest.class,
 })
 public class IgniteServiceGridTestSuite {
     /** */


Reply via email to