This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f9b321fee IGNITE-21805 Refactor TableManager and move all RAFT 
related pieces to Replica (#3633)
8f9b321fee is described below

commit 8f9b321fee2848757d8174b1d0c330326d21d485
Author: Mikhail Efremov <jakuten...@gmail.com>
AuthorDate: Thu Jun 6 15:47:36 2024 +0600

    IGNITE-21805 Refactor TableManager and move all RAFT related pieces to 
Replica (#3633)
---
 .../rebalance/PartitionMover.java                  |   9 +-
 .../raft/ExecutorInclinedRaftCommandRunner.java    |   5 +
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |   3 +-
 .../ItPlacementDriverReplicaSideTest.java          |  60 +++-
 .../apache/ignite/internal/replicator/Replica.java |   9 +-
 .../ignite/internal/replicator/ReplicaManager.java | 353 +++++++++++++++-----
 .../replicator/listener/ReplicaListener.java       |   5 +-
 .../replicator/PlacementDriverReplicaSideTest.java |   6 +-
 .../internal/replicator/ReplicaManagerTest.java    |  37 ++-
 .../runner/app/ItIgniteNodeRestartTest.java        |  18 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  26 +-
 .../ignite/distributed/ReplicaUnavailableTest.java |  74 ++++-
 .../ItDisasterRecoveryReconfigurationTest.java     |   9 +-
 .../rebalance/ItRebalanceDistributedTest.java      |  24 +-
 .../internal/table/distributed/TableManager.java   | 370 ++++++++-------------
 .../replicator/PartitionReplicaListener.java       |  10 +
 .../table/distributed/PartitionMoverTest.java      |   9 +-
 .../distributed/TableManagerRecoveryTest.java      |  14 +-
 .../table/distributed/TableManagerTest.java        |  27 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  48 ++-
 20 files changed, 686 insertions(+), 430 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
index 76275091eb..694e8ed7fe 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
@@ -41,12 +41,12 @@ public class PartitionMover {
 
     private final IgniteSpinBusyLock busyLock;
 
-    private final Supplier<RaftGroupService> raftGroupServiceSupplier;
+    private final Supplier<CompletableFuture<RaftGroupService>> 
raftGroupServiceSupplier;
 
     /**
      * Constructor.
      */
-    public PartitionMover(IgniteSpinBusyLock busyLock, 
Supplier<RaftGroupService> raftGroupServiceSupplier) {
+    public PartitionMover(IgniteSpinBusyLock busyLock, 
Supplier<CompletableFuture<RaftGroupService>> raftGroupServiceSupplier) {
         this.busyLock = busyLock;
         this.raftGroupServiceSupplier = raftGroupServiceSupplier;
     }
@@ -64,8 +64,9 @@ public class PartitionMover {
         }
 
         try {
-            return raftGroupServiceSupplier.get()
-                    .changePeersAsync(peersAndLearners, term)
+            return raftGroupServiceSupplier
+                    .get()
+                    .thenCompose(raftGroupService -> 
raftGroupService.changePeersAsync(peersAndLearners, term))
                     .handle((resp, err) -> {
                         if (!busyLock.enterBusy()) {
                             throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java
index 4b16d1131b..da7fb672d3 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java
@@ -46,4 +46,9 @@ public class ExecutorInclinedRaftCommandRunner implements 
RaftCommandRunner {
 
         return future.thenApplyAsync(identity(), completionExecutor);
     }
+
+    /** Returns decorated Raft-client. */
+    public RaftCommandRunner decoratedCommandRunner() {
+        return commandRunner;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 9d836a27b2..007e9387f4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -717,7 +717,7 @@ public class NodeImpl implements Node, RaftServerService {
         electionRound = 0;
 
         if (electionAdjusted) {
-            LOG.info("Election timeout was reset to initial value due to 
successful leader election.");
+            LOG.info("Election timeout was reset to initial value.");
             resetElectionTimeoutMs(initialElectionTimeout);
             electionAdjusted = false;
         }
@@ -3445,6 +3445,7 @@ public class NodeImpl implements Node, RaftServerService {
             this.conf.setConf(newConf);
             this.conf.getOldConf().reset();
             stepDown(this.currTerm + 1, false, new Status(RaftError.ESETPEER, 
"Raft node set peer normally"));
+            resetElectionTimeoutToInitial();
             return Status.OK();
         }
         finally {
diff --git 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 5b09cd0e0a..f82741b11d 100644
--- 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
+import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -75,6 +76,7 @@ import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessage
 import 
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.TestLozaFactory;
@@ -83,7 +85,10 @@ import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageTestGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -197,7 +202,13 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
                     Set.of(ReplicaMessageTestGroup.class),
                     new TestPlacementDriver(primaryReplicaSupplier),
                     partitionOperationsExecutor,
-                    new NoOpFailureProcessor()
+                    () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+                    new NoOpFailureProcessor(),
+                    // TODO: IGNITE-22222 can't pass 
ThreadLocalPartitionCommandsMarshaller there due to dependency loop
+                    null,
+                    topologyAwareRaftGroupServiceFactory,
+                    raftManager,
+                    new VolatileLogStorageFactoryCreator(nodeName, 
workDir.resolve("volatile-log-spillout"))
             );
 
             replicaManagers.put(nodeName, replicaManager);
@@ -220,7 +231,9 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
                 }
             });
 
-            servicesToClose.add(() -> 
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10, 
TimeUnit.SECONDS));
+            servicesToClose.addAll(List.of(
+                    () -> 
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10, 
TimeUnit.SECONDS)
+            ));
         }
     }
 
@@ -477,9 +490,11 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
 
             var rftNodeId = new RaftNodeId(groupId, peer);
 
+            PeersAndLearners newConfiguration = fromConsistentIds(nodes);
+
             CompletableFuture<TopologyAwareRaftGroupService> raftClientFut = 
raftManager.startRaftGroupNode(
                     rftNodeId,
-                    fromConsistentIds(nodes),
+                    newConfiguration,
                     new TestRaftGroupListener(),
                     RaftGroupEventsListener.noopLsnr,
                     RaftGroupOptions.defaults(),
@@ -487,24 +502,33 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
             );
             serviceFutures.add(raftClientFut);
 
-            CompletableFuture<Replica> replicaFuture = 
raftClientFut.thenCompose(raftClient -> {
+            CompletableFuture<Boolean> replicaFuture = 
raftClientFut.thenCompose(raftClient -> {
                 try {
+                    ReplicaListener listener = new ReplicaListener() {
+                        @Override
+                        public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, String senderId) {
+                            log.info("Handle request [type={}]", 
request.getClass().getSimpleName());
+
+                            return raftClient
+                                    
.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build())
+                                    .thenCompose(ignored -> replicaListener == 
null
+                                            ? completedFuture(new 
ReplicaResult(null, null))
+                                            : replicaListener.apply(request, 
senderId));
+                        }
+
+                        @Override
+                        public RaftCommandRunner raftClient() {
+                            return raftClient;
+                        }
+                    };
+
                     return replicaManager.startReplica(
                             groupId,
-                            (request, senderId) -> {
-                                log.info("Handle request [type={}]", 
request.getClass().getSimpleName());
-
-                                return 
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build())
-                                        .thenCompose(ignored -> {
-                                            if (replicaListener == null) {
-                                                return completedFuture(new 
ReplicaResult(null, null));
-                                            } else {
-                                                return 
replicaListener.apply(request, senderId);
-                                            }
-                                        });
-                            },
-                            raftClient,
-                            new 
PendingComparableValuesTracker<>(Long.MAX_VALUE));
+                            newConfiguration,
+                            (unused) -> { },
+                            (unused) -> listener,
+                            new 
PendingComparableValuesTracker<>(Long.MAX_VALUE),
+                            completedFuture(raftClient));
                 } catch (NodeStoppingException e) {
                     throw new RuntimeException(e);
                 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 380e5c7b22..41883da2d6 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -100,7 +100,6 @@ public class Replica {
      * @param replicaGrpId Replication group id.
      * @param listener Replica listener.
      * @param storageIndexTracker Storage index tracker.
-     * @param raftClient Topology aware Raft client.
      * @param localNode Instance of the local node.
      * @param executor External executor.
      * @param placementDriver Placement driver.
@@ -110,7 +109,6 @@ public class Replica {
             ReplicationGroupId replicaGrpId,
             ReplicaListener listener,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
-            TopologyAwareRaftGroupService raftClient,
             ClusterNode localNode,
             ExecutorService executor,
             PlacementDriver placementDriver,
@@ -119,7 +117,7 @@ public class Replica {
         this.replicaGrpId = replicaGrpId;
         this.listener = listener;
         this.storageIndexTracker = storageIndexTracker;
-        this.raftClient = raftClient;
+        this.raftClient = raftClient();
         this.localNode = localNode;
         this.executor = executor;
         this.placementDriver = placementDriver;
@@ -128,6 +126,11 @@ public class Replica {
         raftClient.subscribeLeader(this::onLeaderElected);
     }
 
+    /** Returns Raft-client. */
+    public final TopologyAwareRaftGroupService raftClient() {
+        return (TopologyAwareRaftGroupService) listener.raftClient();
+    }
+
     /**
      * Processes a replication request on the replica.
      *
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 64c29a7b81..2c410caffc 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -17,11 +17,9 @@
 
 package org.apache.ignite.internal.replicator;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
-import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static 
org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS;
@@ -46,8 +44,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureProcessor;
@@ -67,7 +69,21 @@ import 
org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
 import 
org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
@@ -88,8 +104,10 @@ import org.apache.ignite.internal.thread.ThreadAttributes;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
 
 /**
  * Replica manager maintains {@link Replica} instances on an Ignite node.
@@ -125,6 +143,19 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     /** Replica message handler. */
     private final NetworkMessageHandler handler;
 
+    /** Raft manager for RAFT-clients creation. */
+    // TODO: move into {@method Replica#shutdown} 
https://issues.apache.org/jira/browse/IGNITE-22372
+    private final RaftManager raftManager;
+
+    /** Raft clients factory for raft server endpoints starting. */
+    private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory;
+
+    /** Creator for {@link 
org.apache.ignite.internal.raft.storage.LogStorageFactory} for volatile tables. 
*/
+    private final LogStorageFactoryCreator volatileLogStorageFactoryCreator;
+
+    /** Raft command marshaller for raft server endpoints starting. */
+    private final Marshaller raftCommandsMarshaller;
+
     /** Message handler for placement driver messages. */
     private final NetworkMessageHandler placementDriverMessageHandler;
 
@@ -141,8 +172,10 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     /** Scheduled executor for idle safe time sync. */
     private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor;
 
+    /** Executor that will be used to execute requests by replicas. */
     private final Executor requestsExecutor;
 
+    /** Failure processor. */
     private final FailureProcessor failureProcessor;
 
     /** Set of message groups to handler as replica requests. */
@@ -154,39 +187,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
     private String localNodeId;
 
-    /**
-     * Constructor for a replica service.
-     *
-     * @param nodeName Node name.
-     * @param clusterNetSvc Cluster network service.
-     * @param cmgMgr Cluster group manager.
-     * @param clockService Clock service.
-     * @param messageGroupsToHandle Message handlers.
-     * @param placementDriver A placement driver.
-     */
-    @TestOnly
-    public ReplicaManager(
-            String nodeName,
-            ClusterService clusterNetSvc,
-            ClusterManagementGroupManager cmgMgr,
-            ClockService clockService,
-            Set<Class<?>> messageGroupsToHandle,
-            PlacementDriver placementDriver,
-            Executor requestsExecutor,
-            FailureProcessor failureProcessor
-    ) {
-        this(
-                nodeName,
-                clusterNetSvc,
-                cmgMgr,
-                clockService,
-                messageGroupsToHandle,
-                placementDriver,
-                requestsExecutor,
-                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
-                failureProcessor
-        );
-    }
+    private String localNodeConsistentId;
 
     /**
      * Constructor for a replica service.
@@ -199,6 +200,12 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * @param placementDriver A placement driver.
      * @param requestsExecutor Executor that will be used to execute requests 
by replicas.
      * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe 
time propagation period in ms.
+     * @param failureProcessor Failure processor.
+     * @param raftCommandsMarshaller Command marshaller for raft groups 
creation.
+     * @param raftGroupServiceFactory A factory for raft-clients creation.
+     * @param raftManager Raft manager used to start raft group nodes and raft group services (raft-clients).
+     * @param volatileLogStorageFactoryCreator Creator for {@link 
org.apache.ignite.internal.raft.storage.LogStorageFactory} for
+     *      volatile tables.
      */
     public ReplicaManager(
             String nodeName,
@@ -209,18 +216,26 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             PlacementDriver placementDriver,
             Executor requestsExecutor,
             LongSupplier idleSafeTimePropagationPeriodMsSupplier,
-            FailureProcessor failureProcessor
+            FailureProcessor failureProcessor,
+            Marshaller raftCommandsMarshaller,
+            TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
+            RaftManager raftManager,
+            LogStorageFactoryCreator volatileLogStorageFactoryCreator
     ) {
         this.clusterNetSvc = clusterNetSvc;
         this.cmgMgr = cmgMgr;
         this.clockService = clockService;
         this.messageGroupsToHandle = messageGroupsToHandle;
+        this.volatileLogStorageFactoryCreator = 
volatileLogStorageFactoryCreator;
         this.handler = this::onReplicaMessageReceived;
         this.placementDriverMessageHandler = 
this::onPlacementDriverMessageReceived;
         this.placementDriver = placementDriver;
         this.requestsExecutor = requestsExecutor;
         this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
         this.failureProcessor = failureProcessor;
+        this.raftCommandsMarshaller = raftCommandsMarshaller;
+        this.raftGroupServiceFactory = raftGroupServiceFactory;
+        this.raftManager = raftManager;
 
         scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
                 1,
@@ -466,75 +481,170 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         });
     }
 
+    private CompletableFuture<Boolean> startReplicaInternal(
+            RaftGroupEventsListener raftGroupEventsListener,
+            RaftGroupListener raftGroupListener,
+            boolean isVolatileStorage,
+            SnapshotStorageFactory snapshotStorageFactory,
+            Consumer<RaftGroupService> updateTableRaftService,
+            Function<RaftGroupService, ReplicaListener> createListener,
+            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+            TablePartitionId replicaGrpId,
+            PeersAndLearners newConfiguration
+    ) throws NodeStoppingException {
+        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new 
Peer(localNodeConsistentId));
+
+        RaftGroupOptions groupOptions = groupOptionsForPartition(
+                isVolatileStorage,
+                snapshotStorageFactory);
+
+        // TODO: move into {@method Replica#shutdown} 
https://issues.apache.org/jira/browse/IGNITE-22372
+        // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
+        CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut = 
((Loza) raftManager).startRaftGroupNode(
+                raftNodeId,
+                newConfiguration,
+                raftGroupListener,
+                raftGroupEventsListener,
+                groupOptions,
+                raftGroupServiceFactory
+        );
+
+        return startReplica(
+                replicaGrpId,
+                newConfiguration,
+                updateTableRaftService,
+                createListener,
+                storageIndexTracker,
+                newRaftClientFut);
+    }
+
     /**
-     * Starts a replica. If a replica with the same partition id already 
exists, the method throws an exception.
+     * Creates and starts a new replica.
      *
+     * @param raftGroupEventsListener Raft group events listener for raft 
group starting.
+     * @param raftGroupListener Raft group listener for raft group starting.
+     * @param isVolatileStorage Whether the table storage is volatile.
+     * @param snapshotStorageFactory Snapshot storage factory for raft group 
option's parameterization.
+     * @param updateTableRaftService Temporary consumer, needed until TableRaftService is removed in
+     *      TODO: https://issues.apache.org/jira/browse/IGNITE-22218.
+     * @param createListener Due to creation of ReplicaListener in 
TableManager, the function returns desired listener by created
+     *      raft-client inside {@link #startReplica} method.
      * @param replicaGrpId Replication group id.
-     * @param listener Replica listener.
-     * @param raftClient Topology aware Raft client.
      * @param storageIndexTracker Storage index tracker.
-     * @throws NodeStoppingException If node is stopping.
-     * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with 
the same replication group id has already been
-     *         started.
+     * @param newConfiguration A configuration for new raft group.
+     * @return Future that promises ready new replica when done.
      */
-    public CompletableFuture<Replica> startReplica(
-            ReplicationGroupId replicaGrpId,
-            ReplicaListener listener,
-            TopologyAwareRaftGroupService raftClient,
-            PendingComparableValuesTracker<Long, Void> storageIndexTracker
+    public CompletableFuture<Boolean> startReplica(
+            RaftGroupEventsListener raftGroupEventsListener,
+            RaftGroupListener raftGroupListener,
+            boolean isVolatileStorage,
+            SnapshotStorageFactory snapshotStorageFactory,
+            Consumer<RaftGroupService> updateTableRaftService,
+            Function<RaftGroupService, ReplicaListener> createListener,
+            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+            TablePartitionId replicaGrpId,
+            PeersAndLearners newConfiguration
     ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            return startReplicaInternal(replicaGrpId, listener, raftClient, 
storageIndexTracker);
+            return startReplicaInternal(
+                    raftGroupEventsListener,
+                    raftGroupListener,
+                    isVolatileStorage,
+                    snapshotStorageFactory,
+                    updateTableRaftService,
+                    createListener,
+                    storageIndexTracker,
+                    replicaGrpId,
+                    newConfiguration);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
     /**
-     * Internal method for starting a replica.
+     * Starts a raft-client and passes it to a replica creation if the replica should be started too. If a replica with the same partition id
+     * already exists, the method throws an exception.
+     * TODO: must be deleted or be private after 
https://issues.apache.org/jira/browse/IGNITE-22373
      *
      * @param replicaGrpId Replication group id.
-     * @param listener Replica listener.
-     * @param raftClient Topology aware Raft client.
+     * @param newConfiguration Peers and Learners of the Raft group.
+     * @param updateTableRaftService A temporary closure that updates the table raft service with the new raft-client, but
+     *      TODO: will be removed 
https://issues.apache.org/jira/browse/IGNITE-22218
+     * @param createListener A closure that returns a ready {@link ReplicaListener} for the given raft-client {@link RaftGroupService}.
      * @param storageIndexTracker Storage index tracker.
+     * @param newRaftClientFut A future that returns created raft-client.
+     * @throws NodeStoppingException If node is stopping.
+     * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with 
the same replication group id has already been started.
      */
-    private CompletableFuture<Replica> startReplicaInternal(
+    @VisibleForTesting
+    @Deprecated
+    public CompletableFuture<Boolean> startReplica(
             ReplicationGroupId replicaGrpId,
-            ReplicaListener listener,
-            TopologyAwareRaftGroupService raftClient,
-            PendingComparableValuesTracker<Long, Void> storageIndexTracker
-    ) {
+            PeersAndLearners newConfiguration,
+            Consumer<RaftGroupService> updateTableRaftService,
+            Function<RaftGroupService, ReplicaListener> createListener,
+            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+            CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut
+    ) throws NodeStoppingException {
         LOG.info("Replica is about to start [replicationGroupId={}].", 
replicaGrpId);
 
-        ClusterNode localNode = clusterNetSvc.topologyService().localMember();
+        CompletableFuture<Boolean> resultFuture = 
newRaftClientFut.thenAccept(updateTableRaftService)
+                .thenApply((v) -> true);
 
-        Replica newReplica = new Replica(
-                replicaGrpId,
-                listener,
-                storageIndexTracker,
-                raftClient,
-                localNode,
-                executor,
-                placementDriver,
-                clockService
-        );
+        CompletableFuture<ReplicaListener> newReplicaListenerFut = 
newRaftClientFut.thenApply(createListener);
 
-        CompletableFuture<Replica> replicaFuture = 
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
-            if (existingReplicaFuture == null || 
existingReplicaFuture.isDone()) {
-                assert existingReplicaFuture == null || 
isCompletedSuccessfully(existingReplicaFuture);
-                LOG.info("Replica is started [replicationGroupId={}].", 
replicaGrpId);
+        startReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut);
 
-                return completedFuture(newReplica);
-            } else {
-                existingReplicaFuture.complete(newReplica);
-                LOG.info("Replica is started, existing replica waiter was 
completed [replicationGroupId={}].", replicaGrpId);
+        return resultFuture;
+    }
 
-                return existingReplicaFuture;
-            }
+    /**
+     * Creates and starts a new replica.
+     * TODO: must be deleted or be private after 
https://issues.apache.org/jira/browse/IGNITE-22373
+     *
+     * @param replicaGrpId Replication group id.
+     * @param storageIndexTracker Storage index tracker.
+     * @param newReplicaListenerFut Future that returns ready ReplicaListener 
for replica creation.
+     * @return Future that promises ready new replica when done.
+     */
+    @VisibleForTesting
+    @Deprecated
+    public CompletableFuture<Replica> startReplica(
+            ReplicationGroupId replicaGrpId,
+            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+            CompletableFuture<ReplicaListener> newReplicaListenerFut
+    ) throws NodeStoppingException {
+
+        ClusterNode localNode = clusterNetSvc.topologyService().localMember();
+
+        CompletableFuture<Replica> replicaFuture = 
newReplicaListenerFut.thenCompose(listener -> {
+            Replica newReplica = new Replica(
+                    replicaGrpId,
+                    listener,
+                    storageIndexTracker,
+                    localNode,
+                    executor,
+                    placementDriver,
+                    clockService);
+
+            return replicas.compute(replicaGrpId, (k, existingReplicaFuture) 
-> {
+                if (existingReplicaFuture == null || 
existingReplicaFuture.isDone()) {
+                    assert existingReplicaFuture == null || 
isCompletedSuccessfully(existingReplicaFuture);
+                    LOG.info("Replica is started [replicationGroupId={}].", 
replicaGrpId);
+
+                    return CompletableFuture.completedFuture(newReplica);
+                } else {
+                    LOG.info("Replica is started, existing replica waiter was 
completed [replicationGroupId={}].", replicaGrpId);
+
+                    existingReplicaFuture.complete(newReplica);
+
+                    return existingReplicaFuture;
+                }
+            });
         });
 
         var eventParams = new LocalReplicaEventParameters(replicaGrpId);
@@ -548,6 +658,76 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 .thenCompose(v -> replicaFuture);
     }
 
+    /**
+     * Temporary public method for RAFT-client starting.
+     * TODO: will be removed after 
https://issues.apache.org/jira/browse/IGNITE-22315
+     *
+     * @param replicaGrpId Replication Group ID.
+     * @param newConfiguration Peers and learners nodes for a raft group.
+     * @param raftClientCache Temporary supplier that returns the RAFT client from 
TableRaftService if it already exists and was put into the
+     *      service's map.
+     * @return Future that returns started RAFT-client.
+     * @throws NodeStoppingException If the node is stopping.
+     */
+    @Deprecated
+    public CompletableFuture<TopologyAwareRaftGroupService> startRaftClient(
+            ReplicationGroupId replicaGrpId,
+            PeersAndLearners newConfiguration,
+            Supplier<RaftGroupService> raftClientCache)
+            throws NodeStoppingException {
+        RaftGroupService cachedRaftClient = raftClientCache.get();
+        return cachedRaftClient != null
+                ? 
CompletableFuture.completedFuture((TopologyAwareRaftGroupService) 
cachedRaftClient)
+                // TODO IGNITE-19614 This procedure takes 10 seconds if 
there's no majority online.
+                : raftManager.startRaftGroupService(replicaGrpId, 
newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller);
+    }
+
+    /**
+     * Returns a future with a replica if it was created, or null if there are no 
replicas starting with the given identifier.
+     *
+     * @param replicationGroupId Table-Partition identifier.
+     * @return Future with the replica if it was created, or null otherwise.
+     */
+    public CompletableFuture<Replica> replica(ReplicationGroupId 
replicationGroupId) {
+        return replicas.get(replicationGroupId);
+    }
+
+    /**
+     * Performs a {@code resetPeers} operation on raft node.
+     *
+     * @param replicaGrpId Replication group ID.
+     * @param peersAndLearners New node configuration.
+     */
+    public void resetPeers(ReplicationGroupId replicaGrpId, PeersAndLearners 
peersAndLearners) {
+        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new 
Peer(localNodeConsistentId));
+        ((Loza) raftManager).resetPeers(raftNodeId, peersAndLearners);
+    }
+
+    /** Getter for wrapped write-ahead log syncer. */
+    // TODO: will be removed after 
https://issues.apache.org/jira/browse/IGNITE-22292
+    public LogSyncer getLogSyncer() {
+        return raftManager.getLogSyncer();
+    }
+
+    private RaftGroupOptions groupOptionsForPartition(boolean 
isVolatileStorage, SnapshotStorageFactory snapshotFactory) {
+        RaftGroupOptions raftGroupOptions;
+
+        if (isVolatileStorage) {
+            LogStorageBudgetView view = ((Loza) 
raftManager).volatileRaft().logStorage().value();
+            raftGroupOptions = RaftGroupOptions.forVolatileStores()
+                    
.setLogStorageFactory(volatileLogStorageFactoryCreator.factory(view))
+                    .raftMetaStorageFactory((groupId, raftOptions) -> new 
VolatileRaftMetaStorage());
+        } else {
+            raftGroupOptions = RaftGroupOptions.forPersistentStores();
+        }
+
+        raftGroupOptions.snapshotStorageFactory(snapshotFactory);
+
+        raftGroupOptions.commandsMarshaller(raftCommandsMarshaller);
+
+        return raftGroupOptions;
+    }
+
     /**
      * Stops a replica by the partition group id.
      *
@@ -620,7 +800,16 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             }
         });
 
-        return isRemovedFuture;
+        return isRemovedFuture
+                .thenApply(v -> {
+                    try {
+                        // TODO: move into {@method Replica#shutdown} 
https://issues.apache.org/jira/browse/IGNITE-22372
+                        raftManager.stopRaftNodes(replicaGrpId);
+                    } catch (NodeStoppingException ignored) {
+                        // No-op.
+                    }
+                    return v;
+                });
     }
 
     /** {@inheritDoc} */
@@ -650,6 +839,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
         localNodeId = clusterNetSvc.topologyService().localMember().id();
 
+        localNodeConsistentId = 
clusterNetSvc.topologyService().localMember().name();
+
         return nullCompletedFuture();
     }
 
@@ -662,8 +853,10 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
         busyLock.block();
 
-        shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, 10, 
TimeUnit.SECONDS);
-        shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+        int shutdownTimeoutSeconds = 10;
+
+        shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds, 
TimeUnit.SECONDS);
 
         assert replicas.values().stream().noneMatch(CompletableFuture::isDone)
                 : "There are replicas alive [replicas="
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
index 88a1937e97..3bb7b86822 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
@@ -18,11 +18,11 @@
 package org.apache.ignite.internal.replicator.listener;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 
 /** Replica listener. */
-@FunctionalInterface
 public interface ReplicaListener {
     /**
      * Invokes a replica listener to process request.
@@ -33,6 +33,9 @@ public interface ReplicaListener {
      */
     CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String 
senderId);
 
+    /** Returns Raft-client. */
+    RaftCommandRunner raftClient();
+
     /** Callback on replica shutdown. */
     default void onShutdown() {
         // No-op.
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
index 676eb5ec36..57bc8420d8 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -115,11 +115,13 @@ public class PlacementDriverReplicaSideTest extends 
BaseIgniteAbstractTest {
 
         when(raftClient.run(any())).thenAnswer(invocationOnMock -> 
completedFuture(null));
 
+        var listener = mock(ReplicaListener.class);
+        when(listener.raftClient()).thenReturn(raftClient);
+
         return new Replica(
                 GRP_ID,
-                mock(ReplicaListener.class),
+                listener,
                 storageIndexTracker,
-                raftClient,
                 LOCAL_NODE,
                 executor,
                 new TestPlacementDriver(LOCAL_NODE),
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index f6e661b2a8..4a7d1462a1 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.replicator;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
+import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -50,7 +52,12 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -75,6 +82,9 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
 
     private ReplicaManager replicaManager;
 
+    @Mock
+    private RaftManager raftManager;
+
     @BeforeEach
     void startReplicaManager(
             TestInfo testInfo,
@@ -82,7 +92,10 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
             @Mock ClusterManagementGroupManager cmgManager,
             @Mock PlacementDriver placementDriver,
             @Mock MessagingService messagingService,
-            @Mock TopologyService topologyService
+            @Mock TopologyService topologyService,
+            @Mock Marshaller marshaller,
+            @Mock TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
+            @Mock VolatileLogStorageFactoryCreator 
volatileLogStorageFactoryCreator
     ) {
         String nodeName = testNodeName(testInfo, 0);
 
@@ -110,7 +123,12 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
                 Set.of(),
                 placementDriver,
                 requestsExecutor,
-                new NoOpFailureProcessor()
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+                new NoOpFailureProcessor(),
+                marshaller,
+                raftGroupServiceFactory,
+                raftManager,
+                volatileLogStorageFactoryCreator
         );
 
         assertThat(replicaManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
@@ -140,6 +158,7 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
      */
     @Test
     void testReplicaEvents(
+            TestInfo testInfo,
             @Mock EventListener<LocalReplicaEventParameters> 
createReplicaListener,
             @Mock EventListener<LocalReplicaEventParameters> 
removeReplicaListener,
             @Mock ReplicaListener replicaListener,
@@ -154,12 +173,18 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
         replicaManager.listen(BEFORE_REPLICA_STOPPED, removeReplicaListener);
 
         var groupId = new TablePartitionId(0, 0);
+        when(replicaListener.raftClient()).thenReturn(raftGroupService);
+
+        String nodeName = testNodeName(testInfo, 0);
+        PeersAndLearners newConfiguration = 
PeersAndLearners.fromConsistentIds(Set.of(nodeName));
 
-        CompletableFuture<Replica> startReplicaFuture = 
replicaManager.startReplica(
+        CompletableFuture<Boolean> startReplicaFuture = 
replicaManager.startReplica(
                 groupId,
-                replicaListener,
-                raftGroupService,
-                new PendingComparableValuesTracker<>(0L)
+                newConfiguration,
+                (unused) -> { },
+                (unused) -> replicaListener,
+                new PendingComparableValuesTracker<>(0L),
+                completedFuture(raftGroupService)
         );
 
         assertThat(startReplicaFuture, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index f419d3b45d..80051692c6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -176,6 +176,7 @@ import 
org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
+import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -481,6 +482,9 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 clockService
         );
 
+        ScheduledExecutorService rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
+                NamedThreadFactory.create(name, "test-rebalance-scheduler", 
logger()));
+
         ReplicaManager replicaMgr = new ReplicaManager(
                 name,
                 clusterSvc,
@@ -490,7 +494,11 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 placementDriverManager.placementDriver(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 partitionIdleSafeTimePropagationPeriodMsSupplier,
-                failureProcessor
+                failureProcessor,
+                new 
ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()),
+                topologyAwareRaftGroupServiceFactory,
+                raftMgr,
+                view -> new LocalLogStorageFactory()
         );
 
         var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
@@ -570,9 +578,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         var dataNodesMock = dataNodesMockByNode.get(idx);
 
-        ScheduledExecutorService rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
-                NamedThreadFactory.create(name, "test-rebalance-scheduler", 
logger()));
-
         DistributionZoneManager distributionZoneManager = new 
DistributionZoneManager(
                 name,
                 registry,
@@ -604,7 +609,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 messagingServiceReturningToStorageOperationsPool,
                 clusterSvc.topologyService(),
                 clusterSvc.serializationRegistry(),
-                raftMgr,
                 replicaMgr,
                 lockManager,
                 replicaService,
@@ -613,13 +617,12 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 storagePath,
                 metaStorageMgr,
                 schemaManager,
-                view -> new LocalLogStorageFactory(),
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.partitionOperationsExecutor(),
+                rebalanceScheduler,
                 hybridClock,
                 clockService,
                 new OutgoingSnapshotsManager(clusterSvc.messagingService()),
-                topologyAwareRaftGroupServiceFactory,
                 distributionZoneManager,
                 schemaSyncService,
                 catalogManager,
@@ -627,7 +630,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 placementDriverManager.placementDriver(),
                 sqlRef::get,
                 resourcesRegistry,
-                rebalanceScheduler,
                 lowWatermark,
                 transactionInflights
         );
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3bc9aa2a6d..e966a158e3 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -156,6 +156,7 @@ import 
org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAf
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
@@ -206,6 +207,7 @@ import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOn
 import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
+import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
@@ -630,6 +632,14 @@ public class IgniteImpl implements Ignite {
 
         LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = 
partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig);
 
+        ScheduledExecutorService rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
+                NamedThreadFactory.create(name, "rebalance-scheduler", LOG));
+
+        // TODO: IGNITE-22222 this instantiation should be moved inside 
ReplicaManager's constructor
+        Marshaller raftMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry());
+
+        volatileLogStorageFactoryCreator = new 
VolatileLogStorageFactoryCreator(name, 
workDir.resolve("volatile-log-spillout"));
+
         replicaMgr = new ReplicaManager(
                 name,
                 clusterSvc,
@@ -639,7 +649,11 @@ public class IgniteImpl implements Ignite {
                 placementDriverMgr.placementDriver(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 partitionIdleSafeTimePropagationPeriodMsSupplier,
-                failureProcessor
+                failureProcessor,
+                raftMarshaller,
+                topologyAwareRaftGroupServiceFactory,
+                raftMgr,
+                volatileLogStorageFactoryCreator
         );
 
         
metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY));
@@ -670,8 +684,6 @@ public class IgniteImpl implements Ignite {
                 nodeConfigRegistry.getConfiguration(StorageConfiguration.KEY)
         );
 
-        volatileLogStorageFactoryCreator = new 
VolatileLogStorageFactoryCreator(name, 
workDir.resolve("volatile-log-spillout"));
-
         outgoingSnapshotsManager = new OutgoingSnapshotsManager(name, 
clusterSvc.messagingService());
 
         LongSupplier delayDurationMsSupplier = 
delayDurationMsSupplier(schemaSyncConfig);
@@ -697,9 +709,6 @@ public class IgniteImpl implements Ignite {
 
         schemaManager = new SchemaManager(registry, catalogManager);
 
-        ScheduledExecutorService rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
-                NamedThreadFactory.create(name, "rebalance-scheduler", LOG));
-
         distributionZoneManager = new DistributionZoneManager(
                 name,
                 registry,
@@ -771,7 +780,6 @@ public class IgniteImpl implements Ignite {
                 messagingServiceReturningToStorageOperationsPool,
                 clusterSvc.topologyService(),
                 clusterSvc.serializationRegistry(),
-                raftMgr,
                 replicaMgr,
                 lockMgr,
                 replicaSvc,
@@ -780,13 +788,12 @@ public class IgniteImpl implements Ignite {
                 storagePath,
                 metaStorageMgr,
                 schemaManager,
-                volatileLogStorageFactoryCreator,
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.partitionOperationsExecutor(),
+                rebalanceScheduler,
                 clock,
                 clockService,
                 outgoingSnapshotsManager,
-                topologyAwareRaftGroupServiceFactory,
                 distributionZoneManager,
                 schemaSyncService,
                 catalogManager,
@@ -794,7 +801,6 @@ public class IgniteImpl implements Ignite {
                 placementDriverMgr.placementDriver(),
                 this::bareSql,
                 resourcesRegistry,
-                rebalanceScheduler,
                 lowWatermark,
                 transactionInflights
         );
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 92f1284085..fd14fd283b 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.distributed;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE;
+import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static org.apache.ignite.internal.table.TxAbstractTest.startNode;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -35,6 +36,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -45,6 +47,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -57,7 +61,12 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.StaticNodeFinder;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -67,8 +76,10 @@ import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurat
 import 
org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
@@ -79,6 +90,7 @@ import 
org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
@@ -126,8 +138,25 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
     private ExecutorService requestsExecutor;
 
+    private Loza raftManager;
+
+    private TopologyAwareRaftGroupService raftClient;
+
+    private final Function<BiFunction<ReplicaRequest, String, 
CompletableFuture<ReplicaResult>>, ReplicaListener> replicaListenerCreator =
+            (invokeImpl) -> new ReplicaListener() {
+                @Override
+                public CompletableFuture<ReplicaResult> invoke(ReplicaRequest 
request, String senderId) {
+                    return invokeImpl.apply(request, senderId);
+                }
+
+                @Override
+                public RaftCommandRunner raftClient() {
+                    return raftClient;
+                }
+            };
+
     @BeforeEach
-    public void setup() {
+    public void setup() throws NodeStoppingException {
         var networkAddress = new NetworkAddress(getLocalAddress(), 
NODE_PORT_BASE + 1);
 
         var nodeFinder = new StaticNodeFinder(List.of(networkAddress));
@@ -139,6 +168,10 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
         // This test is run without Meta storage.
         
when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture());
 
+        raftManager = mock(Loza.class);
+        raftClient = mock(TopologyAwareRaftGroupService.class);
+        when(raftManager.startRaftGroupService(any(), any(), any(), 
any())).thenReturn(completedFuture(raftClient));
+
         requestsExecutor = new ThreadPoolExecutor(
                 0, 5,
                 0, TimeUnit.SECONDS,
@@ -151,6 +184,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                 clock,
                 replicationConfiguration
         );
+
         replicaManager = new ReplicaManager(
                 NODE_NAME,
                 clusterService,
@@ -159,7 +193,12 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                 Set.of(TableMessageGroup.class, TxMessageGroup.class),
                 new 
TestPlacementDriver(clusterService.topologyService().localMember()),
                 requestsExecutor,
-                new NoOpFailureProcessor()
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+                new NoOpFailureProcessor(),
+                mock(ThreadLocalPartitionCommandsMarshaller.class),
+                mock(TopologyAwareRaftGroupServiceFactory.class),
+                raftManager,
+                view -> new LocalLogStorageFactory()
         );
 
         assertThat(replicaManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
@@ -184,18 +223,27 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
         ReadWriteSingleRowReplicaRequest request = 
getRequest(tablePartitionId);
 
+        PeersAndLearners newConfiguration = 
PeersAndLearners.fromConsistentIds(Set.of(clusterNode.name()));
+
         
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
                 (message, sender, correlationId) -> {
                     try {
                         log.info("Replica msg " + 
message.getClass().getSimpleName());
 
+                        ReplicaListener listener = 
replicaListenerCreator.apply((req, senderId) -> {
+                            ReplicaResponse response = 
replicaMessageFactory.replicaResponse()
+                                    .result(5)
+                                    .build();
+                            return completedFuture(new ReplicaResult(response, 
null));
+                        });
+
                         replicaManager.startReplica(
                                 tablePartitionId,
-                                (request0, senderId) -> completedFuture(new 
ReplicaResult(replicaMessageFactory.replicaResponse()
-                                        .result(5)
-                                        .build(), null)),
-                                mock(TopologyAwareRaftGroupService.class),
-                                new PendingComparableValuesTracker<>(0L)
+                                newConfiguration,
+                                (unused) -> { },
+                                (unused) -> listener,
+                                new PendingComparableValuesTracker<>(0L),
+                                
completedFuture(mock(TopologyAwareRaftGroupService.class))
                         );
                     } catch (NodeStoppingException e) {
                         throw new RuntimeException(e);
@@ -297,16 +345,22 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
         TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
 
+        PeersAndLearners newConfiguration = 
PeersAndLearners.fromConsistentIds(Set.of(clusterNode.name()));
+
         
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
(message, sender, correlationId) -> {
             runAsync(() -> {
                 try {
                     log.info("Replica msg " + 
message.getClass().getSimpleName());
 
+                    ReplicaListener listener = 
replicaListenerCreator.apply((r, id) -> new CompletableFuture<>());
+
                     replicaManager.startReplica(
                             tablePartitionId,
-                            (request, senderId) -> new CompletableFuture<>(),
-                            mock(TopologyAwareRaftGroupService.class),
-                            new PendingComparableValuesTracker<>(0L)
+                            newConfiguration,
+                            (unused) -> { },
+                            (unused) -> listener,
+                            new PendingComparableValuesTracker<>(0L),
+                            
completedFuture(mock(TopologyAwareRaftGroupService.class))
                     );
                 } catch (NodeStoppingException e) {
                     throw new RuntimeException(e);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 2ea1aaeb48..15655475a0 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThr
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -203,7 +204,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
                 Set.of()
         );
 
-        assertThat(updateFuture, willCompleteSuccessfully());
+        assertThat(updateFuture, willSucceedIn(60, SECONDS));
 
         awaitPrimaryReplica(node0, partId);
 
@@ -249,7 +250,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
                 Set.of(anotherPartId)
         );
 
-        assertThat(updateFuture, willCompleteSuccessfully());
+        assertThat(updateFuture, willSucceedIn(60, SECONDS));
 
         awaitPrimaryReplica(node0, anotherPartId);
 
@@ -302,7 +303,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         CompletableFuture<ReplicaMeta> awaitPrimaryReplicaFuture = 
node0.placementDriver()
                 .awaitPrimaryReplica(new TablePartitionId(tableId, partId), 
node0.clock().now(), 60, SECONDS);
 
-        assertThat(awaitPrimaryReplicaFuture, willCompleteSuccessfully());
+        assertThat(awaitPrimaryReplicaFuture, willSucceedIn(60, SECONDS));
     }
 
     private void assertRealAssignments(IgniteImpl node0, int partId, 
Integer... expected) throws InterruptedException {
@@ -330,7 +331,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
             CompletableFuture<Void> insertFuture = keyValueView.putAsync(null, 
key, Tuple.create(of("val", i + offset)));
 
             try {
-                insertFuture.get(1000, MILLISECONDS);
+                insertFuture.get(10, SECONDS);
 
                 Tuple value = keyValueView.get(null, key);
                 assertNotNull(value);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 39453a5142..9306da8ddd 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -155,7 +156,6 @@ import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftNodeId;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
@@ -188,6 +188,7 @@ import 
org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
+import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -820,9 +821,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
     private void verifyThatRaftNodesAndReplicasWereStartedOnlyOnce() throws 
Exception {
         for (int i = 0; i < NODE_COUNT; i++) {
             verify(getNode(i).raftManager, 
timeout(AWAIT_TIMEOUT_MILLIS).times(1))
-                    .startRaftGroupNodeWithoutService(any(), any(), any(), 
any(), any(RaftGroupOptions.class));
+                    .startRaftGroupNode(any(), any(), any(), any(), any(), 
notNull(TopologyAwareRaftGroupServiceFactory.class));
             verify(getNode(i).replicaManager, 
timeout(AWAIT_TIMEOUT_MILLIS).times(1))
-                    .startReplica(any(), any(), any(), any());
+                    .startReplica(any(), any(), any());
         }
     }
 
@@ -1194,6 +1195,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     lowWatermark
             );
 
+            rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
+                    NamedThreadFactory.create(name, 
"test-rebalance-scheduler", logger()));
+
             replicaManager = spy(new ReplicaManager(
                     name,
                     clusterService,
@@ -1203,7 +1207,11 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     placementDriver,
                     threadPoolsManager.partitionOperationsExecutor(),
                     partitionIdleSafeTimePropagationPeriodMsSupplier,
-                    new NoOpFailureProcessor()
+                    new NoOpFailureProcessor(),
+                    new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),
+                    topologyAwareRaftGroupServiceFactory,
+                    raftManager,
+                    view -> new LocalLogStorageFactory()
             ));
 
             LongSupplier delayDurationMsSupplier = () -> 10L;
@@ -1219,9 +1227,6 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
             schemaSyncService = new 
SchemaSyncServiceImpl(metaStorageManager.clusterTime(), 
delayDurationMsSupplier);
 
-            rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
-                    NamedThreadFactory.create(name, 
"test-rebalance-scheduler", logger()));
-
             distributionZoneManager = new DistributionZoneManager(
                     name,
                     registry,
@@ -1244,7 +1249,6 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     clusterService.messagingService(),
                     clusterService.topologyService(),
                     clusterService.serializationRegistry(),
-                    raftManager,
                     replicaManager,
                     mock(LockManager.class),
                     replicaSvc,
@@ -1253,13 +1257,12 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     storagePath,
                     metaStorageManager,
                     schemaManager,
-                    view -> new LocalLogStorageFactory(),
                     threadPoolsManager.tableIoExecutor(),
                     threadPoolsManager.partitionOperationsExecutor(),
+                    rebalanceScheduler,
                     clock,
                     clockService,
                     new 
OutgoingSnapshotsManager(clusterService.messagingService()),
-                    topologyAwareRaftGroupServiceFactory,
                     distributionZoneManager,
                     schemaSyncService,
                     catalogManager,
@@ -1267,7 +1270,6 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     placementDriver,
                     () -> mock(IgniteSql.class),
                     resourcesRegistry,
-                    rebalanceScheduler,
                     lowWatermark,
                     transactionInflights
             ) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a239a046f4..ce5246a8f6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -92,6 +92,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.IntSupplier;
 import java.util.function.LongFunction;
 import java.util.function.Supplier;
@@ -141,23 +142,19 @@ import 
org.apache.ignite.internal.network.MessagingService;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
-import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
-import org.apache.ignite.internal.raft.RaftManager;
-import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
-import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
-import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
+import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
@@ -171,7 +168,6 @@ import 
org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.LongPriorityQueue;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TableRaftService;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.gc.MvGc;
@@ -191,7 +187,6 @@ import 
org.apache.ignite.internal.table.distributed.schema.ExecutorInclinedSchem
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
-import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
 import 
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
@@ -223,7 +218,6 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.util.IgniteNameUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.TopologyService;
-import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.Table;
 import org.jetbrains.annotations.Nullable;
@@ -246,9 +240,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final TopologyService topologyService;
 
-    /** Raft manager. */
-    private final RaftManager raftMgr;
-
     /** Replica manager. */
     private final ReplicaManager replicaMgr;
 
@@ -318,11 +309,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** Schema manager. */
     private final SchemaManager schemaManager;
 
-    private final LogStorageFactoryCreator volatileLogStorageFactoryCreator;
-
-    /** Executor for scheduling rebalance routine. */
-    private final ScheduledExecutorService rebalanceScheduler;
-
     /** Transaction state storage scheduled pool. */
     private final ScheduledExecutorService txStateStorageScheduledPool;
 
@@ -345,8 +331,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
-    private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory;
-
     private final DistributionZoneManager distributionZoneManager;
 
     private final SchemaSyncService executorInclinedSchemaSyncService;
@@ -369,8 +353,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final LowWatermark lowWatermark;
 
-    private final Marshaller raftCommandsMarshaller;
-
     private final HybridTimestampTracker observableTimestampTracker;
 
     /** Placement driver. */
@@ -397,6 +379,9 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      */
     private final Executor partitionOperationsExecutor;
 
+    /** Executor for scheduling rebalance routine. */
+    private final ScheduledExecutorService rebalanceScheduler;
+
     /** Marshallers provider. */
     private final ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
 
@@ -426,23 +411,19 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @param gcConfig Garbage collector configuration.
      * @param txCfg Transaction configuration.
      * @param storageUpdateConfig Storage update handler configuration.
-     * @param raftMgr Raft manager.
      * @param replicaMgr Replica manager.
      * @param lockMgr Lock manager.
      * @param replicaSvc Replica service.
      * @param txManager Transaction manager.
      * @param dataStorageMgr Data storage manager.
      * @param schemaManager Schema manager.
-     * @param volatileLogStorageFactoryCreator Creator for {@link 
org.apache.ignite.internal.raft.storage.LogStorageFactory} for
-     *         volatile tables.
      * @param ioExecutor Separate executor for IO operations like partition 
storage initialization or partition raft group meta data
      *     persisting.
      * @param partitionOperationsExecutor Striped executor on which partition 
operations (potentially requiring I/O with storages)
      *     will be executed.
-     * @param raftGroupServiceFactory Factory that is used for creation of 
raft group services for replication groups.
+     * @param rebalanceScheduler Executor for scheduling rebalance routine.
      * @param placementDriver Placement driver.
      * @param sql A supplier function that returns {@link IgniteSql}.
-     * @param rebalanceScheduler Executor for scheduling rebalance routine.
      * @param lowWatermark Low watermark.
      * @param transactionInflights Transaction inflights.
      */
@@ -455,7 +436,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             MessagingService messagingService,
             TopologyService topologyService,
             MessageSerializationRegistry messageSerializationRegistry,
-            RaftManager raftMgr,
             ReplicaManager replicaMgr,
             LockManager lockMgr,
             ReplicaService replicaSvc,
@@ -464,13 +444,12 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             Path storagePath,
             MetaStorageManager metaStorageMgr,
             SchemaManager schemaManager,
-            LogStorageFactoryCreator volatileLogStorageFactoryCreator,
             ExecutorService ioExecutor,
             Executor partitionOperationsExecutor,
+            ScheduledExecutorService rebalanceScheduler,
             HybridClock clock,
             ClockService clockService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
-            TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
             DistributionZoneManager distributionZoneManager,
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
@@ -478,12 +457,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             PlacementDriver placementDriver,
             Supplier<IgniteSql> sql,
             RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
-            ScheduledExecutorService rebalanceScheduler,
             LowWatermark lowWatermark,
             TransactionInflights transactionInflights
     ) {
         this.topologyService = topologyService;
-        this.raftMgr = raftMgr;
         this.replicaMgr = replicaMgr;
         this.lockMgr = lockMgr;
         this.replicaSvc = replicaSvc;
@@ -491,20 +468,18 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         this.dataStorageMgr = dataStorageMgr;
         this.metaStorageMgr = metaStorageMgr;
         this.schemaManager = schemaManager;
-        this.volatileLogStorageFactoryCreator = 
volatileLogStorageFactoryCreator;
         this.ioExecutor = ioExecutor;
         this.partitionOperationsExecutor = partitionOperationsExecutor;
+        this.rebalanceScheduler = rebalanceScheduler;
         this.clock = clock;
         this.clockService = clockService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
-        this.raftGroupServiceFactory = raftGroupServiceFactory;
         this.distributionZoneManager = distributionZoneManager;
         this.catalogService = catalogService;
         this.observableTimestampTracker = observableTimestampTracker;
         this.sql = sql;
         this.storageUpdateConfig = storageUpdateConfig;
         this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
-        this.rebalanceScheduler = rebalanceScheduler;
         this.lowWatermark = lowWatermark;
         this.transactionInflights = transactionInflights;
         this.txCfg = txCfg;
@@ -565,8 +540,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         mvGc = new MvGc(nodeName, gcConfig, lowWatermark);
 
-        raftCommandsMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(messageSerializationRegistry);
-
         partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
                 metaStorageMgr,
                 messagingService,
@@ -581,7 +554,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 storagePath.resolve(TX_STATE_DIR),
                 txStateStorageScheduledPool,
                 txStateStoragePool,
-                raftMgr.getLogSyncer(),
+                replicaMgr.getLogSyncer(),
                 TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER
         );
 
@@ -922,11 +895,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 storageUpdateConfig
         );
 
-        Peer serverPeer = realConfiguration.peer(localNode().name());
-
-        var raftNodeId = localMemberAssignment == null ? null : new 
RaftNodeId(replicaGrpId, serverPeer);
-
-        boolean shouldStartRaftListeners = localMemberAssignment != null && 
!((Loza) raftMgr).isStarted(raftNodeId);
+        boolean shouldStartRaftListeners = 
shouldStartRaftListeners(assignments, nonStableNodeAssignments);
 
         if (shouldStartRaftListeners) {
             ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
@@ -934,6 +903,24 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
         }
 
+        // TODO: will be removed in 
https://issues.apache.org/jira/browse/IGNITE-22315
+        Supplier<RaftGroupService> getCachedRaftClient = () -> {
+            try {
+                // Return existing service if it's already started.
+                return internalTbl
+                        .tableRaftService()
+                        .partitionRaftGroupService(replicaGrpId.partitionId());
+            } catch (IgniteInternalException e) {
+                // We use "IgniteInternalException" in accordance with the 
javadoc of "partitionRaftGroupService" method.
+                return null;
+            }
+        };
+
+        // TODO: will be removed in 
https://issues.apache.org/jira/browse/IGNITE-22218
+        Consumer<RaftGroupService> updateTableRaftService = (raftClient) -> 
((InternalTableImpl) internalTbl)
+                .tableRaftService()
+                .updateInternalTableRaftGroupService(partId, raftClient);
+
         CompletableFuture<Boolean> startGroupFut;
 
         if (localMemberAssignment != null) {
@@ -946,85 +933,88 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     )
                     : trueCompletedFuture();
 
-            startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> 
inBusyLock(busyLock, () -> {
+            startGroupFut = shouldStartGroupFut.thenComposeAsync(startGroup -> 
inBusyLock(busyLock, () -> {
+                // (1) if partitionReplicatorNodeRecovery#shouldStartGroup 
resolves to false -> do not start anything
                 if (!startGroup) {
-                    return false;
+                    return falseCompletedFuture();
                 }
 
-                if (((Loza) raftMgr).isStarted(raftNodeId)) {
+                // (2) if the replica is already started -> reset peers on a 
forced reassignment and finish the process
+                if (replicaMgr.isReplicaStarted(replicaGrpId)) {
                     if (nonStableNodeAssignments != null && 
nonStableNodeAssignments.force()) {
-                        ((Loza) raftMgr).resetPeers(raftNodeId, 
configurationFromAssignments(nonStableNodeAssignments.nodes()));
+                        replicaMgr.resetPeers(replicaGrpId, 
configurationFromAssignments(nonStableNodeAssignments.nodes()));
                     }
-
-                    return true;
+                    return trueCompletedFuture();
                 }
 
+                // (3) Otherwise let's start replica manually
+                InternalTable internalTable = table.internalTable();
+
+                RaftGroupListener raftGroupListener = new PartitionListener(
+                        txManager,
+                        partitionDataStorage,
+                        partitionUpdateHandlers.storageUpdateHandler,
+                        partitionStorages.getTxStateStorage(),
+                        safeTimeTracker,
+                        storageIndexTracker,
+                        catalogService,
+                        table.schemaView(),
+                        clockService
+                );
+
+                SnapshotStorageFactory snapshotStorageFactory = 
createSnapshotStorageFactory(replicaGrpId,
+                        partitionUpdateHandlers, internalTable);
+
+                Function<RaftGroupService, ReplicaListener> createListener = 
(raftClient) -> createReplicaListener(
+                        replicaGrpId,
+                        table,
+                        safeTimeTracker,
+                        partitionStorages.getMvPartitionStorage(),
+                        partitionStorages.getTxStateStorage(),
+                        partitionUpdateHandlers,
+                        raftClient);
+
+                RaftGroupEventsListener raftGroupEventsListener = 
createRaftGroupEventsListener(zoneId, replicaGrpId);
+
+                MvTableStorage mvTableStorage = internalTable.storage();
+
                 try {
-                    startPartitionRaftGroupNode(
-                            replicaGrpId,
-                            raftNodeId,
-                            newConfiguration,
-                            safeTimeTracker,
+                    var ret = replicaMgr.startReplica(
+                            raftGroupEventsListener,
+                            raftGroupListener,
+                            mvTableStorage.isVolatile(),
+                            snapshotStorageFactory,
+                            updateTableRaftService,
+                            createListener,
                             storageIndexTracker,
-                            table,
-                            partitionStorages.getTxStateStorage(),
-                            partitionDataStorage,
-                            partitionUpdateHandlers,
-                            zoneId
-                    );
-
-                    return true;
-                } catch (NodeStoppingException ex) {
-                    throw new CompletionException(ex);
+                            replicaGrpId,
+                            newConfiguration);
+                    return ret;
+                } catch (NodeStoppingException e) {
+                    throw new AssertionError("Loza was stopped before Table 
manager", e);
                 }
             }), ioExecutor);
         } else {
+            // TODO: will be removed after 
https://issues.apache.org/jira/browse/IGNITE-22315
+            // (4) the local node is not in the assignments
             startGroupFut = falseCompletedFuture();
         }
 
         startGroupFut
-                .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
-                    TableRaftService tableRaftService = 
table.internalTable().tableRaftService();
-
-                    try {
-                        // Return existing service if it's already started.
-                        return completedFuture(
-                                (TopologyAwareRaftGroupService) 
tableRaftService.partitionRaftGroupService(replicaGrpId.partitionId())
-                        );
-                    } catch (IgniteInternalException e) {
-                        // We use "IgniteInternalException" in accordance with 
the javadoc of "partitionRaftGroupService" method.
-                        try {
-                            // TODO IGNITE-19614 This procedure takes 10 
seconds if there's no majority online.
-                            return raftMgr
-                                    .startRaftGroupService(replicaGrpId, 
newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller);
-                        } catch (NodeStoppingException ex) {
-                            return failedFuture(ex);
-                        }
-                    }
-                }), ioExecutor)
-                .thenAcceptAsync(updatedRaftGroupService -> 
inBusyLock(busyLock, () -> {
-                    ((InternalTableImpl) internalTbl).tableRaftService()
-                            .updateInternalTableRaftGroupService(partId, 
updatedRaftGroupService);
-
-                    boolean startedRaftNode = startGroupFut.join();
-                    if (localMemberAssignment == null || !startedRaftNode || 
replicaMgr.isReplicaStarted(replicaGrpId)) {
-                        return;
+                // TODO: the stage will be removed after 
https://issues.apache.org/jira/browse/IGNITE-22315
+                .thenComposeAsync(isReplicaStarted -> inBusyLock(busyLock, () 
-> {
+                    if (isReplicaStarted) {
+                        return nullCompletedFuture();
                     }
 
+                    CompletableFuture<TopologyAwareRaftGroupService> 
newRaftClientFut;
                     try {
-                        startReplicaWithNewListener(
-                                replicaGrpId,
-                                table,
-                                safeTimeTracker,
-                                storageIndexTracker,
-                                partitionStorages.getMvPartitionStorage(),
-                                partitionStorages.getTxStateStorage(),
-                                partitionUpdateHandlers,
-                                updatedRaftGroupService
-                        );
-                    } catch (NodeStoppingException ex) {
-                        throw new AssertionError("Loza was stopped before 
Table manager", ex);
+                        newRaftClientFut = replicaMgr.startRaftClient(
+                                replicaGrpId, newConfiguration, 
getCachedRaftClient);
+                    } catch (NodeStoppingException e) {
+                        throw new CompletionException(e);
                     }
+                    return newRaftClientFut.thenAccept(updateTableRaftService);
                 }), ioExecutor)
                 .whenComplete((res, ex) -> {
                     if (ex != null) {
@@ -1039,31 +1029,36 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return resultFuture;
     }
 
-    private void startReplicaWithNewListener(
-            TablePartitionId replicaGrpId,
-            TableImpl table,
-            PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
-            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
-            MvPartitionStorage mvPartitionStorage,
-            TxStateStorage txStatePartitionStorage,
-            PartitionUpdateHandlers partitionUpdateHandlers,
-            TopologyAwareRaftGroupService raftGroupService
-    ) throws NodeStoppingException {
-        PartitionReplicaListener listener = createReplicaListener(
-                replicaGrpId,
-                table,
-                safeTimeTracker,
-                mvPartitionStorage,
-                txStatePartitionStorage,
-                partitionUpdateHandlers,
-                raftGroupService
-        );
+    private boolean shouldStartRaftListeners(Assignments assignments, 
@Nullable Assignments nonStableNodeAssignments) {
+        Set<Assignment> nodesForStarting = nonStableNodeAssignments == null
+                ? assignments.nodes()
+                : RebalanceUtil.subtract(nonStableNodeAssignments.nodes(), 
assignments.nodes());
+        return nodesForStarting
+                .stream()
+                .anyMatch(assignment -> 
assignment.consistentId().equals(localNode().name()));
+    }
+
+    private PartitionMover createPartitionMover(TablePartitionId replicaGrpId) 
{
+        return new PartitionMover(busyLock, () -> {
+            CompletableFuture<Replica> replicaFut = 
replicaMgr.replica(replicaGrpId);
+            if (replicaFut == null) {
+                return failedFuture(new IgniteInternalException("No such 
replica for partition " + replicaGrpId.partitionId()
+                        + " in table " + replicaGrpId.tableId()));
+            }
+            return replicaFut.thenApply(Replica::raftClient);
+        });
+    }
+
+    private RaftGroupEventsListener createRaftGroupEventsListener(int zoneId, 
TablePartitionId replicaGrpId) {
+        PartitionMover partitionMover = createPartitionMover(replicaGrpId);
 
-        replicaMgr.startReplica(
+        return new RebalanceRaftGroupEventsListener(
+                metaStorageMgr,
                 replicaGrpId,
-                listener,
-                raftGroupService,
-                storageIndexTracker
+                busyLock,
+                partitionMover,
+                rebalanceScheduler,
+                zoneId
         );
     }
 
@@ -1122,46 +1117,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return new PartitionKey(internalTbl.tableId(), partId);
     }
 
-    private RaftGroupOptions groupOptionsForPartition(
-            MvTableStorage mvTableStorage,
-            TxStateTableStorage txStateTableStorage,
-            PartitionKey partitionKey,
-            PartitionUpdateHandlers partitionUpdateHandlers
-    ) {
-        RaftGroupOptions raftGroupOptions;
-
-        if (mvTableStorage.isVolatile()) {
-            raftGroupOptions = RaftGroupOptions.forVolatileStores()
-                    // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
-                    
.setLogStorageFactory(volatileLogStorageFactoryCreator.factory(((Loza) 
raftMgr).volatileRaft().logStorage().value()))
-                    .raftMetaStorageFactory((groupId, raftOptions) -> new 
VolatileRaftMetaStorage());
-        } else {
-            raftGroupOptions = RaftGroupOptions.forPersistentStores();
-        }
-
-        raftGroupOptions.snapshotStorageFactory(new 
PartitionSnapshotStorageFactory(
-                topologyService,
-                outgoingSnapshotsManager,
-                new PartitionAccessImpl(
-                        partitionKey,
-                        mvTableStorage,
-                        txStateTableStorage,
-                        mvGc,
-                        partitionUpdateHandlers.indexUpdateHandler,
-                        partitionUpdateHandlers.gcUpdateHandler,
-                        fullStateTransferIndexChooser,
-                        schemaManager.schemaRegistry(partitionKey.tableId()),
-                        lowWatermark
-                ),
-                catalogService,
-                incomingSnapshotsExecutor
-        ));
-
-        raftGroupOptions.commandsMarshaller(raftCommandsMarshaller);
-
-        return raftGroupOptions;
-    }
-
     @Override
     public void beforeNodeStop() {
         if (!beforeStopGuard.compareAndSet(false, true)) {
@@ -1194,11 +1149,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     mvGc,
                     fullStateTransferIndexChooser,
                     sharedTxStateStorage,
-                    () -> shutdownAndAwaitTermination(rebalanceScheduler, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
                     () -> shutdownAndAwaitTermination(txStateStoragePool, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
                     () -> 
shutdownAndAwaitTermination(txStateStorageScheduledPool, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
                     () -> shutdownAndAwaitTermination(scanRequestExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
                     () -> 
shutdownAndAwaitTermination(incomingSnapshotsExecutor, shutdownTimeoutSeconds, 
TimeUnit.SECONDS),
+                    () -> shutdownAndAwaitTermination(rebalanceScheduler, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
                     () -> shutdownAndAwaitTermination(streamerFlushExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS)
             );
         } catch (Exception e) {
@@ -1857,7 +1812,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             boolean isRecovery
     ) {
         ClusterNode localMember = localNode();
-        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new 
Peer(localNode().name()));
 
         boolean pendingAssignmentsAreForced = pendingAssignments.force();
         Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
@@ -1923,8 +1877,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     }), ioExecutor);
         } else {
             localServicesStartFuture = runAsync(() -> {
-                if (pendingAssignmentsAreForced && ((Loza) 
raftMgr).isStarted(raftNodeId)) {
-                    ((Loza) raftMgr).resetPeers(raftNodeId, 
configurationFromAssignments(nonStableNodeAssignmentsFinal.nodes()));
+                if (pendingAssignmentsAreForced && 
replicaMgr.isReplicaStarted(replicaGrpId)) {
+                    replicaMgr.resetPeers(replicaGrpId, 
configurationFromAssignments(nonStableNodeAssignmentsFinal.nodes()));
                 }
             }, ioExecutor);
         }
@@ -2047,55 +2001,29 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 });
     }
 
-    private void startPartitionRaftGroupNode(
+    private SnapshotStorageFactory createSnapshotStorageFactory(
             TablePartitionId replicaGrpId,
-            RaftNodeId raftNodeId,
-            PeersAndLearners stableConfiguration,
-            PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
-            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
-            TableImpl table,
-            TxStateStorage txStatePartitionStorage,
-            PartitionDataStorage partitionDataStorage,
             PartitionUpdateHandlers partitionUpdateHandlers,
-            int zoneId
-    ) throws NodeStoppingException {
-        InternalTable internalTable = table.internalTable();
-
-        RaftGroupOptions groupOptions = groupOptionsForPartition(
-                internalTable.storage(),
-                internalTable.txStateStorage(),
-                partitionKey(internalTable, replicaGrpId.partitionId()),
-                partitionUpdateHandlers
-        );
+            InternalTable internalTable
+    ) {
+        PartitionKey partitionKey = partitionKey(internalTable, 
replicaGrpId.partitionId());
 
-        RaftGroupListener raftGrpLsnr = new PartitionListener(
-                txManager,
-                partitionDataStorage,
-                partitionUpdateHandlers.storageUpdateHandler,
-                txStatePartitionStorage,
-                safeTimeTracker,
-                storageIndexTracker,
+        return new PartitionSnapshotStorageFactory(
+                topologyService,
+                outgoingSnapshotsManager,
+                new PartitionAccessImpl(
+                        partitionKey,
+                        internalTable.storage(),
+                        internalTable.txStateStorage(),
+                        mvGc,
+                        partitionUpdateHandlers.indexUpdateHandler,
+                        partitionUpdateHandlers.gcUpdateHandler,
+                        fullStateTransferIndexChooser,
+                        schemaManager.schemaRegistry(partitionKey.tableId()),
+                        lowWatermark
+                ),
                 catalogService,
-                table.schemaView(),
-                clockService
-        );
-
-        RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
-                metaStorageMgr,
-                replicaGrpId,
-                busyLock,
-                createPartitionMover(internalTable, 
replicaGrpId.partitionId()),
-                rebalanceScheduler,
-                zoneId
-        );
-
-        // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
-        ((Loza) raftMgr).startRaftGroupNodeWithoutService(
-                raftNodeId,
-                stableConfiguration,
-                raftGrpLsnr,
-                raftGrpEvtsLsnr,
-                groupOptions
+                incomingSnapshotsExecutor
         );
     }
 
@@ -2169,10 +2097,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         };
     }
 
-    private PartitionMover createPartitionMover(InternalTable internalTable, 
int partId) {
-        return new PartitionMover(busyLock, () -> 
internalTable.tableRaftService().partitionRaftGroupService(partId));
-    }
-
     private static PeersAndLearners 
configurationFromAssignments(Collection<Assignment> assignments) {
         var peers = new HashSet<String>();
         var learners = new HashSet<String>();
@@ -2382,15 +2306,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
 
         return stopReplicaFuture
-                .thenCompose(v -> {
-                    try {
-                        raftMgr.stopRaftNodes(tablePartitionId);
-                    } catch (NodeStoppingException ignored) {
-                        // No-op.
-                    }
-
-                    return mvGc.removeStorage(tablePartitionId);
-                });
+                .thenCompose(v -> mvGc.removeStorage(tablePartitionId));
     }
 
     private CompletableFuture<Void> destroyPartitionStorages(TablePartitionId 
tablePartitionId, TableImpl table) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b38ddbc09c..406954e831 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -452,6 +453,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
+    /** Returns Raft-client. */
+    @Override
+    public RaftCommandRunner raftClient() {
+        if (raftClient instanceof ExecutorInclinedRaftCommandRunner) {
+            return ((ExecutorInclinedRaftCommandRunner) 
raftClient).decoratedCommandRunner();
+        }
+        return raftClient;
+    }
+
     private CompletableFuture<?> processRequest(ReplicaRequest request, 
@Nullable Boolean isPrimary, String senderId,
             @Nullable Long leaseStartTime) {
         if (request instanceof SchemaVersionAwareReplicaRequest) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
index 261eb1461a..ce75a0e26a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
@@ -66,7 +67,7 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
                 .thenReturn(failedFuture(new IOException()))
                 .thenReturn(nullCompletedFuture());
 
-        var partitionMover = new PartitionMover(new IgniteSpinBusyLock(), () 
-> raftService);
+        var partitionMover = new PartitionMover(new IgniteSpinBusyLock(), () 
-> completedFuture(raftService));
 
         assertThat(partitionMover.movePartition(PEERS_AND_LEARNERS, TERM), 
willCompleteSuccessfully());
 
@@ -77,7 +78,9 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
     public void testComponentStop() {
         var lock = new IgniteSpinBusyLock();
 
-        var partitionMover = new PartitionMover(lock, () -> 
mock(RaftGroupService.class));
+        RaftGroupService raftService = mock(RaftGroupService.class);
+
+        var partitionMover = new PartitionMover(lock, () -> 
completedFuture(raftService));
 
         lock.block();
 
@@ -93,7 +96,7 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
         when(raftService.changePeersAsync(any(), anyLong()))
                 .then(invocation -> CompletableFuture.runAsync(lock::block));
 
-        var partitionMover = new PartitionMover(lock, () -> raftService);
+        var partitionMover = new PartitionMover(lock, () -> 
completedFuture(raftService));
 
         assertThat(partitionMover.movePartition(PEERS_AND_LEARNERS, TERM), 
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index d8233857e3..0a857939f1 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -26,6 +26,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
@@ -87,9 +88,7 @@ import 
org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
-import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
@@ -277,6 +276,12 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         when(topologyService.localMember()).thenReturn(node);
         when(distributionZoneManager.dataNodes(anyLong(), anyInt(), 
anyInt())).thenReturn(emptySetCompletedFuture());
 
+        when(replicaMgr.getLogSyncer()).thenReturn(mock(LogSyncer.class));
+        when(replicaMgr.startReplica(any(), any(), any(), any(), any(), any()))
+                .thenReturn(nullCompletedFuture());
+        // TODO: will be removed after 
https://issues.apache.org/jira/browse/IGNITE-22315
+        when(replicaMgr.startRaftClient(any(), any(), any()))
+                
.thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class)));
         when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
 
         try (MockedStatic<SchemaUtils> schemaServiceMock = 
mockStatic(SchemaUtils.class)) {
@@ -314,7 +319,6 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 clusterService.messagingService(),
                 clusterService.topologyService(),
                 clusterService.serializationRegistry(),
-                rm,
                 replicaMgr,
                 null,
                 null,
@@ -323,13 +327,12 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 workDir,
                 metaStorageManager,
                 sm = new SchemaManager(revisionUpdater, catalogManager),
-                budgetView -> new LocalLogStorageFactory(),
                 partitionOperationsExecutor,
                 partitionOperationsExecutor,
+                mock(ScheduledExecutorService.class),
                 clock,
                 clockService,
                 new 
OutgoingSnapshotsManager(clusterService.messagingService()),
-                mock(TopologyAwareRaftGroupServiceFactory.class),
                 distributionZoneManager,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogManager,
@@ -337,7 +340,6 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 placementDriver,
                 () -> mock(IgniteSql.class),
                 new RemotelyTriggeredResourceRegistry(),
-                mock(ScheduledExecutorService.class),
                 lowWatermark,
                 new TransactionInflights(placementDriver, clockService)
         ) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index dcfad44ebc..9c80784c5f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -43,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.atMost;
@@ -102,11 +103,7 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
-import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
@@ -281,6 +278,11 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         when(distributionZoneManager.dataNodes(anyLong(), anyInt(), 
anyInt())).thenReturn(emptySetCompletedFuture());
 
+        when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(), 
any(), any(), any(), any()))
+                .thenReturn(trueCompletedFuture());
+        // TODO: will be removed after 
https://issues.apache.org/jira/browse/IGNITE-22315
+        when(replicaMgr.startRaftClient(any(), any(), any()))
+                
.thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class)));
         when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
 
         tblManagerFut = new CompletableFuture<>();
@@ -556,8 +558,6 @@ public class TableManagerTest extends IgniteAbstractTest {
     private IgniteBiTuple<TableViewInternal, TableManager> 
startTableManagerStopTest() throws Exception {
         TableViewInternal table = 
mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, tblManagerFut);
 
-        verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), 
any(), any());
-
         TableManager tableManager = tblManagerFut.join();
 
         return new IgniteBiTuple<>(table, tableManager);
@@ -569,7 +569,6 @@ public class TableManagerTest extends IgniteAbstractTest {
         tableManager.beforeNodeStop();
         assertThat(tableManager.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
-        verify(rm, times(PARTITIONS)).stopRaftNodes(any());
         verify(replicaMgr, times(PARTITIONS)).stopReplica(any());
 
         verify(table.internalTable().storage()).close();
@@ -716,15 +715,6 @@ public class TableManagerTest extends IgniteAbstractTest {
     ) throws Exception {
         String consistentId = "node0";
 
-        when(rm.startRaftGroupService(any(), any(), any(), 
any())).thenAnswer(mock -> {
-            RaftGroupService raftGrpSrvcMock = 
mock(TopologyAwareRaftGroupService.class);
-
-            when(raftGrpSrvcMock.leader()).thenReturn(new Peer(consistentId));
-
-            return completedFuture(raftGrpSrvcMock);
-        });
-
-        // TODO: useless code 
https://issues.apache.org/jira/browse/IGNITE-22388
         when(ts.getByConsistentId(any())).thenReturn(new ClusterNodeImpl(
                 UUID.randomUUID().toString(),
                 consistentId,
@@ -795,7 +785,6 @@ public class TableManagerTest extends IgniteAbstractTest {
                 clusterService.messagingService(),
                 clusterService.topologyService(),
                 clusterService.serializationRegistry(),
-                rm,
                 replicaMgr,
                 null,
                 null,
@@ -804,13 +793,12 @@ public class TableManagerTest extends IgniteAbstractTest {
                 workDir,
                 msm,
                 sm = new SchemaManager(revisionUpdater, catalogManager),
-                budgetView -> new LocalLogStorageFactory(),
                 partitionOperationsExecutor,
                 partitionOperationsExecutor,
+                mock(ScheduledExecutorService.class),
                 clock,
                 new TestClockService(clock),
                 new 
OutgoingSnapshotsManager(clusterService.messagingService()),
-                mock(TopologyAwareRaftGroupServiceFactory.class),
                 distributionZoneManager,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogManager,
@@ -818,7 +806,6 @@ public class TableManagerTest extends IgniteAbstractTest {
                 new TestPlacementDriver(node),
                 () -> mock(IgniteSql.class),
                 new RemotelyTriggeredResourceRegistry(),
-                mock(ScheduledExecutorService.class),
                 lowWatermark,
                 mock(TransactionInflights.class)
         ) {
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 84030d69bf..d242bf8be0 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.raft.TestLozaFactory;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -384,8 +385,10 @@ public class ItTxTestCluster {
             HybridClock clock = new HybridClockImpl();
             TestClockService clockService = new TestClockService(clock);
 
-            clocks.put(node.name(), clock);
-            clockServices.put(node.name(), clockService);
+            String nodeName = node.name();
+
+            clocks.put(nodeName, clock);
+            clockServices.put(nodeName, clockService);
 
             var raftSrv = TestLozaFactory.create(
                     clusterService,
@@ -396,27 +399,41 @@ public class ItTxTestCluster {
 
             assertThat(raftSrv.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
-            raftServers.put(node.name(), raftSrv);
+            raftServers.put(nodeName, raftSrv);
 
             var cmgManager = mock(ClusterManagementGroupManager.class);
 
             // This test is run without Meta storage.
             
when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture());
 
+            var commandMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
+
+            var raftClientFactory = new TopologyAwareRaftGroupServiceFactory(
+                    clusterService,
+                    logicalTopologyService(clusterService),
+                    Loza.FACTORY,
+                    new RaftGroupEventsClientListener()
+            );
+
             ReplicaManager replicaMgr = new ReplicaManager(
-                    node.name(),
+                    nodeName,
                     clusterService,
                     cmgManager,
                     clockService,
                     Set.of(TableMessageGroup.class, TxMessageGroup.class),
                     placementDriver,
                     partitionOperationsExecutor,
-                    new NoOpFailureProcessor()
+                    () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+                    new NoOpFailureProcessor(),
+                    commandMarshaller,
+                    raftClientFactory,
+                    raftSrv,
+                    new VolatileLogStorageFactoryCreator(nodeName, 
workDir.resolve("volatile-log-spillout"))
             );
 
             assertThat(replicaMgr.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
-            replicaManagers.put(node.name(), replicaMgr);
+            replicaManagers.put(nodeName, replicaMgr);
 
             LOG.info("Replica manager has been started, node=[" + node + ']');
 
@@ -428,15 +445,15 @@ public class ItTxTestCluster {
                     executor
             ));
 
-            replicaServices.put(node.name(), replicaSvc);
+            replicaServices.put(nodeName, replicaSvc);
 
             var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
             TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver, clockService);
 
-            txInflights.put(node.name(), transactionInflights);
+            txInflights.put(nodeName, transactionInflights);
 
-            cursorRegistries.put(node.name(), resourcesRegistry);
+            cursorRegistries.put(nodeName, resourcesRegistry);
 
             TxManagerImpl txMgr = newTxManager(
                     clusterService,
@@ -451,7 +468,7 @@ public class ItTxTestCluster {
             );
 
             ResourceVacuumManager resourceVacuumManager = new 
ResourceVacuumManager(
-                    node.name(),
+                    nodeName,
                     resourcesRegistry,
                     clusterService.topologyService(),
                     clusterService.messagingService(),
@@ -460,12 +477,12 @@ public class ItTxTestCluster {
             );
 
             assertThat(txMgr.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
-            txManagers.put(node.name(), txMgr);
+            txManagers.put(nodeName, txMgr);
 
             assertThat(resourceVacuumManager.startAsync(new 
ComponentContext()), willCompleteSuccessfully());
-            resourceCleanupManagers.put(node.name(), resourceVacuumManager);
+            resourceCleanupManagers.put(nodeName, resourceVacuumManager);
 
-            txStateStorages.put(node.name(), new TestTxStateStorage());
+            txStateStorages.put(nodeName, new TestTxStateStorage());
         }
 
         LOG.info("Raft servers have been started");
@@ -689,9 +706,8 @@ public class ItTxTestCluster {
 
                                 replicaManagers.get(assignment).startReplica(
                                         new TablePartitionId(tableId, partId),
-                                        listener,
-                                        raftSvc,
-                                        storageIndexTracker
+                                        storageIndexTracker,
+                                        completedFuture(listener)
                                 );
                             } catch (NodeStoppingException e) {
                                 fail("Unexpected node stopping", e);

Reply via email to