This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 84b760b54c IGNITE-22315 Make raft-client starting only once and only with raft-client and replica together (#3956)
84b760b54c is described below

commit 84b760b54cbfe9ef5842b3b470a7e6f4be4ecdba
Author: Mikhail Efremov <jakuten...@gmail.com>
AuthorDate: Tue Jul 9 11:45:44 2024 +0600

    IGNITE-22315 Make raft-client starting only once and only with raft-client and replica together (#3956)
---
 .../ignite/internal/index/ItBuildIndexTest.java    |  10 +-
 .../metastorage/server/time/ClusterTime.java       |   5 +
 .../metastorage/server/time/ClusterTimeImpl.java   |  10 +-
 .../PartitionReplicaLifecycleManager.java          |   4 +-
 .../ignite/internal/replicator/ReplicaManager.java |  61 ++--
 .../app/ItIgniteInMemoryNodeRestartTest.java       |   4 +-
 .../rebalance/ItRebalanceDistributedTest.java      | 116 +++++--
 .../internal/table/distributed/TableManager.java   | 346 ++++++++++++---------
 .../distributed/TableManagerRecoveryTest.java      |   3 -
 .../table/distributed/TableManagerTest.java        |   3 -
 .../apache/ignite/distributed/ItTxTestCluster.java |   5 -
 11 files changed, 341 insertions(+), 226 deletions(-)

diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
index 64ec13d0de..e873d3e895 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
@@ -324,7 +325,14 @@ public class ItBuildIndexTest extends BaseSqlIntegrationTest {
                 assertNotNull(indexDescriptor);
 
                 for (int partitionId = 0; partitionId < internalTable.partitions(); partitionId++) {
-                    RaftGroupService raftGroupService = internalTable.tableRaftService().partitionRaftGroupService(partitionId);
+                    // Exclude partitions hosted on nodes outside of the replication group.
+                    // TODO: will be replaced with replica usage in https://issues.apache.org/jira/browse/IGNITE-22218
+                    RaftGroupService raftGroupService;
+                    try {
+                        raftGroupService = internalTable.tableRaftService().partitionRaftGroupService(partitionId);
+                    } catch (IgniteInternalException e) {
+                        continue;
+                    }
 
                     List<Peer> allPeers = raftGroupService.peers();
 
diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
index 55d662bbfa..4dee69f665 100644
--- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
+++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
@@ -34,6 +34,11 @@ public interface ClusterTime {
      */
     long nowLong();
 
+    /**
+     * Returns current safe time.
+     */
+    HybridTimestamp currentSafeTime();
+
     /**
      * Provides the future that is completed when cluster time reaches given one. If the time is greater or equal
      * then the given one, returns completed future.
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
index af74dcde63..9277d40bf0 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
 
 /**
  * Cluster time implementation with additional methods to adjust time and update safe time.
@@ -138,6 +137,11 @@ public class ClusterTimeImpl implements ClusterTime, MetaStorageMetrics, Manuall
         return clock.nowLong();
     }
 
+    @Override
+    public HybridTimestamp currentSafeTime() {
+        return this.safeTime.current();
+    }
+
     @Override
     public CompletableFuture<Void> waitFor(HybridTimestamp time) {
         return safeTime.waitFor(time);
@@ -233,10 +237,6 @@ public class ClusterTimeImpl implements ClusterTime, MetaStorageMetrics, Manuall
 
             IgniteUtils.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS);
         }
-    }
 
-    @TestOnly
-    public HybridTimestamp currentSafeTime() {
-        return this.safeTime.current();
     }
 }
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 8f7cd70732..c02d26f89c 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -631,7 +631,7 @@ public class PartitionReplicaLifecycleManager implements IgniteComponent {
     ) {
         // Update raft client peers and learners according to the actual 
assignments.
         if (replicaMgr.isReplicaStarted(zonePartitionId)) {
-            replicaMgr.getReplica(zonePartitionId).join()
+            replicaMgr.replica(zonePartitionId).join()
                     .raftClient().updateConfiguration(fromAssignments(stableAssignments));
         }
 
@@ -785,7 +785,7 @@ public class PartitionReplicaLifecycleManager implements IgniteComponent {
                     ? pendingAssignmentsNodes
                     : RebalanceUtil.union(pendingAssignmentsNodes, stableAssignments.nodes());
 
-            replicaMgr.getReplica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
+            replicaMgr.replica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
         }, ioExecutor);
     }
 
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index ca37e69f04..fd3d919f64 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.internal.thread.ExecutorChooser;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.PublicApiThreading;
 import org.apache.ignite.internal.thread.ThreadAttributes;
@@ -200,6 +201,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
 
     private final ReplicaStateManager replicaStateManager;
 
+    private final ExecutorService replicasCreationExecutor;
+
     private volatile String localNodeId;
 
     private volatile String localNodeConsistentId;
@@ -331,6 +334,15 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, "replica", LOG)
         );
+
+        replicasCreationExecutor = new ThreadPoolExecutor(
+                threadCount,
+                threadCount,
+                30,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                IgniteThreadFactory.create(nodeName, "replica-manager", LOG, STORAGE_READ, STORAGE_WRITE)
+        );
     }
 
     private void onReplicaMessageReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) {
@@ -613,6 +625,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
      * @param replicaGrpId Replication group id.
      * @param storageIndexTracker Storage index tracker.
      * @param newConfiguration A configuration for new raft group.
+     *
      * @return Future that promises ready new replica when done.
      */
     public CompletableFuture<Boolean> startReplica(
@@ -752,14 +765,14 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
     ) throws NodeStoppingException {
         LOG.info("Replica is about to start [replicationGroupId={}].", replicaGrpId);
 
-        CompletableFuture<Boolean> resultFuture = newRaftClientFut.thenAccept(updateTableRaftService)
-                .thenApply((v) -> true);
-
-        CompletableFuture<ReplicaListener> newReplicaListenerFut = newRaftClientFut.thenApply(createListener);
-
-        startReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut);
-
-        return resultFuture;
+        return newRaftClientFut
+                .thenApplyAsync(raftClient -> {
+                    // TODO: will be removed in https://issues.apache.org/jira/browse/IGNITE-22218
+                    updateTableRaftService.accept(raftClient);
+                    return createListener.apply(raftClient);
+                }, replicasCreationExecutor)
+                .thenCompose(replicaListener -> startReplica(replicaGrpId, storageIndexTracker, completedFuture(replicaListener)))
+                .thenApply(r -> true);
     }
 
     /**
@@ -777,7 +790,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
             ReplicationGroupId replicaGrpId,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
             CompletableFuture<ReplicaListener> newReplicaListenerFut
-    ) throws NodeStoppingException {
+    ) {
 
         ClusterNode localNode = clusterNetSvc.topologyService().localMember();
 
@@ -820,30 +833,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
                 .thenCompose(v -> replicaFuture);
     }
 
-    /**
-     * Temporary public method for RAFT-client starting.
-     * TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315
-     *
-     * @param replicaGrpId Replication Group ID.
-     * @param newConfiguration Peers and learners nodes for a raft group.
-     * @param raftClientCache Temporal supplier that returns RAFT-client from TableRaftService if it's already exists and was put into the
-     *      service's map.
-     * @return Future that returns started RAFT-client.
-     * @throws NodeStoppingException In case if node was stopping.
-     */
-    @Deprecated
-    public CompletableFuture<TopologyAwareRaftGroupService> startRaftClient(
-            ReplicationGroupId replicaGrpId,
-            PeersAndLearners newConfiguration,
-            Supplier<RaftGroupService> raftClientCache)
-            throws NodeStoppingException {
-        RaftGroupService cachedRaftClient = raftClientCache.get();
-        return cachedRaftClient != null
-                ? completedFuture((TopologyAwareRaftGroupService) cachedRaftClient)
-                // TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online.
-                : raftManager.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller);
-    }
-
     /**
     * Returns future with a replica if it was created or null if there no any replicas starting with given identifier.
      *
@@ -1021,6 +1010,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
 
         shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS);
         shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds, TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(replicasCreationExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS);
 
         assert replicas.values().stream().noneMatch(CompletableFuture::isDone)
                 : "There are replicas alive [replicas="
@@ -1219,11 +1209,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
         return replicas.containsKey(replicaGrpId);
     }
 
-    @TestOnly
-    public CompletableFuture<Replica> getReplica(ReplicationGroupId replicationGroupId) {
-        return replicas.get(replicationGroupId);
-    }
-
     /**
      * Returns started replication groups.
      *
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 378b2cb681..4e479fbe90 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -219,7 +219,9 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
 
         List<String> partitionAssignments = assignments.get(0);
 
-        return partitionAssignments.contains(restartingNodeConsistentId);
+        return !assignments.isEmpty()
+                && partitionAssignments != null
+                && partitionAssignments.contains(restartingNodeConsistentId);
     }
 
     private static boolean isRaftNodeStarted(TableViewInternal table, Loza loza) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index b48222de3c..8825e5ddf0 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -120,6 +120,7 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
+import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.ClockService;
@@ -160,12 +161,14 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
 import org.apache.ignite.internal.rest.configuration.RestConfiguration;
@@ -707,19 +710,23 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
         ));
 
         // Check that raft clients on all nodes were updated with the new list of peers.
+        Predicate<Node> isNodeInReplicationGroup = n -> isNodeInAssignments(n, newAssignment);
         assertTrue(waitForCondition(
-                () -> nodes.stream().allMatch(n ->
-                        n.tableManager
-                                .startedTables()
-                                .get(getTableId(node, TABLE_NAME))
-                                .internalTable()
-                                .tableRaftService()
-                                .partitionRaftGroupService(0)
-                                .peers()
-                                .equals(List.of(new Peer(newNodeNameForAssignment)))),
+                () -> nodes.stream()
+                        .filter(isNodeInReplicationGroup)
+                        .allMatch(n -> isNodeUpdatesPeersOnGroupService(node, assignmentsToPeersSet(newAssignment))),
                 (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
         ));
 
+        // Check that there are no replicas outside of the replication group.
+        var replGrpId = new TablePartitionId(getTableId(node, TABLE_NAME), 0);
+        Predicate<Node> isNodeOutsideReplicationGroup = n -> !isNodeInAssignments(n, newAssignment);
+        assertTrue(waitForCondition(
+                () -> nodes.stream()
+                        .filter(isNodeOutsideReplicationGroup)
+                        .noneMatch(n -> isReplicationGroupStarted(n, replGrpId)),
+                (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
+        ));
     }
 
 
@@ -733,13 +740,14 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
 
         waitPartitionAssignmentsSyncedToExpected(0, 1);
 
-        String assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0).stream()
+        var assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0);
+        String nodeNameAssignedBeforeRebalance = assignmentsBeforeRebalance.stream()
                 .findFirst()
                 .orElseThrow()
                 .consistentId();
 
         String newNodeNameForAssignment = nodes.stream()
-                .filter(n -> !assignmentsBeforeRebalance.equals(n.clusterService.nodeName()))
+                .filter(n -> !nodeNameAssignedBeforeRebalance.equals(n.clusterService.nodeName()))
                 .findFirst()
                 .orElseThrow()
                 .name;
@@ -762,25 +770,65 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
 
         node.metaStorageManager.put(partAssignmentsPendingKey, bytesPendingAssignments).get(AWAIT_TIMEOUT_MILLIS, MILLISECONDS);
 
+        Set<Assignment> union = RebalanceUtil.union(assignmentsBeforeRebalance, newAssignment);
+
         // Check that raft clients on all nodes were updated with the new list of peers.
+        Predicate<Node> isNodeInReplicationGroup = n -> isNodeInAssignments(n, union);
         assertTrue(waitForCondition(
-                () -> nodes.stream().allMatch(n ->
-                        n.tableManager
-                                .startedTables()
-                                .get(getTableId(node, TABLE_NAME))
-                                .internalTable()
-                                .tableRaftService()
-                                .partitionRaftGroupService(0)
-                                .peers()
-                                .stream()
-                                .collect(toSet())
-                                .equals(Set.of(new Peer(newNodeNameForAssignment), new Peer(assignmentsBeforeRebalance)))),
+                () -> nodes.stream()
+                        .filter(isNodeInReplicationGroup)
+                        .allMatch(n -> isNodeUpdatesPeersOnGroupService(node, assignmentsToPeersSet(union))),
+                (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
+        ));
+
+        // Check that there are no replicas outside of the replication group.
+        Predicate<Node> isNodeOutsideReplicationGroup = n -> !isNodeInAssignments(n, union);
+        assertTrue(waitForCondition(
+                () -> nodes.stream()
+                        .filter(isNodeOutsideReplicationGroup)
+                        .noneMatch(n -> isReplicationGroupStarted(n, partId)),
                 (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
         ));
 
         dropMessages.set(false);
     }
 
+    private static Set<Peer> assignmentsToPeersSet(Set<Assignment> assignments) {
+        return assignments.stream()
+                .map(Assignment::consistentId)
+                .map(Peer::new)
+                .collect(toSet());
+    }
+
+    private static boolean isNodeInAssignments(Node node, Set<Assignment> assignments) {
+        return assignmentsToPeersSet(assignments).stream()
+                .map(Peer::consistentId)
+                .anyMatch(id -> id.equals(node.clusterService.nodeName()));
+    }
+
+    private static boolean isReplicationGroupStarted(Node node, ReplicationGroupId replicationGroupId) {
+        return node.replicaManager.isReplicaStarted(replicationGroupId);
+    }
+
+    private static boolean isNodeUpdatesPeersOnGroupService(Node node, Set<Peer> desiredPeers) {
+        // TODO: will be replaced with replica usage in https://issues.apache.org/jira/browse/IGNITE-22218
+        TableRaftService tblRaftSvc = node.tableManager.startedTables()
+                .get(getTableId(node, TABLE_NAME))
+                .internalTable()
+                .tableRaftService();
+        RaftGroupService groupService;
+        try {
+            groupService = tblRaftSvc.partitionRaftGroupService(0);
+        } catch (IgniteInternalException e) {
+            return false;
+        }
+        List<Peer> peersList = groupService.peers();
+        boolean isUpdated = peersList.stream()
+                .collect(toSet())
+                .equals(desiredPeers);
+        return isUpdated;
+    }
+
     private void clearSpyInvocations() {
         for (int i = 0; i < NODE_COUNT; i++) {
             clearInvocations(getNode(i).raftManager);
@@ -846,16 +894,26 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
                 (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
         ));
 
+        Node anyNode = nodes.get(0);
+        Set<Assignment> assignments = getPartitionClusterNodes(anyNode, tableName, replicasNum);
         assertTrue(waitForCondition(
                 () -> {
                     try {
-                        return nodes.stream().allMatch(n ->
-                                n.tableManager
-                                        .cachedTable(getTableId(n, tableName))
-                                        .internalTable()
-                                        .tableRaftService()
-                                        .partitionRaftGroupService(partNum) != null
-                        );
+                        return nodes.stream()
+                                .filter(n -> isNodeInAssignments(n, assignments))
+                                .allMatch(n -> {
+                                    // TODO: will be replaced with replica usage in https://issues.apache.org/jira/browse/IGNITE-22218
+                                    TableRaftService trs = n.tableManager
+                                            .cachedTable(getTableId(n, tableName))
+                                            .internalTable()
+                                            .tableRaftService();
+
+                                    try {
+                                        return trs.partitionRaftGroupService(partNum) != null;
+                                    } catch (IgniteInternalException e) {
+                                        return false;
+                                    }
+                                });
                     } catch (IgniteInternalException e) {
                         // Raft group service not found.
                         return false;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 13406d7f77..9bad9f3241 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -41,6 +41,7 @@ import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablesCounterKey;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
@@ -99,6 +100,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
 import java.util.function.LongFunction;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -152,7 +154,6 @@ import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
-import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -161,6 +162,7 @@ import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReason;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.schema.SchemaManager;
@@ -413,6 +415,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
     private final IndexMetaStorage indexMetaStorage;
 
+    private final Predicate<Assignment> isLocalNodeAssignment = assignment -> assignment.consistentId().equals(localNode().name());
+
     /**
      * Creates a new table manager.
      *
@@ -646,7 +650,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery(
                 stableAssignmentsPrefix,
                 recoveryRevision,
-                (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev, true),
+                (entry, rev) ->  handleChangeStableAssignmentEvent(entry, rev, true),
                 "stable"
         ));
         startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery(
@@ -889,6 +893,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             int zoneId,
             boolean isRecovery
     ) {
+        if (localMemberAssignment == null) {
+            // (0) In case the node is not in the assignments.
+            return nullCompletedFuture();
+        }
+
         int tableId = table.tableId();
 
         var internalTbl = (InternalTableImpl) table.internalTable();
@@ -902,137 +911,103 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 .tableRaftService()
                 .updateInternalTableRaftGroupService(partId, raftClient);
 
-        CompletableFuture<Boolean> startGroupFut;
-
-        if (localMemberAssignment != null) {
-            CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
-                    ? partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
-                            replicaGrpId,
-                            internalTbl,
-                            stablePeersAndLearners,
-                            localMemberAssignment
-                    )
-                    : trueCompletedFuture();
-
-            Assignments forcedAssignments = stableAssignments.force() ? stableAssignments : null;
-
-            startGroupFut = replicaMgr.weakStartReplica(
-                    replicaGrpId,
-                    () -> shouldStartGroupFut.thenComposeAsync(startGroup -> inBusyLock(busyLock, () -> {
-                        // (1) if partitionReplicatorNodeRecovery#shouldStartGroup fails -> do start nothing
-                        if (!startGroup) {
-                            return falseCompletedFuture();
-                        }
+        CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
+                ? partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
+                        replicaGrpId,
+                        internalTbl,
+                        stablePeersAndLearners,
+                        localMemberAssignment
+                )
+                : trueCompletedFuture();
 
-                        // (2) Otherwise let's start replica manually
-                        var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>(HybridTimestamp.MIN_VALUE);
+        Assignments forcedAssignments = stableAssignments.force() ? stableAssignments : null;
 
-                        var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
+        Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> shouldStartGroupFut
+                .thenComposeAsync(startGroup -> inBusyLock(busyLock, () -> {
+                    // (1) if partitionReplicatorNodeRecovery#shouldStartGroup fails -> do start nothing
+                    if (!startGroup) {
+                        return falseCompletedFuture();
+                    }
 
-                        PartitionStorages partitionStorages = getPartitionStorages(table, partId);
+                    // (2) Otherwise let's start replica manually
+                    var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, 
Void>(HybridTimestamp.MIN_VALUE);
 
-                        PartitionDataStorage partitionDataStorage = 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
-                                internalTbl, partId);
+                    var storageIndexTracker = new 
PendingComparableValuesTracker<Long, Void>(0L);
 
-                        
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
+                    PartitionStorages partitionStorages = 
getPartitionStorages(table, partId);
 
-                        PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
-                                partId,
-                                partitionDataStorage,
-                                table,
-                                safeTimeTracker,
-                                storageUpdateConfig
-                        );
+                    PartitionDataStorage partitionDataStorage = 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+                            internalTbl, partId);
 
-                        internalTbl.updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
+                    
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
 
-                        mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
+                    PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers(
+                            partId,
+                            partitionDataStorage,
+                            table,
+                            safeTimeTracker,
+                            storageUpdateConfig
+                    );
 
-                        RaftGroupListener raftGroupListener = new PartitionListener(
-                                txManager,
-                                partitionDataStorage,
-                                partitionUpdateHandlers.storageUpdateHandler,
-                                partitionStorages.getTxStateStorage(),
-                                safeTimeTracker,
-                                storageIndexTracker,
-                                catalogService,
-                                table.schemaView(),
-                                clockService,
-                                indexMetaStorage
-                        );
+                    internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);
+
+                    mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler);
+
+                    RaftGroupListener raftGroupListener = new PartitionListener(
+                            txManager,
+                            partitionDataStorage,
+                            partitionUpdateHandlers.storageUpdateHandler,
+                            partitionStorages.getTxStateStorage(),
+                            safeTimeTracker,
+                            storageIndexTracker,
+                            catalogService,
+                            table.schemaView(),
+                            clockService,
+                            indexMetaStorage
+                    );
 
-                        SnapshotStorageFactory snapshotStorageFactory = createSnapshotStorageFactory(replicaGrpId,
-                                partitionUpdateHandlers, internalTbl);
+                    SnapshotStorageFactory snapshotStorageFactory = createSnapshotStorageFactory(replicaGrpId,
+                            partitionUpdateHandlers, internalTbl);
 
-                        Function<RaftGroupService, ReplicaListener> createListener = (raftClient) -> createReplicaListener(
-                                replicaGrpId,
-                                table,
-                                safeTimeTracker,
-                                partitionStorages.getMvPartitionStorage(),
-                                partitionStorages.getTxStateStorage(),
-                                partitionUpdateHandlers,
-                                raftClient);
-
-                        RaftGroupEventsListener raftGroupEventsListener = createRaftGroupEventsListener(zoneId, replicaGrpId);
-
-                        MvTableStorage mvTableStorage = internalTbl.storage();
-
-                        try {
-                            return replicaMgr.startReplica(
-                                    raftGroupEventsListener,
-                                    raftGroupListener,
-                                    mvTableStorage.isVolatile(),
-                                    snapshotStorageFactory,
-                                    updateTableRaftService,
-                                    createListener,
-                                    storageIndexTracker,
-                                    replicaGrpId,
-                                    stablePeersAndLearners);
-                        } catch (NodeStoppingException e) {
-                            throw new AssertionError("Loza was stopped before Table manager", e);
-                        }
-                    }), ioExecutor),
-                    forcedAssignments
-            );
-        } else {
-            // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315
-            // (4) in case if node not in the assignments
-            startGroupFut = falseCompletedFuture();
-        }
+                    Function<RaftGroupService, ReplicaListener> createListener = (raftClient) -> createReplicaListener(
+                            replicaGrpId,
+                            table,
+                            safeTimeTracker,
+                            partitionStorages.getMvPartitionStorage(),
+                            partitionStorages.getTxStateStorage(),
+                            partitionUpdateHandlers,
+                            raftClient);
 
-        return startGroupFut
-                // TODO: the stage will be removed after https://issues.apache.org/jira/browse/IGNITE-22315
-                .thenComposeAsync(isReplicaStarted -> inBusyLock(busyLock, () -> {
-                    if (isReplicaStarted) {
-                        return nullCompletedFuture();
-                    }
+                    RaftGroupEventsListener raftGroupEventsListener = 
createRaftGroupEventsListener(zoneId, replicaGrpId);
 
-                    // TODO: will be removed in 
https://issues.apache.org/jira/browse/IGNITE-22315
-                    Supplier<RaftGroupService> getCachedRaftClient = () -> {
-                        try {
-                            // Return existing service if it's already started.
-                            return internalTbl
-                                    .tableRaftService()
-                                    
.partitionRaftGroupService(replicaGrpId.partitionId());
-                        } catch (IgniteInternalException e) {
-                            // We use "IgniteInternalException" in accordance 
with the javadoc of "partitionRaftGroupService" method.
-                            return null;
-                        }
-                    };
+                    MvTableStorage mvTableStorage = internalTbl.storage();
 
-                    CompletableFuture<TopologyAwareRaftGroupService> 
newRaftClientFut;
                     try {
-                        newRaftClientFut = 
replicaMgr.startRaftClient(replicaGrpId, stablePeersAndLearners, 
getCachedRaftClient);
+                        return replicaMgr.startReplica(
+                                raftGroupEventsListener,
+                                raftGroupListener,
+                                mvTableStorage.isVolatile(),
+                                snapshotStorageFactory,
+                                updateTableRaftService,
+                                createListener,
+                                storageIndexTracker,
+                                replicaGrpId,
+                                stablePeersAndLearners);
                     } catch (NodeStoppingException e) {
-                        throw new CompletionException(e);
+                        throw new AssertionError("Loza was stopped before 
Table manager", e);
                     }
-                    return newRaftClientFut.thenAccept(updateTableRaftService);
-                }), ioExecutor)
-                .whenComplete((res, ex) -> {
-                    if (ex != null) {
-                        LOG.warn("Unable to update raft groups on the node 
[tableId={}, partitionId={}]", ex, tableId, partId);
-                    }
-                });
+                }), ioExecutor);
+
+        return replicaMgr.weakStartReplica(
+                replicaGrpId,
+                startReplicaSupplier,
+                forcedAssignments
+        ).handle((res, ex) -> {
+            if (ex != null) {
+                LOG.warn("Unable to update raft groups on the node 
[tableId={}, partitionId={}]", ex, tableId, partId);
+            }
+            return null;
+        });
     }
 
     @Nullable
@@ -1110,6 +1085,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         return peer.consistentId().equals(localNode().name());
     }
 
+    private boolean isLocalNodeInAssignments(Collection<Assignment> assignments) {
+        return assignments.stream().anyMatch(isLocalNodeAssignment);
+    }
+
     private PartitionDataStorage partitionDataStorage(MvPartitionStorage partitionStorage, InternalTable internalTbl, int partId) {
         return new SnapshotAwarePartitionDataStorage(
                 partitionStorage,
@@ -1141,6 +1120,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
     @Override
     public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
+        // NB: the busy lock has already been acquired in {@link beforeNodeStop}
         assert beforeStopGuard.get() : "'stop' called before 'beforeNodeStop'";
 
         if (!stopGuard.compareAndSet(false, true)) {
@@ -1793,7 +1773,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                         int catalogVersion = catalogService.latestCatalogVersion();
 
                         return setTablesPartitionCountersForRebalance(replicaGrpId, revision, pendingAssignments.force(), catalogVersion)
-                                .thenCompose(r -> handleChangePendingAssignmentEvent(
+                                .thenCompose(v -> handleChangePendingAssignmentEvent(
                                         replicaGrpId,
                                         table,
                                         stableAssignments,
@@ -1802,7 +1782,20 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                                         isRecovery,
                                         catalogVersion
                                 ))
-                                .thenCompose(v -> changePeersOnRebalance(table, replicaGrpId, pendingAssignments.nodes(), revision));
+                                .thenCompose(v -> {
+                                    boolean isLocalNodeInStableOrPending = isNodeInReducedStableOrPendingAssignments(
+                                            replicaGrpId,
+                                            stableAssignments,
+                                            pendingAssignments,
+                                            revision
+                                    );
+
+                                    if (!isLocalNodeInStableOrPending) {
+                                        return nullCompletedFuture();
+                                    }
+
+                                    return changePeersOnRebalance(table, replicaGrpId, pendingAssignments.nodes(), revision);
+                                });
                     } finally {
                         busyLock.leaveBusy();
                     }
@@ -1886,24 +1879,76 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                     }), ioExecutor);
         } else {
             localServicesStartFuture = runAsync(() -> {
-                if (pendingAssignmentsAreForced && replicaMgr.isReplicaStarted(replicaGrpId)) {
+                if (pendingAssignmentsAreForced && localMemberAssignment != null) {
+
+                    assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local node is outside of the replication group";
+
                     replicaMgr.resetPeers(replicaGrpId, fromAssignments(computedStableAssignments.nodes()));
+                } else if (pendingAssignmentsAreForced && localMemberAssignment == null) {
+                    assert !replicaMgr.isReplicaStarted(replicaGrpId)
+                            : "The local node is inside of the replication group";
                 }
             }, ioExecutor);
         }
 
-        return localServicesStartFuture.thenRunAsync(() -> {
-            // For forced assignments, we exclude dead stable nodes, and all alive stable nodes are already in pending assignments.
-            // Union is not required in such a case.
-            Set<Assignment> newAssignments = pendingAssignmentsAreForced || stableAssignments == null
-                    ? pendingAssignmentsNodes
-                    : union(pendingAssignmentsNodes, stableAssignments.nodes());
+        return localServicesStartFuture
+                .thenComposeAsync(v -> inBusyLock(busyLock, () -> isLocalNodeLeaseholder(replicaGrpId)), ioExecutor)
+                .thenAcceptAsync(isLeaseholder -> inBusyLock(busyLock, () -> {
+                    boolean isLocalNodeInStableOrPending = isNodeInReducedStableOrPendingAssignments(
+                            replicaGrpId,
+                            stableAssignments,
+                            pendingAssignments,
+                            revision
+                    );
 
-            tbl.internalTable()
-                    .tableRaftService()
-                    .partitionRaftGroupService(partitionId)
-                    .updateConfiguration(fromAssignments(newAssignments));
-        }, ioExecutor);
+                    if (!isLocalNodeInStableOrPending && !isLeaseholder) {
+                        return;
+                    }
+
+                    assert isLocalNodeInStableOrPending || isLeaseholder
+                            : "The local node is outside of the replication group [inStableOrPending=" + isLocalNodeInStableOrPending
+                            + ", isLeaseholder=" + isLeaseholder + "].";
+
+                    // For forced assignments, we exclude dead stable nodes, and all alive stable nodes are already in pending assignments.
+                    // Union is not required in such a case.
+                    Set<Assignment> newAssignments = pendingAssignmentsAreForced || stableAssignments == null
+                            ? pendingAssignmentsNodes
+                            : union(pendingAssignmentsNodes, stableAssignments.nodes());
+
+                    tbl.internalTable()
+                            .tableRaftService()
+                            .partitionRaftGroupService(partitionId)
+                            
.updateConfiguration(fromAssignments(newAssignments));
+                }), ioExecutor);
+    }
+
+    private boolean isNodeInReducedStableOrPendingAssignments(
+            TablePartitionId replicaGrpId,
+            @Nullable Assignments stableAssignments,
+            Assignments pendingAssignments,
+            long revision
+    ) {
+        Entry reduceEntry  = metaStorageMgr.getLocally(RebalanceUtil.switchReduceKey(replicaGrpId), revision);
+
+        Assignments reduceAssignments = reduceEntry != null
+                ? Assignments.fromBytes(reduceEntry.value())
+                : null;
+
+        Set<Assignment> reducedStableAssignments = reduceAssignments != null
+                ? subtract(stableAssignments.nodes(), reduceAssignments.nodes())
+                : stableAssignments.nodes();
+
+        if (!isLocalNodeInAssignments(union(reducedStableAssignments, pendingAssignments.nodes()))) {
+            return false;
+        }
+
+        assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local node is outside of the replication group ["
+                + ", stable=" + stableAssignments
+                + ", pending=" + pendingAssignments
+                + ", reduce=" + reduceAssignments
+                + ", localName=" + localNode().name() + "].";
+
+        return true;
     }
 
     private CompletableFuture<Void> setTablesPartitionCountersForRebalance(
@@ -1966,6 +2011,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     }
 
     private CompletableFuture<Void> changePeersOnRebalance(
+            // TODO: remove excessive argument (used to get raft-client) https://issues.apache.org/jira/browse/IGNITE-22218
             TableImpl table,
             TablePartitionId replicaGrpId,
             Set<Assignment> pendingAssignments,
@@ -1999,7 +2045,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                     // run update of raft configuration if this node is a leader
                     LOG.info("Current node={} is the leader of partition raft group={}. "
                                     + "Initiate rebalance process for partition={}, table={}",
-                            leaderWithTerm.leader(), replicaGrpId, partId, table.name());
+                            leaderWithTerm.leader(), replicaGrpId, partId, tables.get(replicaGrpId.tableId()).name());
 
                     return metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
                             .thenCompose(latestPendingAssignmentsEntry -> {
@@ -2010,8 +2056,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                                     return nullCompletedFuture();
                                 }
 
-                                PeersAndLearners newConfiguration =
-                                        fromAssignments(pendingAssignments);
+                                PeersAndLearners newConfiguration = fromAssignments(pendingAssignments);
 
                                 return partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term());
                             });
@@ -2244,18 +2289,41 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         }, ioExecutor).thenCompose(identity());
     }
 
+    private CompletableFuture<Boolean> isLocalNodeLeaseholder(ReplicationGroupId replicationGroupId) {
+        HybridTimestamp previousMetastoreSafeTime = metaStorageMgr.clusterTime()
+                .currentSafeTime()
+                .addPhysicalTime(-clockService.maxClockSkewMillis());
+
+        return executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId, previousMetastoreSafeTime)
+                .thenApply(replicaMeta -> replicaMeta != null
+                    && replicaMeta.getLeaseholderId() != null
+                    && replicaMeta.getLeaseholderId().equals(localNode().name()));
+    }
+
     private CompletableFuture<Void> updatePartitionClients(
             TablePartitionId tablePartitionId,
             Set<Assignment> stableAssignments,
             long revision
     ) {
-        // Update raft client peers and learners according to the actual assignments.
-        return tablesById(revision).thenAccept(t -> {
-            t.get(tablePartitionId.tableId()).internalTable()
+        return isLocalNodeLeaseholder(tablePartitionId).thenCompose(isLeaseholder -> inBusyLock(busyLock, () -> {
+            boolean isLocalInStable = isLocalNodeInAssignments(stableAssignments);
+
+            if (!isLocalInStable && !isLeaseholder) {
+                return nullCompletedFuture();
+            }
+
+            assert replicaMgr.isReplicaStarted(tablePartitionId)
+                    : "The local node is outside of the replication group [inStable=" + isLocalInStable
+                            + ", isLeaseholder=" + isLeaseholder + "].";
+
+            // Update raft client peers and learners according to the actual assignments.
+            return tablesById(revision).thenAccept(t -> t.get(tablePartitionId.tableId())
+                    .internalTable()
                     .tableRaftService()
                     .partitionRaftGroupService(tablePartitionId.partitionId())
-                    .updateConfiguration(fromAssignments(stableAssignments));
-        });
+                    .updateConfiguration(fromAssignments(stableAssignments))
+            );
+        }));
     }
 
     private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
@@ -2274,7 +2342,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                         ? pendingAssignments.nodes().stream()
                         : Stream.concat(stableAssignments.stream(), pendingAssignments.nodes().stream())
                 )
-                .noneMatch(assignment -> assignment.consistentId().equals(localNode().name()));
+                .noneMatch(isLocalNodeAssignment);
 
         if (shouldStopLocalServices) {
             return allOf(
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 250f56c51b..a4127f5c7c 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -281,9 +281,6 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest {
         when(replicaMgr.getLogSyncer()).thenReturn(mock(LogSyncer.class));
         when(replicaMgr.startReplica(any(), any(), any(), any(), any(PendingComparableValuesTracker.class), any()))
                 .thenReturn(nullCompletedFuture());
-        // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315
-        when(replicaMgr.startRaftClient(any(), any(), any()))
-                .thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class)));
         when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
         when(replicaMgr.weakStartReplica(any(), any(), any())).thenReturn(trueCompletedFuture());
         when(replicaMgr.weakStopReplica(any(), any(), any())).thenReturn(nullCompletedFuture());
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index df39756946..ecb6c2016b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -282,9 +282,6 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(), any(), any(), any(), any()))
                 .thenReturn(trueCompletedFuture());
-        // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315
-        when(replicaMgr.startRaftClient(any(), any(), any()))
-                .thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class)));
         when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
         when(replicaMgr.weakStartReplica(any(), any(), any())).thenAnswer(inv -> {
             Supplier<CompletableFuture<Void>> startOperation = inv.getArgument(1);
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 7ec6ac9c5b..f1379fee24 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -32,7 +32,6 @@ import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -682,7 +681,6 @@ public class ItTxTestCluster {
                         topologyAwareRaftGroupServiceFactory
                 ).thenAccept(
                         raftSvc -> {
-                            try {
                                 PartitionReplicaListener listener = newReplicaListener(
                                         mvPartStorage,
                                         raftSvc,
@@ -713,9 +711,6 @@ public class ItTxTestCluster {
                                         storageIndexTracker,
                                         completedFuture(listener)
                                 );
-                            } catch (NodeStoppingException e) {
-                                fail("Unexpected node stopping", e);
-                            }
                         }
                 );
 
