This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c8a03bc7f5 IGNITE-22410 Implement rebalance triggers for zone based partitions (#3943)
c8a03bc7f5 is described below

commit c8a03bc7f5d5542bfb2cc3e2140a926f2bc803e6
Author: Mirza Aliev <[email protected]>
AuthorDate: Wed Jul 3 11:23:18 2024 +0400

    IGNITE-22410 Implement rebalance triggers for zone based partitions (#3943)
---
 .../rebalance/DistributionZoneRebalanceEngine.java |  13 +-
 .../DistributionZoneRebalanceEngineV2.java         | 213 ++++++++
 .../RebalanceRaftGroupEventsListener.java          |   2 +
 .../distributionzones/rebalance/RebalanceUtil.java |  64 +--
 ...a => ZoneRebalanceRaftGroupEventsListener.java} | 337 ++++++-------
 .../{RebalanceUtil.java => ZoneRebalanceUtil.java} | 383 ++++++---------
 .../replicator/ItReplicaLifecycleTest.java         | 125 ++++-
 .../PartitionReplicaLifecycleManager.java          | 542 ++++++++++++++++++++-
 .../ignite/internal/replicator/ReplicaManager.java |  63 ++-
 .../replicator/ZonePartitionReplicaImpl.java       |  15 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 11 files changed, 1230 insertions(+), 530 deletions(-)

diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index 39448ae24f..f8309f7fa0 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -99,6 +99,11 @@ public class DistributionZoneRebalanceEngine {
     /** Executor for scheduling rebalances. */
     private final ScheduledExecutorService rebalanceScheduler;
 
+    /** Zone rebalance manager. */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 this class will replace DistributionZoneRebalanceEngine
+    // TODO: after switching to zone-based replication
+    private final DistributionZoneRebalanceEngineV2 distributionZoneRebalanceEngineV2;
+
     /**
      * Constructor.
      *
@@ -122,6 +127,12 @@ public class DistributionZoneRebalanceEngine {
         this.dataNodesListener = createDistributionZonesDataNodesListener();
         this.partitionsCounterListener = createPartitionsCounterListener();
         this.rebalanceScheduler = rebalanceScheduler;
+        this.distributionZoneRebalanceEngineV2 = new DistributionZoneRebalanceEngineV2(
+                busyLock,
+                metaStorageManager,
+                distributionZoneManager,
+                catalogService
+        );
     }
 
     /**
@@ -148,7 +159,7 @@ public class DistributionZoneRebalanceEngine {
 
             long recoveryRevision = recoveryFinishFuture.join();
 
-            return rebalanceTriggersRecovery(recoveryRevision);
+            return rebalanceTriggersRecovery(recoveryRevision).thenCompose(v -> distributionZoneRebalanceEngineV2.start());
         });
     }
 
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
new file mode 100644
index 0000000000..a1b684de29
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones.rebalance;
+
+import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZoneIdDataNodes;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.triggerZonePartitionsRebalance;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.Node;
+import 
org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventListener;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Zone rebalance manager. It listens to the changes in the distribution zones data nodes and replicas and triggers rebalance
+ * for the corresponding partitions. By triggering rebalance, it updates the pending assignments in the metastore.
+ * // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 this class will replace DistributionZoneRebalanceEngine
+ * // TODO: after switching to zone-based replication
+ */
+public class DistributionZoneRebalanceEngineV2 {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZoneRebalanceEngineV2.class);
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** External busy lock. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Meta Storage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Distribution zones manager. */
+    private final DistributionZoneManager distributionZoneManager;
+
+    /** Meta storage listener for data nodes changes. */
+    private final WatchListener dataNodesListener;
+
+    /** Catalog service. */
+    private final CatalogService catalogService;
+
+    /**
+     * Constructor.
+     *
+     * @param busyLock External busy lock.
+     * @param metaStorageManager Meta Storage manager.
+     * @param distributionZoneManager Distribution zones manager.
+     * @param catalogService Catalog service.
+     */
+    public DistributionZoneRebalanceEngineV2(
+            IgniteSpinBusyLock busyLock,
+            MetaStorageManager metaStorageManager,
+            DistributionZoneManager distributionZoneManager,
+            CatalogManager catalogService
+    ) {
+        this.busyLock = busyLock;
+        this.metaStorageManager = metaStorageManager;
+        this.distributionZoneManager = distributionZoneManager;
+        this.catalogService = catalogService;
+
+        this.dataNodesListener = createDistributionZonesDataNodesListener();
+    }
+
+    /**
+     * Starts the rebalance engine by registering corresponding meta storage 
and catalog listeners.
+     */
+    public CompletableFuture<Void> start() {
+        return IgniteUtils.inBusyLockAsync(busyLock, () -> {
+            catalogService.listen(ZONE_ALTER, new 
CatalogAlterZoneEventListener(catalogService) {
+                @Override
+                protected CompletableFuture<Void> 
onReplicasUpdate(AlterZoneEventParameters parameters, int oldReplicas) {
+                    return onUpdateReplicas(parameters);
+                }
+            });
+
+            // TODO: IGNITE-18694 - Recovery for the case when zones watch 
listener processed event but assignments were not updated.
+            metaStorageManager.registerPrefixWatch(zoneDataNodesKey(), 
dataNodesListener);
+
+            return nullCompletedFuture();
+        });
+    }
+
+    /**
+     * Stops the rebalance engine by unregistering meta storage watches.
+     */
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        metaStorageManager.unregisterWatch(dataNodesListener);
+    }
+
+    private WatchListener createDistributionZonesDataNodesListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+                return IgniteUtils.inBusyLockAsync(busyLock, () -> {
+                    Set<Node> dataNodes = 
parseDataNodes(evt.entryEvent().newEntry().value());
+
+                    if (dataNodes == null) {
+                        // The zone was removed so data nodes were removed too.
+                        return nullCompletedFuture();
+                    }
+
+                    int zoneId = 
extractZoneIdDataNodes(evt.entryEvent().newEntry().key());
+
+                    // It is safe to get the latest version of the catalog as 
we are in the metastore thread.
+                    int catalogVersion = catalogService.latestCatalogVersion();
+
+                    CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(zoneId, catalogVersion);
+
+                    if (zoneDescriptor == null) {
+                        // Zone has been removed.
+                        return nullCompletedFuture();
+                    }
+
+                    Set<String> filteredDataNodes = filterDataNodes(
+                            dataNodes,
+                            zoneDescriptor,
+                            distributionZoneManager.nodesAttributes()
+                    );
+
+                    if (filteredDataNodes.isEmpty()) {
+                        return nullCompletedFuture();
+                    }
+
+                    return triggerZonePartitionsRebalance(
+                            zoneDescriptor,
+                            filteredDataNodes,
+                            evt.entryEvent().newEntry().revision(),
+                            metaStorageManager
+                    );
+                });
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                LOG.warn("Unable to process data nodes event", e);
+            }
+        };
+    }
+
+    private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters 
parameters) {
+        return recalculateAssignmentsAndTriggerZonePartitionsRebalance(
+                parameters.zoneDescriptor(),
+                parameters.causalityToken(),
+                parameters.catalogVersion()
+        );
+    }
+
+    /**
+     * Calculate data nodes from the distribution zone for {@code 
zoneDescriptor} and {@code causalityToken} and trigger zones partitions
+     * rebalance. For more details see
+     * {@link 
ZoneRebalanceUtil#triggerZonePartitionsRebalance(CatalogZoneDescriptor, Set, 
long, MetaStorageManager)}
+     *
+     * @param zoneDescriptor Zone descriptor.
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
+     * @return The future, which completes when the all metastore updates done.
+     */
+    private CompletableFuture<Void> 
recalculateAssignmentsAndTriggerZonePartitionsRebalance(
+            CatalogZoneDescriptor zoneDescriptor,
+            long causalityToken,
+            int catalogVersion
+    ) {
+        return distributionZoneManager.dataNodes(causalityToken, 
catalogVersion, zoneDescriptor.id())
+                .thenCompose(dataNodes -> 
IgniteUtils.inBusyLockAsync(busyLock, () -> {
+                    if (dataNodes.isEmpty()) {
+                        return nullCompletedFuture();
+                    }
+
+                    return triggerZonePartitionsRebalance(
+                            zoneDescriptor,
+                            dataNodes,
+                            causalityToken,
+                            metaStorageManager
+                    );
+                }));
+    }
+}
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index ee1610ee3a..39373ef464 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -74,6 +74,8 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 /**
  * Listener for the raft group events, which must provide correct error 
handling of rebalance process
  * and start new rebalance after the current one finished.
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this class 
and use {@link ZoneRebalanceRaftGroupEventsListener} instead
+ *  after switching to zone-based replication.
  */
 public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener {
     /** Ignite logger. */
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index bc40b6caa3..3d77134d01 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -59,11 +59,12 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Util class for methods needed for the rebalance process.
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this class 
and use {@link ZoneRebalanceUtil} instead
+ *  after switching to zone-based replication.
  */
 public class RebalanceUtil {
 
@@ -314,9 +315,6 @@ public class RebalanceUtil {
     /** Key prefix for pending assignments. */
     public static final String PENDING_ASSIGNMENTS_PREFIX = 
"assignments.pending.";
 
-    /** Key prefix for stable assignments. */
-    public static final String ZONE_STABLE_ASSIGNMENTS_PREFIX = 
"zone.assignments.stable.";
-
     /** Key prefix for stable assignments. */
     public static final String STABLE_ASSIGNMENTS_PREFIX = 
"assignments.stable.";
 
@@ -387,17 +385,6 @@ public class RebalanceUtil {
         return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + partId);
     }
 
-    /**
-     * Key that is needed for the rebalance algorithm.
-     *
-     * @param partId Unique identifier of a partition.
-     * @return Key for a partition.
-     * @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
-     */
-    public static ByteArray stablePartAssignmentsKey(ZonePartitionId partId) {
-        return new ByteArray(ZONE_STABLE_ASSIGNMENTS_PREFIX + partId);
-    }
-
     /**
      * Key that is needed for the rebalance algorithm.
      *
@@ -587,27 +574,6 @@ public class RebalanceUtil {
         return (entry == null || entry.empty() || entry.tombstone()) ? null : 
Assignments.fromBytes(entry.value()).nodes();
     }
 
-    /**
-     * Returns partition assignments from meta storage locally.
-     *
-     * @param metaStorageManager Meta storage manager.
-     * @param zoneId Zone id.
-     * @param partitionNumber Partition number.
-     * @param revision Revision.
-     * @return Returns partition assignments from meta storage locally or 
{@code null} if assignments is absent.
-     */
-    @Nullable
-    public static Set<Assignment> zonePartitionAssignmentsGetLocally(
-            MetaStorageManager metaStorageManager,
-            int zoneId,
-            int partitionNumber,
-            long revision
-    ) {
-        Entry entry = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
ZonePartitionId(zoneId, partitionNumber)), revision);
-
-        return (entry == null || entry.empty() || entry.tombstone()) ? null : 
Assignments.fromBytes(entry.value()).nodes();
-    }
-
     /**
      * Returns table assignments for table partitions from meta storage.
      *
@@ -684,30 +650,4 @@ public class RebalanceUtil {
                 })
                 .collect(toList());
     }
-
-    /**
-     * Returns zone assignments for all zone partitions from meta storage 
locally. Assignments must be present.
-     *
-     * @param metaStorageManager Meta storage manager.
-     * @param zoneId Zone id.
-     * @param numberOfPartitions Number of partitions.
-     * @param revision Revision.
-     * @return Future with zone assignments as a value.
-     */
-    public static List<Assignments> zoneAssignmentsGetLocally(
-            MetaStorageManager metaStorageManager,
-            int zoneId,
-            int numberOfPartitions,
-            long revision
-    ) {
-        return IntStream.range(0, numberOfPartitions)
-                .mapToObj(p -> {
-                    Entry e = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
ZonePartitionId(zoneId, p)), revision);
-
-                    assert e != null && !e.empty() && !e.tombstone() : e;
-
-                    return Assignments.fromBytes(e.value());
-                })
-                .collect(toList());
-    }
 }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
similarity index 70%
copy from 
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
copy to 
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index ee1610ee3a..951101a3c7 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -17,16 +17,12 @@
 
 package org.apache.ignite.internal.distributionzones.rebalance;
 
-import static 
org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine.calculateAssignments;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.raftConfigurationAppliedKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stableChangeTriggerKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.switchAppendKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.switchReduceKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablesCounterKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.plannedPartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchAppendKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchReduceKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.union;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
@@ -36,48 +32,52 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
-import static 
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
-import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.internal.util.CollectionUtils.difference;
 import static org.apache.ignite.internal.util.CollectionUtils.intersect;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.Assignments;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.metastorage.dsl.Update;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftError;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.Status;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 
 /**
- * Listener for the raft group events, which must provide correct error 
handling of rebalance process
- * and start new rebalance after the current one finished.
+ * Listener for the raft group events, which must provide correct error 
handling of rebalance process and start new rebalance after the
+ * current one finished.
  */
-public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener {
+public class ZoneRebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener {
     /** Ignite logger. */
-    private static final IgniteLogger LOG = 
Loggers.forClass(RebalanceRaftGroupEventsListener.class);
+    private static final IgniteLogger LOG = 
Loggers.forClass(ZoneRebalanceRaftGroupEventsListener.class);
 
     /** Number of retrying of the current rebalance in case of errors. */
     private static final int REBALANCE_RETRY_THRESHOLD = 10;
@@ -97,12 +97,6 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
     /** Success code for the MetaStorage stable assignments change. */
     private static final int FINISH_REBALANCE_SUCCESS = 4;
 
-    /** Success code for the MetaStorage tables counter decrement. */
-    private static final int TABLES_COUNTER_DECREMENT_SUCCESS = 5;
-
-    /** Status for outdated MetaStorage update. */
-    private static final int OUTDATED_INVOKE_STATUS = 6;
-
     /** Failure code for the MetaStorage switch append assignments change. */
     private static final int SWITCH_APPEND_FAIL = -SWITCH_APPEND_SUCCESS;
 
@@ -115,14 +109,15 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
     /** Failure code for the MetaStorage stable assignments change. */
     private static final int FINISH_REBALANCE_FAIL = -FINISH_REBALANCE_SUCCESS;
 
-    /** Failure code for the MetaStorage tables counter decrement. */
-    private static final int PART_COUNTER_DECREMENT_FAIL = 
-TABLES_COUNTER_DECREMENT_SUCCESS;
-
     /** Meta storage manager. */
     private final MetaStorageManager metaStorageMgr;
 
+    private final CatalogService catalogService;
+
+    private final DistributionZoneManager distributionZoneManager;
+
     /** Unique table partition id. */
-    private final TablePartitionId tablePartitionId;
+    private final ZonePartitionId zonePartitionId;
 
     /** Busy lock of parent component for synchronous stop. */
     private final IgniteSpinBusyLock busyLock;
@@ -130,38 +125,37 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
     /** Executor for scheduling rebalance retries. */
     private final ScheduledExecutorService rebalanceScheduler;
 
-    /** Zone id. */
-    private final int zoneId;
-
     /** Performs reconfiguration of a Raft group of a partition. */
     private final PartitionMover partitionMover;
 
     /** Attempts to retry the current rebalance in case of errors. */
-    private final AtomicInteger rebalanceAttempts =  new AtomicInteger(0);
+    private final AtomicInteger rebalanceAttempts = new AtomicInteger(0);
 
     /**
      * Constructs new listener.
      *
      * @param metaStorageMgr Meta storage manager.
-     * @param tablePartitionId Partition id.
+     * @param zonePartitionId Partition id.
      * @param busyLock Busy lock.
      * @param partitionMover Class that moves partition between nodes.
      * @param rebalanceScheduler Executor for scheduling rebalance retries.
      */
-    public RebalanceRaftGroupEventsListener(
+    public ZoneRebalanceRaftGroupEventsListener(
             MetaStorageManager metaStorageMgr,
-            TablePartitionId tablePartitionId,
+            ZonePartitionId zonePartitionId,
             IgniteSpinBusyLock busyLock,
             PartitionMover partitionMover,
             ScheduledExecutorService rebalanceScheduler,
-            int zoneId
+            CatalogService catalogService,
+            DistributionZoneManager distributionZoneManager
     ) {
         this.metaStorageMgr = metaStorageMgr;
-        this.tablePartitionId = tablePartitionId;
+        this.zonePartitionId = zonePartitionId;
         this.busyLock = busyLock;
         this.partitionMover = partitionMover;
         this.rebalanceScheduler = rebalanceScheduler;
-        this.zoneId = zoneId;
+        this.distributionZoneManager = distributionZoneManager;
+        this.catalogService = catalogService;
     }
 
     /** {@inheritDoc} */
@@ -180,7 +174,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                 try {
                     rebalanceAttempts.set(0);
 
-                    byte[] pendingAssignmentsBytes = 
metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId)).get().value();
+                    byte[] pendingAssignmentsBytes = 
metaStorageMgr.get(pendingPartAssignmentsKey(zonePartitionId)).get().value();
 
                     if (pendingAssignmentsBytes != null) {
                         Set<Assignment> pendingAssignments = 
Assignments.fromBytes(pendingAssignmentsBytes).nodes();
@@ -197,8 +191,8 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                         }
 
                         LOG.info(
-                                "New leader elected. Going to apply new 
configuration [tablePartitionId={}, peers={}, learners={}]",
-                                tablePartitionId, peers, learners
+                                "New leader elected. Going to apply new 
configuration [zonePartitionId={}, peers={}, learners={}]",
+                                zonePartitionId, peers, learners
                         );
 
                         PeersAndLearners peersAndLearners = 
PeersAndLearners.fromConsistentIds(peers, learners);
@@ -207,7 +201,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                     }
                 } catch (Exception e) {
                     // TODO: IGNITE-14693
-                    LOG.warn("Unable to start rebalance [tablePartitionId, 
term={}]", e, tablePartitionId, term);
+                    LOG.warn("Unable to start rebalance [tablePartitionId, 
term={}]", e, zonePartitionId, term);
                 } finally {
                     busyLock.leaveBusy();
                 }
@@ -233,7 +227,13 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                 try {
                     Set<Assignment> stable = createAssignments(configuration);
 
-                    countDownPartitionsFromZone(stable);
+                    doStableKeySwitch(
+                            stable,
+                            zonePartitionId,
+                            metaStorageMgr,
+                            catalogService,
+                            distributionZoneManager
+                    );
                 } finally {
                     busyLock.leaveBusy();
                 }
@@ -243,76 +243,6 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
         }
     }
 
-    /**
-     * This method count downs the counter for tables from the corresponding 
zone that are associated to the corresponding partition number
-     * when raft configuration of that partition raft group has been 
successfully changed.
-     *
-     * @param stable Stable assignments to save to metastore for further 
stable switch.
-     */
-    private void countDownPartitionsFromZone(Set<Assignment> stable) {
-        try {
-            int partId = tablePartitionId.partitionId();
-
-            Entry counterEntry = metaStorageMgr.get(tablesCounterKey(zoneId, 
partId)).get();
-
-            assert counterEntry.value() != null;
-
-            Set<Integer> counter = fromBytes(counterEntry.value());
-
-            assert !counter.isEmpty();
-
-            if (!counter.contains(tablePartitionId.tableId())) {
-                // Count down for this table has already been processed, just 
skip.
-                // For example, this can happen when leader re-election 
happened during the rebalance process.
-                return;
-            }
-
-            Condition condition = value(tablesCounterKey(zoneId, 
partId)).eq(counterEntry.value());
-
-            byte[] stableArray = Assignments.toBytes(stable);
-
-            counter.remove(tablePartitionId.tableId());
-
-            if (counter.isEmpty()) {
-                counter = Set.of();
-            }
-
-            Update successCase = ops(
-                    put(tablesCounterKey(zoneId, partId), toBytes(counter)),
-                    // Todo: change to one key 
https://issues.apache.org/jira/browse/IGNITE-18991
-                    put(raftConfigurationAppliedKey(tablePartitionId), 
stableArray)
-            ).yield(TABLES_COUNTER_DECREMENT_SUCCESS);
-
-            Update failCase = ops().yield(PART_COUNTER_DECREMENT_FAIL);
-
-            int res = metaStorageMgr.invoke(iif(condition, successCase, 
failCase)).get().getAsInt();
-
-            if (res < 0) {
-                LOG.info("Count down of zone's tables counter is failed. "
-                                + "Going to retry [zoneId={}, 
appliedPeers={}]",
-                        zoneId, 
stable.stream().map(Assignment::consistentId).collect(Collectors.toSet())
-                );
-
-                countDownPartitionsFromZone(stable);
-
-                return;
-            } else {
-                LOG.info(
-                        "Count down of zone's tables counter is succeeded 
[zoneId={}, partId={}, counter={}, appliedPeers={}]",
-                        zoneId,
-                        partId,
-                        counter,
-                        
stable.stream().map(Assignment::consistentId).collect(Collectors.toSet())
-                );
-            }
-
-            rebalanceAttempts.set(0);
-        } catch (InterruptedException | ExecutionException e) {
-            // TODO: IGNITE-14693
-            LOG.warn("Unable to count down partitions counter in metastore: " 
+ tablePartitionId, e);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override
     public void onReconfigurationError(Status status, PeersAndLearners 
configuration, long term) {
@@ -325,7 +255,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
 
             if (status.equals(Status.LEADER_STEPPED_DOWN)) {
                 // Leader stepped down, so we are expecting 
RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
-                LOG.info("Leader stepped down during rebalance [partId={}]", 
tablePartitionId);
+                LOG.info("Leader stepped down during rebalance [partId={}]", 
zonePartitionId);
 
                 return;
             }
@@ -335,12 +265,12 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
             assert raftError == RaftError.ECATCHUP : "According to the JRaft 
protocol, " + RaftError.ECATCHUP
                     + " is expected, got " + raftError;
 
-            LOG.debug("Error occurred during rebalance [partId={}]", 
tablePartitionId);
+            LOG.debug("Error occurred during rebalance [partId={}]", 
zonePartitionId);
 
             if (rebalanceAttempts.incrementAndGet() < 
REBALANCE_RETRY_THRESHOLD) {
                 scheduleChangePeers(configuration, term);
             } else {
-                LOG.info("Number of retries for rebalance exceeded the 
threshold [partId={}, threshold={}]", tablePartitionId,
+                LOG.info("Number of retries for rebalance exceeded the 
threshold [partId={}, threshold={}]", zonePartitionId,
                         REBALANCE_RETRY_THRESHOLD);
 
                 // TODO: currently we just retry intent to change peers 
according to the rebalance infinitely, until new leader is elected,
@@ -364,7 +294,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                 return;
             }
 
-            LOG.info("Going to retry rebalance [attemptNo={}, partId={}]", 
rebalanceAttempts.get(), tablePartitionId);
+            LOG.info("Going to retry rebalance [attemptNo={}, partId={}]", 
rebalanceAttempts.get(), zonePartitionId);
 
             try {
                 partitionMover.movePartition(peersAndLearners, term).join();
@@ -379,20 +309,17 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
      */
     static void doStableKeySwitch(
             Set<Assignment> stableFromRaft,
-            TablePartitionId tablePartitionId,
-            long revision,
+            ZonePartitionId zonePartitionId,
             MetaStorageManager metaStorageMgr,
             CatalogService catalogService,
             DistributionZoneManager distributionZoneManager
     ) {
         try {
-            ByteArray pendingPartAssignmentsKey = 
pendingPartAssignmentsKey(tablePartitionId);
-            ByteArray stablePartAssignmentsKey = 
stablePartAssignmentsKey(tablePartitionId);
-            ByteArray plannedPartAssignmentsKey = 
plannedPartAssignmentsKey(tablePartitionId);
-            ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
-            ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
-            ByteArray stableChangeTriggerKey = 
stableChangeTriggerKey(tablePartitionId);
-
+            ByteArray pendingPartAssignmentsKey = 
pendingPartAssignmentsKey(zonePartitionId);
+            ByteArray stablePartAssignmentsKey = 
stablePartAssignmentsKey(zonePartitionId);
+            ByteArray plannedPartAssignmentsKey = 
plannedPartAssignmentsKey(zonePartitionId);
+            ByteArray switchReduceKey = switchReduceKey(zonePartitionId);
+            ByteArray switchAppendKey = switchAppendKey(zonePartitionId);
             // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove 
synchronous wait
             Map<ByteArray, Entry> values = metaStorageMgr.getAll(
                     Set.of(
@@ -400,26 +327,26 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                             pendingPartAssignmentsKey,
                             stablePartAssignmentsKey,
                             switchReduceKey,
-                            switchAppendKey,
-                            stableChangeTriggerKey
+                            switchAppendKey
                     )
             ).get();
 
-            Set<Assignment> calculatedAssignments = 
calculateAssignments(tablePartitionId, catalogService, 
distributionZoneManager).get();
+            Set<Assignment> calculatedAssignments = calculateZoneAssignments(
+                    zonePartitionId,
+                    catalogService,
+                    distributionZoneManager
+            ).get();
 
             Entry stableEntry = values.get(stablePartAssignmentsKey);
             Entry pendingEntry = values.get(pendingPartAssignmentsKey);
             Entry plannedEntry = values.get(plannedPartAssignmentsKey);
             Entry switchReduceEntry = values.get(switchReduceKey);
             Entry switchAppendEntry = values.get(switchAppendKey);
-            Entry stableChangeTriggerEntry = 
values.get(stableChangeTriggerKey);
 
             Set<Assignment> retrievedStable = 
readAssignments(stableEntry).nodes();
             Set<Assignment> retrievedSwitchReduce = 
readAssignments(switchReduceEntry).nodes();
             Set<Assignment> retrievedSwitchAppend = 
readAssignments(switchAppendEntry).nodes();
             Set<Assignment> retrievedPending = 
readAssignments(pendingEntry).nodes();
-            long stableChangeTriggerValue = stableChangeTriggerEntry.value() 
== null
-                    ? 0L : 
bytesToLongKeepingOrder(stableChangeTriggerEntry.value());
 
             if (!retrievedPending.equals(stableFromRaft)) {
                 return;
@@ -517,35 +444,27 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
             }
 
             // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove 
synchronous wait
-            int res = metaStorageMgr.invoke(
-                    iif(or(
-                                    
notExists(stableChangeTriggerKey(tablePartitionId)),
-                                    
value(stableChangeTriggerKey(tablePartitionId)).lt(longToBytesKeepingOrder(revision))
-                            ),
-                            iif(retryPreconditions, successCase, failCase),
-                            ops().yield(OUTDATED_INVOKE_STATUS)
-                    )
-            ).get().getAsInt();
+            int res = metaStorageMgr.invoke(iif(retryPreconditions, 
successCase, failCase)).get().getAsInt();
 
             if (res < 0) {
                 switch (res) {
                     case SWITCH_APPEND_FAIL:
                         LOG.info("Rebalance keys changed while trying to 
update rebalance pending addition information. "
-                                        + "Going to retry 
[tablePartitionID={}, appliedPeers={}]",
-                                tablePartitionId, stableFromRaft
+                                        + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
+                                zonePartitionId, stableFromRaft
                         );
                         break;
                     case SWITCH_REDUCE_FAIL:
                         LOG.info("Rebalance keys changed while trying to 
update rebalance pending reduce information. "
-                                        + "Going to retry 
[tablePartitionID={}, appliedPeers={}]",
-                                tablePartitionId, stableFromRaft
+                                        + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
+                                zonePartitionId, stableFromRaft
                         );
                         break;
                     case SCHEDULE_PENDING_REBALANCE_FAIL:
                     case FINISH_REBALANCE_FAIL:
                         LOG.info("Rebalance keys changed while trying to 
update rebalance information. "
-                                        + "Going to retry 
[tablePartitionId={}, appliedPeers={}]",
-                                tablePartitionId, stableFromRaft
+                                        + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
+                                zonePartitionId, stableFromRaft
                         );
                         break;
                     default:
@@ -555,8 +474,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
 
                 doStableKeySwitch(
                         stableFromRaft,
-                        tablePartitionId,
-                        revision,
+                        zonePartitionId,
                         metaStorageMgr,
                         catalogService,
                         distributionZoneManager
@@ -568,31 +486,24 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
             switch (res) {
                 case SWITCH_APPEND_SUCCESS:
                     LOG.info("Rebalance finished. Going to schedule next 
rebalance with addition"
-                                    + " [tablePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
-                            tablePartitionId, stableFromRaft, 
calculatedPendingAddition
+                                    + " [zonePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
+                            zonePartitionId, stableFromRaft, 
calculatedPendingAddition
                     );
                     break;
                 case SWITCH_REDUCE_SUCCESS:
                     LOG.info("Rebalance finished. Going to schedule next 
rebalance with reduction"
-                                    + " [tablePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
-                            tablePartitionId, stableFromRaft, 
calculatedPendingReduction
+                                    + " [zonePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
+                            zonePartitionId, stableFromRaft, 
calculatedPendingReduction
                     );
                     break;
                 case SCHEDULE_PENDING_REBALANCE_SUCCESS:
                     LOG.info(
-                            "Rebalance finished. Going to schedule next 
rebalance [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
-                            tablePartitionId, stableFromRaft, 
Assignments.fromBytes(plannedEntry.value()).nodes()
+                            "Rebalance finished. Going to schedule next 
rebalance [zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
+                            zonePartitionId, stableFromRaft, 
Assignments.fromBytes(plannedEntry.value()).nodes()
                     );
                     break;
                 case FINISH_REBALANCE_SUCCESS:
-                    LOG.info("Rebalance finished [tablePartitionId={}, 
appliedPeers={}]", tablePartitionId, stableFromRaft);
-                    break;
-
-                case OUTDATED_INVOKE_STATUS:
-                    LOG.debug("Stable switch skipped because event is outdated 
"
-                                    + "[tablePartitionId={}, 
stableChangeTriggerKey={}, revision={}]",
-                            tablePartitionId, stableChangeTriggerValue, 
revision
-                    );
+                    LOG.info("Rebalance finished [zonePartitionId={}, 
appliedPeers={}]", zonePartitionId, stableFromRaft);
                     break;
 
                 default:
@@ -602,7 +513,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
 
         } catch (InterruptedException | ExecutionException e) {
             // TODO: IGNITE-14693
-            LOG.warn("Unable to commit partition configuration to metastore: " 
+ tablePartitionId, e);
+            LOG.warn("Unable to commit partition configuration to metastore: " 
+ zonePartitionId, e);
         }
     }
 
@@ -629,4 +540,100 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
 
         return value == null ? Assignments.EMPTY : 
Assignments.fromBytes(value);
     }
+
+    /**
+     * Handles a change of the switch reduce assignments by updating pending assignments 
if there is no rebalancing in progress.
+     * If there is rebalancing in progress, then new assignments will be 
applied when rebalance finishes.
+     *
+     * @param metaStorageMgr MetaStorage manager.
+     * @param dataNodes Data nodes.
+     * @param replicas Replicas count.
+     * @param partId Partition's raft group id.
+     * @param event Assignments switch reduce change event.
+     * @return Completable future that signifies the completion of this 
operation.
+     */
+    public static CompletableFuture<Void> 
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<String> 
dataNodes,
+            int replicas, ZonePartitionId partId, WatchEvent event) {
+        Entry entry = event.entryEvent().newEntry();
+        byte[] eventData = entry.value();
+
+        assert eventData != null : "Null event data for " + partId;
+
+        Assignments switchReduce = Assignments.fromBytes(eventData);
+
+        if (switchReduce.isEmpty()) {
+            return nullCompletedFuture();
+        }
+
+        Set<Assignment> assignments = 
AffinityUtils.calculateAssignmentForPartition(dataNodes, partId.partitionId(), 
replicas);
+
+        ByteArray pendingKey = pendingPartAssignmentsKey(partId);
+
+        Set<Assignment> pendingAssignments = difference(assignments, 
switchReduce.nodes());
+
+        byte[] pendingByteArray = Assignments.toBytes(pendingAssignments);
+        byte[] assignmentsByteArray = Assignments.toBytes(assignments);
+
+        ByteArray changeTriggerKey = 
ZoneRebalanceUtil.pendingChangeTriggerKey(partId);
+        byte[] rev = ByteUtils.longToBytesKeepingOrder(entry.revision());
+
+        // Here is what happens in the MetaStorage:
+        // if ((notExists(changeTriggerKey) || value(changeTriggerKey) < 
revision) && (notExists(pendingKey) && notExists(stableKey))) {
+        //     put(pendingKey, pending)
+        //     put(stableKey, assignments)
+        //     put(changeTriggerKey, revision)
+        // } else if ((notExists(changeTriggerKey) || value(changeTriggerKey) 
< revision) && (notExists(pendingKey))) {
+        //     put(pendingKey, pending)
+        //     put(changeTriggerKey, revision)
+        // }
+
+        Iif resultingOperation = iif(
+                and(
+                        or(notExists(changeTriggerKey), 
value(changeTriggerKey).lt(rev)),
+                        and(notExists(pendingKey), 
(notExists(stablePartAssignmentsKey(partId))))
+                ),
+                ops(
+                        put(pendingKey, pendingByteArray),
+                        put(stablePartAssignmentsKey(partId), 
assignmentsByteArray),
+                        put(changeTriggerKey, rev)
+                ).yield(),
+                iif(
+                        and(
+                                or(notExists(changeTriggerKey), 
value(changeTriggerKey).lt(rev)),
+                                notExists(pendingKey)
+                        ),
+                        ops(
+                                put(pendingKey, pendingByteArray),
+                                put(changeTriggerKey, rev)
+                        ).yield(),
+                        ops().yield()
+                )
+        );
+
+        return metaStorageMgr.invoke(resultingOperation).thenApply(unused -> 
null);
+    }
+
+    private static CompletableFuture<Set<Assignment>> calculateZoneAssignments(
+            ZonePartitionId zonePartitionId,
+            CatalogService catalogService,
+            DistributionZoneManager distributionZoneManager
+    ) {
+        int catalogVersion = catalogService.latestCatalogVersion();
+
+        CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(zonePartitionId.zoneId(), catalogVersion);
+
+        int zoneId = zonePartitionId.zoneId();
+
+        return distributionZoneManager.dataNodes(
+                zoneDescriptor.updateToken(),
+                catalogVersion,
+                zoneId
+        ).thenApply(dataNodes ->
+                AffinityUtils.calculateAssignmentForPartition(
+                        dataNodes,
+                        zonePartitionId.partitionId(),
+                        zoneDescriptor.replicas()
+                )
+        );
+    }
 }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
similarity index 66%
copy from 
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
copy to 
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index bc40b6caa3..23fea49067 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.distributionzones.rebalance;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED;
@@ -45,11 +47,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.Assignments;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -58,21 +60,20 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
-import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Util class for methods needed for the rebalance process.
  */
-public class RebalanceUtil {
-
+public class ZoneRebalanceUtil {
     /** Logger. */
-    private static final IgniteLogger LOG = 
Loggers.forClass(RebalanceUtil.class);
+    private static final IgniteLogger LOG = 
Loggers.forClass(ZoneRebalanceUtil.class);
 
     /**
      * Status values for methods like
-     * {@link #updatePendingAssignmentsKeys(CatalogTableDescriptor, 
TablePartitionId, Collection, int, long, MetaStorageManager, int, Set)}.
+     * {@link #updatePendingAssignmentsKeys(CatalogZoneDescriptor, 
ZonePartitionId, Collection, int, long, MetaStorageManager, int, Set)}.
      */
     public enum UpdateStatus {
         /**
@@ -118,43 +119,40 @@ public class RebalanceUtil {
         }
     }
 
-    /** Rebalance scheduler pool size. */
-    public static final int REBALANCE_SCHEDULER_POOL_SIZE = 
Math.min(Runtime.getRuntime().availableProcessors() * 3, 20);
-
     /**
      * Update keys that related to rebalance algorithm in Meta Storage. Keys 
are specific for partition.
      *
-     * @param tableDescriptor Table descriptor.
-     * @param partId Unique identifier of a partition.
+     * @param zoneDescriptor Zone descriptor.
+     * @param zonePartitionId Unique aggregate identifier of a partition of a 
zone.
      * @param dataNodes Data nodes.
-     * @param replicas Number of replicas for a table.
+     * @param replicas Number of replicas for a zone.
      * @param revision Revision of Meta Storage that is specific for the 
assignment update.
      * @param metaStorageMgr Meta Storage manager.
      * @param partNum Partition id.
-     * @param tableCfgPartAssignments Table configuration assignments.
+     * @param zoneCfgPartAssignments Zone configuration assignments.
      * @return Future representing result of updating keys in {@code 
metaStorageMgr}
      */
     public static CompletableFuture<Void> updatePendingAssignmentsKeys(
-            CatalogTableDescriptor tableDescriptor,
-            TablePartitionId partId,
+            CatalogZoneDescriptor zoneDescriptor,
+            ZonePartitionId zonePartitionId,
             Collection<String> dataNodes,
             int replicas,
             long revision,
             MetaStorageManager metaStorageMgr,
             int partNum,
-            Set<Assignment> tableCfgPartAssignments
+            Set<Assignment> zoneCfgPartAssignments
     ) {
-        ByteArray partChangeTriggerKey = pendingChangeTriggerKey(partId);
+        ByteArray partChangeTriggerKey = 
pendingChangeTriggerKey(zonePartitionId);
 
-        ByteArray partAssignmentsPendingKey = 
pendingPartAssignmentsKey(partId);
+        ByteArray partAssignmentsPendingKey = 
pendingPartAssignmentsKey(zonePartitionId);
 
-        ByteArray partAssignmentsPlannedKey = 
plannedPartAssignmentsKey(partId);
+        ByteArray partAssignmentsPlannedKey = 
plannedPartAssignmentsKey(zonePartitionId);
 
-        ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
+        ByteArray partAssignmentsStableKey = 
stablePartAssignmentsKey(zonePartitionId);
 
         Set<Assignment> partAssignments = 
AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
 
-        boolean isNewAssignments = 
!tableCfgPartAssignments.equals(partAssignments);
+        boolean isNewAssignments = 
!zoneCfgPartAssignments.equals(partAssignments);
 
         byte[] partAssignmentsBytes = Assignments.toBytes(partAssignments);
 
@@ -208,23 +206,23 @@ public class RebalanceUtil {
             switch (UpdateStatus.valueOf(sr.getAsInt())) {
                 case PENDING_KEY_UPDATED:
                     LOG.info(
-                            "Update metastore pending partitions key [key={}, 
partition={}, table={}/{}, newVal={}]",
-                            partAssignmentsPendingKey.toString(), partNum, 
tableDescriptor.id(), tableDescriptor.name(),
+                            "Update metastore pending partitions key [key={}, 
partition={}, zone={}/{}, newVal={}]",
+                            partAssignmentsPendingKey.toString(), partNum, 
zoneDescriptor.id(), zoneDescriptor.name(),
                             partAssignments);
 
                     break;
                 case PLANNED_KEY_UPDATED:
                     LOG.info(
-                            "Update metastore planned partitions key [key={}, 
partition={}, table={}/{}, newVal={}]",
-                            partAssignmentsPlannedKey, partNum, 
tableDescriptor.id(), tableDescriptor.name(),
+                            "Update metastore planned partitions key [key={}, 
partition={}, zone={}/{}, newVal={}]",
+                            partAssignmentsPlannedKey, partNum, 
zoneDescriptor.id(), zoneDescriptor.name(),
                             partAssignments
                     );
 
                     break;
                 case PLANNED_KEY_REMOVED_EQUALS_PENDING:
                     LOG.info(
-                            "Remove planned key because current pending key 
has the same value [key={}, partition={}, table={}/{}, val={}]",
-                            partAssignmentsPlannedKey.toString(), partNum, 
tableDescriptor.id(), tableDescriptor.name(),
+                            "Remove planned key because current pending key 
has the same value [key={}, partition={}, zone={}/{}, val={}]",
+                            partAssignmentsPlannedKey.toString(), partNum, 
zoneDescriptor.id(), zoneDescriptor.name(),
                             partAssignments
                     );
 
@@ -232,24 +230,24 @@ public class RebalanceUtil {
                 case PLANNED_KEY_REMOVED_EMPTY_PENDING:
                     LOG.info(
                             "Remove planned key because pending is empty and 
calculated assignments are equal to current assignments "
-                                    + "[key={}, partition={}, table={}/{}, 
val={}]",
-                            partAssignmentsPlannedKey.toString(), partNum, 
tableDescriptor.id(), tableDescriptor.name(),
+                                    + "[key={}, partition={}, zone={}/{}, 
val={}]",
+                            partAssignmentsPlannedKey.toString(), partNum, 
zoneDescriptor.id(), zoneDescriptor.name(),
                             partAssignments
                     );
 
                     break;
                 case ASSIGNMENT_NOT_UPDATED:
                     LOG.debug(
-                            "Assignments are not updated [key={}, 
partition={}, table={}/{}, val={}]",
-                            partAssignmentsPlannedKey.toString(), partNum, 
tableDescriptor.id(), tableDescriptor.name(),
+                            "Assignments are not updated [key={}, 
partition={}, zone={}/{}, val={}]",
+                            partAssignmentsPlannedKey.toString(), partNum, 
zoneDescriptor.id(), zoneDescriptor.name(),
                             partAssignments
                     );
 
                     break;
                 case OUTDATED_UPDATE_RECEIVED:
                     LOG.debug(
-                            "Received outdated rebalance trigger event 
[revision={}, partition={}, table={}/{}]",
-                            revision, partNum, tableDescriptor.id(), 
tableDescriptor.name());
+                            "Received outdated rebalance trigger event 
[revision={}, partition={}, zone={}/{}]",
+                            revision, partNum, zoneDescriptor.id(), 
zoneDescriptor.name());
 
                     break;
                 default:
@@ -259,212 +257,174 @@ public class RebalanceUtil {
     }
 
     /**
-     * Triggers rebalance on all partitions of the provided table: that is, 
reads table assignments from
-     * the MetaStorage, computes new ones based on the current properties of 
the table, its zone and the
+     * Triggers rebalance on all partitions of the provided zone: that is, 
reads zone assignments from
+     * the MetaStorage, computes new ones based on the current properties of 
the zone, the
      * provided data nodes, and, if the calculated assignments are different 
from the ones loaded from the
      * MetaStorages, writes them as pending assignments.
      *
-     * @param tableDescriptor Table descriptor.
      * @param zoneDescriptor Zone descriptor.
      * @param dataNodes Data nodes to use.
      * @param storageRevision MetaStorage revision corresponding to this 
request.
      * @param metaStorageManager MetaStorage manager used to read/write 
assignments.
-     * @return Array of futures, one per partition of the table; the futures 
complete when the described
+     * @return Array of futures, one per partition of the zone; the futures 
complete when the described
      *     rebalance triggering completes.
      */
-    public static CompletableFuture<?>[] triggerAllTablePartitionsRebalance(
-            CatalogTableDescriptor tableDescriptor,
+    static CompletableFuture<Void> triggerZonePartitionsRebalance(
             CatalogZoneDescriptor zoneDescriptor,
             Set<String> dataNodes,
             long storageRevision,
             MetaStorageManager metaStorageManager
     ) {
-        CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = 
tableAssignments(
+        CompletableFuture<Map<Integer, Assignments>> zoneAssignmentsFut = 
zoneAssignments(
                 metaStorageManager,
-                tableDescriptor.id(),
+                zoneDescriptor.id(),
                 Set.of(),
                 zoneDescriptor.partitions()
         );
 
-        CompletableFuture<?>[] futures = new 
CompletableFuture[zoneDescriptor.partitions()];
+        CompletableFuture<?>[] partitionFutures = new 
CompletableFuture[zoneDescriptor.partitions()];
 
         for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
-            TablePartitionId replicaGrpId = new 
TablePartitionId(tableDescriptor.id(), partId);
+            ZonePartitionId replicaGrpId = new 
ZonePartitionId(zoneDescriptor.id(), partId);
 
             int finalPartId = partId;
 
-            futures[partId] = tableAssignmentsFut.thenCompose(tableAssignments 
->
+            partitionFutures[partId] = 
zoneAssignmentsFut.thenCompose(zoneAssignments ->
                     // TODO https://issues.apache.org/jira/browse/IGNITE-19763 
We should distinguish empty stable assignments on
                     // TODO node recovery in case of interrupted table 
creation, and moving from empty assignments to non-empty.
-                    tableAssignments.isEmpty() ? nullCompletedFuture() : 
updatePendingAssignmentsKeys(
-                            tableDescriptor,
+                    zoneAssignments.isEmpty() ? nullCompletedFuture() : 
updatePendingAssignmentsKeys(
+                            zoneDescriptor,
                             replicaGrpId,
                             dataNodes,
                             zoneDescriptor.replicas(),
                             storageRevision,
                             metaStorageManager,
                             finalPartId,
-                            tableAssignments.get(finalPartId).nodes()
+                            zoneAssignments.get(finalPartId).nodes()
                     ));
         }
 
-        return futures;
+        // This set is used to deduplicate exceptions (if there is an 
exception from upstream, for instance,
+        // when reading from MetaStorage, it will be encountered by every 
partition future) to avoid noise
+        // in the logs.
+        Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
+
+        for (int partId = 0; partId < partitionFutures.length; partId++) {
+            int finalPartId = partId;
+
+            partitionFutures[partId].exceptionally(e -> {
+                Throwable cause = ExceptionUtils.unwrapCause(e);
+
+                if (unwrappedCauses.add(cause)) {
+                    // The exception is specific to this partition.
+                    LOG.error(
+                            "Exception on updating assignments for [zone={}, 
partition={}]",
+                            e,
+                            zoneInfo(zoneDescriptor), finalPartId
+                    );
+                } else {
+                    // The exception is from upstream and not specific for 
this partition, so don't log the partition index.
+                    LOG.error(
+                            "Exception on updating assignments for [zone={}]",
+                            e,
+                            zoneInfo(zoneDescriptor)
+                    );
+                }
+
+                return null;
+            });
+        }
+
+        return allOf(partitionFutures);
+    }
+
+    private static String zoneInfo(CatalogZoneDescriptor zoneDescriptor) {
+        return zoneDescriptor.id() + "/" + zoneDescriptor.name();
     }
 
     /** Key prefix for pending assignments. */
-    public static final String PENDING_ASSIGNMENTS_PREFIX = 
"assignments.pending.";
+    public static final String PENDING_ASSIGNMENTS_PREFIX = 
"zone.assignments.pending.";
 
     /** Key prefix for stable assignments. */
-    public static final String ZONE_STABLE_ASSIGNMENTS_PREFIX = 
"zone.assignments.stable.";
+    public static final String STABLE_ASSIGNMENTS_PREFIX = 
"zone.assignments.stable.";
 
-    /** Key prefix for stable assignments. */
-    public static final String STABLE_ASSIGNMENTS_PREFIX = 
"assignments.stable.";
+    /** Key prefix for planned assignments. */
+    public static final String PLANNED_ASSIGNMENTS_PREFIX = 
"zone.assignments.planned.";
 
     /** Key prefix for switch reduce assignments. */
-    public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX = 
"assignments.switch.reduce.";
+    public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX = 
"zone.assignments.switch.reduce.";
 
     /** Key prefix for switch append assignments. */
-    public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX = 
"assignments.switch.append.";
-
-    /** Key prefix for counter of rebalances of tables from a zone that are 
associated with the specified partition. */
-    private static final String TABLES_COUNTER_PREFIX = "tables.counter.";
-
-    /** Key prefix for a raft configuration that was applied during rebalance 
of the specified partition form a table. */
-    private static final String RAFT_CONF_APPLIED_PREFIX = 
"assignments.raft.conf.applied.";
+    public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX = 
"zone.assignments.switch.append.";
 
     /**
      * Key that is needed for skipping stale events of pending key change.
      *
-     * @param partId Unique identifier of a partition.
+     * @param zonePartitionId Unique aggregate identifier of a partition of a 
zone.
      * @return Key for a partition.
      * @see <a 
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
 documentation</a>
      */
-    public static ByteArray pendingChangeTriggerKey(TablePartitionId partId) {
-        return new ByteArray(partId + "pending.change.trigger");
-    }
-
-    /**
-     * Key that is needed for skipping stale events of stable key change.
-     *
-     * @param partId Unique identifier of a partition.
-     * @return Key for a partition.
-     * @see <a 
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
 documentation</a>
-     */
-    public static ByteArray stableChangeTriggerKey(TablePartitionId partId) {
-        return new ByteArray(partId + "stable.change.trigger");
-    }
-
-    /**
-     * Key that is needed for the rebalance algorithm.
-     *
-     * @param partId Unique identifier of a partition.
-     * @return Key for a partition.
-     * @see <a 
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
 documentation</a>
-     */
-    public static ByteArray pendingPartAssignmentsKey(TablePartitionId partId) 
{
-        return new ByteArray(PENDING_ASSIGNMENTS_PREFIX + partId);
+    public static ByteArray pendingChangeTriggerKey(ZonePartitionId 
zonePartitionId) {
+        return new ByteArray(zonePartitionId + "zone.pending.change.trigger");
     }
 
     /**
      * Key that is needed for the rebalance algorithm.
      *
-     * @param partId Unique identifier of a partition.
+     * @param zonePartitionId Unique aggregate identifier of a partition of a 
zone.
      * @return Key for a partition.
      * @see <a 
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
 documentation</a>
      */
-    public static ByteArray plannedPartAssignmentsKey(TablePartitionId partId) 
{
-        return new ByteArray("assignments.planned." + partId);
+    public static ByteArray pendingPartAssignmentsKey(ZonePartitionId 
zonePartitionId) {
+        return new ByteArray(PENDING_ASSIGNMENTS_PREFIX + zonePartitionId);
     }
 
     /**
      * Key that is needed for the rebalance algorithm.
      *
-     * @param partId Unique identifier of a partition.
+     * @param zonePartitionId Unique aggregate identifier of a partition of a 
zone.
      * @return Key for a partition.
      * @see <a 
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
 documentation</a>
      */
-    public static ByteArray stablePartAssignmentsKey(TablePartitionId partId) {
-        return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + partId);
+    public static ByteArray plannedPartAssignmentsKey(ZonePartitionId 
zonePartitionId) {
+        return new ByteArray(PLANNED_ASSIGNMENTS_PREFIX + zonePartitionId);
     }
 
     /**
      * Key that is needed for the rebalance algorithm.
      *
-     * @param partId Unique identifier of a partition.
+     * @param zonePartitionId Unique aggregate identifier of a partition of a 
zone.
      * @return Key for a partition.
      * @see <a 
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
 documentation</a>
      */
-    public static ByteArray stablePartAssignmentsKey(ZonePartitionId partId) {
-        return new ByteArray(ZONE_STABLE_ASSIGNMENTS_PREFIX + partId);
+    public static ByteArray stablePartAssignmentsKey(ZonePartitionId 
zonePartitionId) {
+        return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + zonePartitionId);
     }
 
     /**
      * Key that is needed for the rebalance algorithm.
      *
-     * @param partId Unique identifier of a partition.
+     * @param zonePartitionId Unique aggregate identifier of a partition of a 
zone.
      * @return Key for a partition.
      * @see <a 
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
 documentation</a>
      */
-    public static ByteArray switchReduceKey(TablePartitionId partId) {
-        return new ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX + partId);
+    public static ByteArray switchReduceKey(ZonePartitionId zonePartitionId) {
+        return new ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX + 
zonePartitionId);
     }
 
     /**
      * Key that is needed for the rebalance algorithm.
      *
-     * @param partId Unique identifier of a partition.
+     * @param zonePartitionId Unique aggregate identifier of a partition of a 
zone.
      * @return Key for a partition.
      * @see <a 
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
 documentation</a>
      */
-    public static ByteArray switchAppendKey(TablePartitionId partId) {
-        return new ByteArray(ASSIGNMENTS_SWITCH_APPEND_PREFIX + partId);
-    }
-
-    /**
-     * ByteArray key for a counter of rebalances of tables from a zone that 
are associated with the specified partition.
-     *
-     * @param zoneId Identifier of a zone.
-     * @param partId Unique identifier of a partition.
-     * @return Key for a partition.
-     */
-    public static ByteArray tablesCounterKey(int zoneId, int partId) {
-        return new ByteArray(TABLES_COUNTER_PREFIX + zoneId + "_part_" + 
partId);
-    }
-
-    /**
-     * ByteArray prefix for counter of rebalances of tables from a zone that 
are associated with the specified partition.
-     *
-     * @return Prefix for a counter of rebalances of tables partition.
-     */
-    public static ByteArray tablesCounterPrefixKey() {
-        return new ByteArray(TABLES_COUNTER_PREFIX);
+    public static ByteArray switchAppendKey(ZonePartitionId zonePartitionId) {
+        return new ByteArray(ASSIGNMENTS_SWITCH_APPEND_PREFIX + 
zonePartitionId);
     }
 
     /**
-     * ByteArray key for a raft configuration that was applied during 
rebalance of the specified partition form a table.
-     *
-     * @param partId Unique identifier of a partition.
-     * @return Key for a applied raft configuration.
-     */
-    public static ByteArray raftConfigurationAppliedKey(TablePartitionId 
partId) {
-        return new ByteArray(RAFT_CONF_APPLIED_PREFIX + partId);
-    }
-
-    /**
-     * Extract table id from a metastorage key of partition.
-     *
-     * @param key Key.
-     * @param prefix Key prefix.
-     * @return Table id.
-     */
-    public static int extractTableId(byte[] key, String prefix) {
-        String strKey = new String(key, StandardCharsets.UTF_8);
-
-        return Integer.parseInt(strKey.substring(prefix.length(), 
strKey.indexOf("_part_")));
-    }
-
-    /**
-     * Extract table id from a metastorage key of partition.
+     * Extract zone id from a metastorage key of partition.
      *
      * @param key Key.
      * @param prefix Key prefix.
@@ -473,7 +433,7 @@ public class RebalanceUtil {
     public static int extractZoneId(byte[] key, String prefix) {
         String strKey = new String(key, StandardCharsets.UTF_8);
 
-        return Integer.parseInt(strKey.substring(prefix.length()));
+        return Integer.parseInt(strKey.substring(prefix.length(), 
strKey.indexOf("_part_")));
     }
 
     /**
@@ -482,10 +442,10 @@ public class RebalanceUtil {
      * @param key Key.
      * @return Table id.
      */
-    static int extractZoneIdFromTablesCounter(byte[] key) {
+    public static int extractZoneIdDataNodes(byte[] key) {
         String strKey = new String(key, StandardCharsets.UTF_8);
 
-        return 
Integer.parseInt(strKey.substring(TABLES_COUNTER_PREFIX.length(), 
strKey.indexOf("_part_")));
+        return 
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX.length()));
     }
 
     /**
@@ -548,90 +508,77 @@ public class RebalanceUtil {
         return op1.stream().filter(op2::contains).collect(toSet());
     }
 
-    /**
-     * Returns partition assignments from meta storage.
-     *
-     * @param metaStorageManager Meta storage manager.
-     * @param tableId Table ID.
-     * @param partitionId Partition ID.
-     * @return Future with partition assignments as a value.
-     */
-    public static CompletableFuture<Set<Assignment>> partitionAssignments(
-            MetaStorageManager metaStorageManager,
-            int tableId,
-            int partitionId
-    ) {
-        return metaStorageManager
-                .get(stablePartAssignmentsKey(new TablePartitionId(tableId, 
partitionId)))
-                .thenApply(e -> (e.value() == null) ? null : 
Assignments.fromBytes(e.value()).nodes());
-    }
-
     /**
      * Returns partition assignments from meta storage locally.
      *
      * @param metaStorageManager Meta storage manager.
-     * @param tableId Table id.
+     * @param zoneId Zone id.
      * @param partitionNumber Partition number.
      * @param revision Revision.
      * @return Returns partition assignments from meta storage locally or 
{@code null} if assignments is absent.
      */
     @Nullable
-    public static Set<Assignment> partitionAssignmentsGetLocally(
+    public static Set<Assignment> zonePartitionAssignmentsGetLocally(
             MetaStorageManager metaStorageManager,
-            int tableId,
+            int zoneId,
             int partitionNumber,
             long revision
     ) {
-        Entry entry = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
TablePartitionId(tableId, partitionNumber)), revision);
+        Entry entry = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
ZonePartitionId(zoneId, partitionNumber)), revision);
 
         return (entry == null || entry.empty() || entry.tombstone()) ? null : 
Assignments.fromBytes(entry.value()).nodes();
     }
 
     /**
-     * Returns partition assignments from meta storage locally.
+     * Returns zone assignments for all zone partitions from meta storage 
locally. Assignments must be present.
      *
      * @param metaStorageManager Meta storage manager.
      * @param zoneId Zone id.
-     * @param partitionNumber Partition number.
+     * @param numberOfPartitions Number of partitions.
      * @param revision Revision.
-     * @return Returns partition assignments from meta storage locally or 
{@code null} if assignments is absent.
+     * @return Future with zone assignments as a value.
      */
-    @Nullable
-    public static Set<Assignment> zonePartitionAssignmentsGetLocally(
+    public static List<Assignments> zoneAssignmentsGetLocally(
             MetaStorageManager metaStorageManager,
             int zoneId,
-            int partitionNumber,
+            int numberOfPartitions,
             long revision
     ) {
-        Entry entry = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
ZonePartitionId(zoneId, partitionNumber)), revision);
+        return IntStream.range(0, numberOfPartitions)
+                .mapToObj(p -> {
+                    Entry e = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
ZonePartitionId(zoneId, p)), revision);
 
-        return (entry == null || entry.empty() || entry.tombstone()) ? null : 
Assignments.fromBytes(entry.value()).nodes();
+                    assert e != null && !e.empty() && !e.tombstone() : e;
+
+                    return Assignments.fromBytes(e.value());
+                })
+                .collect(toList());
     }
 
     /**
-     * Returns table assignments for table partitions from meta storage.
+     * Returns zone assignments for zone partitions from meta storage.
      *
      * @param metaStorageManager Meta storage manager.
-     * @param tableId Table id.
+     * @param zoneId Zone id.
      * @param partitionIds IDs of partitions to get assignments for. If empty, 
get all partition assignments.
      * @param numberOfPartitions Number of partitions. Ignored if partition 
IDs are specified.
-     * @return Future with table assignments as a value.
+     * @return Future with zone assignments as a value.
      */
-    public static CompletableFuture<Map<Integer, Assignments>> 
tableAssignments(
+    private static CompletableFuture<Map<Integer, Assignments>> 
zoneAssignments(
             MetaStorageManager metaStorageManager,
-            int tableId,
+            int zoneId,
             Set<Integer> partitionIds,
             int numberOfPartitions
     ) {
-        IntStream partitionIdsStream = partitionIds.isEmpty()
-                ? IntStream.range(0, numberOfPartitions)
-                : partitionIds.stream().mapToInt(Integer::intValue);
-
-        Map<ByteArray, Integer> partitionKeysToPartitionNumber = 
partitionIdsStream.collect(
-                HashMap::new,
-                (map, partId) -> map.put(stablePartAssignmentsKey(new 
TablePartitionId(tableId, partId)), partId),
-                Map::putAll
-        );
+        Map<ByteArray, Integer> partitionKeysToPartitionNumber = new 
HashMap<>();
+
+        Collection<Integer> ids = partitionIds.isEmpty()
+                ? IntStream.range(0, 
numberOfPartitions).boxed().collect(toList())
+                : partitionIds;
+
+        for (Integer partId : ids) {
+            partitionKeysToPartitionNumber.put(stablePartAssignmentsKey(new 
ZonePartitionId(zoneId, partId)), partId);
+        }
 
         return 
metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
                 .thenApply(entries -> {
@@ -653,61 +600,9 @@ public class RebalanceUtil {
 
                     assert numberOfMsPartitions == 0 || numberOfMsPartitions 
== entries.size()
                             : "Invalid number of stable partition entries 
received from meta storage [received="
-                            + numberOfMsPartitions + ", numberOfPartitions=" + 
entries.size() + ", tableId=" + tableId + "].";
+                            + numberOfMsPartitions + ", numberOfPartitions=" + 
entries.size() + ", zoneId=" + zoneId + "].";
 
                     return numberOfMsPartitions == 0 ? Map.of() : result;
                 });
     }
-
-    /**
-     * Returns table assignments for all table partitions from meta storage 
locally. Assignments must be present.
-     *
-     * @param metaStorageManager Meta storage manager.
-     * @param tableId Table id.
-     * @param numberOfPartitions Number of partitions.
-     * @param revision Revision.
-     * @return Future with table assignments as a value.
-     */
-    public static List<Assignments> tableAssignmentsGetLocally(
-            MetaStorageManager metaStorageManager,
-            int tableId,
-            int numberOfPartitions,
-            long revision
-    ) {
-        return IntStream.range(0, numberOfPartitions)
-                .mapToObj(p -> {
-                    Entry e = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
TablePartitionId(tableId, p)), revision);
-
-                    assert e != null && !e.empty() && !e.tombstone() : e;
-
-                    return Assignments.fromBytes(e.value());
-                })
-                .collect(toList());
-    }
-
-    /**
-     * Returns zone assignments for all zone partitions from meta storage 
locally. Assignments must be present.
-     *
-     * @param metaStorageManager Meta storage manager.
-     * @param zoneId Zone id.
-     * @param numberOfPartitions Number of partitions.
-     * @param revision Revision.
-     * @return Future with zone assignments as a value.
-     */
-    public static List<Assignments> zoneAssignmentsGetLocally(
-            MetaStorageManager metaStorageManager,
-            int zoneId,
-            int numberOfPartitions,
-            long revision
-    ) {
-        return IntStream.range(0, numberOfPartitions)
-                .mapToObj(p -> {
-                    Entry e = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
ZonePartitionId(zoneId, p)), revision);
-
-                    assert e != null && !e.empty() && !e.tombstone() : e;
-
-                    return Assignments.fromBytes(e.value());
-                })
-                .collect(toList());
-    }
 }
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index d4864210ed..f0a435f281 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -23,10 +23,11 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.BaseIgniteRestartTest.createVault;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.FEATURE_FLAG_NAME;
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -167,7 +168,6 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
@@ -323,7 +323,6 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22410")
     void testAlterReplicaTrigger() throws Exception {
         Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
                 nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
@@ -332,12 +331,10 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
         
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
 
-        createZone(node, "test_zone", 2, 3);
+        createZone(node, "test_zone", 1, 3);
 
         int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager, 
"test_zone", node.hybridClock.nowLong());
 
-        createTable(node, "test_zone", "test_table");
-
         MetaStorageManager metaStorageManager = node.metaStorageManager;
 
         ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
@@ -348,7 +345,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 (v) -> Assignments.fromBytes(v).nodes()
                         
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                 nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
-                10_000L
+                20_000L
         );
 
         CatalogManager catalogManager = node.catalogManager;
@@ -361,7 +358,92 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 (v) -> Assignments.fromBytes(v).nodes()
                         
.stream().map(Assignment::consistentId).collect(Collectors.toSet()).size(),
                 2,
-                10_000L * 2
+                20_000L
+        );
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> 
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
+                true,
+                20_000L
+        );
+    }
+
+    @Test
+    void testAlterReplicaTriggerDefaultZone() throws Exception {
+        Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
+                nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
+
+        Node node = getNode(replicaAssignment.consistentId());
+
+        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+
+        CatalogManager catalogManager = node.catalogManager;
+
+        int zoneId = 
defaultZoneIdOpt(catalogManager.catalog(catalogManager.latestCatalogVersion()));
+
+        String defaultZoneName = catalogManager.zone(zoneId, 
catalogManager.latestCatalogVersion()).name();
+
+        MetaStorageManager metaStorageManager = node.metaStorageManager;
+
+        ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> Assignments.fromBytes(v).nodes()
+                        
.stream().map(Assignment::consistentId).collect(Collectors.toSet()).size(),
+                1,
+                20_000L
+        );
+
+        alterZone(catalogManager, defaultZoneName, 2);
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> Assignments.fromBytes(v).nodes()
+                        
.stream().map(Assignment::consistentId).collect(Collectors.toSet()).size(),
+                2,
+                20_000L
+        );
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> 
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
+                true,
+                20_000L
+        );
+    }
+
+    @Test
+    void testAlterReplicaExtensionTrigger() throws Exception {
+        Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
+                nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
+
+        Node node = getNode(replicaAssignment.consistentId());
+
+        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+
+        assertTrue(waitForCondition(() -> 
node.distributionZoneManager.logicalTopology().size() == 3, 10_000L));
+
+        createZone(node, "test_zone", 2, 2);
+
+        int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager, 
"test_zone", node.hybridClock.nowLong());
+
+        MetaStorageManager metaStorageManager = node.metaStorageManager;
+
+        ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> Assignments.fromBytes(v).nodes()
+                        
.stream().map(Assignment::consistentId).collect(Collectors.toSet()).size(),
+                2,
+                20_000L
         );
 
         assertValueInStorage(
@@ -369,12 +451,24 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 stablePartAssignmentsKey(partId),
                 (v) -> 
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
                 true,
-                10_000L * 2
+                20_000L
+        );
+
+        CatalogManager catalogManager = node.catalogManager;
+
+        alterZone(catalogManager, "test_zone", 3);
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> Assignments.fromBytes(v).nodes()
+                        
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+                nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
+                20_000L
         );
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22410")
     void testAlterFilterTrigger() throws Exception {
         Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
                 nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
@@ -387,8 +481,6 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
         int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager, 
"test_zone", node.hybridClock.nowLong());
 
-        createTable(node, "test_zone", "test_table");
-
         MetaStorageManager metaStorageManager = node.metaStorageManager;
 
         ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
@@ -399,7 +491,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 (v) -> Assignments.fromBytes(v).nodes()
                         
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                 nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
-                10_000L
+                20_000L
         );
 
         CatalogManager catalogManager = node.catalogManager;
@@ -414,7 +506,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 (v) -> Assignments.fromBytes(v).nodes()
                         
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                 Set.of(nodes.get(0).name),
-                10_000L * 2
+                20_000L
         );
 
         assertValueInStorage(
@@ -422,7 +514,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 stablePartAssignmentsKey(partId),
                 (v) -> 
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
                 true,
-                10_000L * 2
+                20_000L
         );
     }
 
@@ -786,7 +878,8 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     distributionZoneManager,
                     metaStorageManager,
                     clusterService.topologyService(),
-                    threadPoolsManager.tableIoExecutor()
+                    threadPoolsManager.tableIoExecutor(),
+                    rebalanceScheduler
             );
 
             StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index df2db83be3..c55c5d0410 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -17,27 +17,42 @@
 
 package org.apache.ignite.internal.partition.replicator;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.zoneAssignmentsGetLocally;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.zonePartitionAssignmentsGetLocally;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener.handleReduceChanged;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractPartitionNumber;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZoneId;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,18 +60,24 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.Assignments;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
+import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
+import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
 import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -65,13 +86,18 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.snapshot.FailFastSnapshotStorageFactory;
+import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
-import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -79,6 +105,7 @@ import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * The main responsibilities of this class:
@@ -102,6 +129,15 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
 
     private final TopologyService topologyService;
 
+    /** Meta storage listener for pending assignments. */
+    private final WatchListener pendingAssignmentsRebalanceListener;
+
+    /** Meta storage listener for stable assignments. */
+    private final WatchListener stableAssignmentsRebalanceListener;
+
+    /** Meta storage listener for switch reduce assignments. */
+    private final WatchListener assignmentsSwitchRebalanceListener;
+
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(PartitionReplicaLifecycleManager.class);
 
@@ -115,6 +151,9 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
      */
     private final ExecutorService ioExecutor;
 
+    /** Executor for scheduling rebalance routine. */
+    private final ScheduledExecutorService rebalanceScheduler;
+
     /**
      * The constructor.
      *
@@ -131,7 +170,8 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             DistributionZoneManager distributionZoneMgr,
             MetaStorageManager metaStorageMgr,
             TopologyService topologyService,
-            ExecutorService ioExecutor
+            ExecutorService ioExecutor,
+            ScheduledExecutorService rebalanceScheduler
     ) {
         this.catalogMgr = catalogMgr;
         this.replicaMgr = replicaMgr;
@@ -139,6 +179,11 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         this.metaStorageMgr = metaStorageMgr;
         this.topologyService = topologyService;
         this.ioExecutor = ioExecutor;
+        this.rebalanceScheduler = rebalanceScheduler;
+
+        pendingAssignmentsRebalanceListener = 
createPendingAssignmentsRebalanceListener();
+        stableAssignmentsRebalanceListener = 
createStableAssignmentsRebalanceListener();
+        assignmentsSwitchRebalanceListener = 
createAssignmentsSwitchRebalanceListener();
     }
 
     @Override
@@ -147,6 +192,12 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             return nullCompletedFuture();
         }
 
+        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
+
+        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
+
+        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
+
         catalogMgr.listen(ZONE_CREATE,
                 (CreateZoneEventParameters parameters) ->
                         inBusyLock(busyLock, () -> 
onCreateZone(parameters).thenApply((ignored) -> false))
@@ -177,41 +228,81 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
 
             List<CompletableFuture<?>> partitionsStartFutures = new 
ArrayList<>();
 
-            for (int i = 0; i < assignments.size(); i++) {
-                int partId = i;
+            for (int partId = 0; partId < assignments.size(); partId++) {
+                Assignments zoneAssignment = assignments.get(partId);
+
+                Assignment localMemberAssignment = 
localMemberAssignment(zoneAssignment);
 
-                
partitionsStartFutures.add(createZonePartitionReplicationNodes(zoneId, partId, 
assignments.get(i)));
+                
partitionsStartFutures.add(createZonePartitionReplicationNode(zoneId, partId, 
localMemberAssignment, zoneAssignment));
             }
 
             return allOf(partitionsStartFutures.toArray(new 
CompletableFuture<?>[0]));
         }));
     }
 
-    private CompletableFuture<Void> createZonePartitionReplicationNodes(int 
zoneId, int partId, Assignments assignments) {
-        if (!shouldStartLocally(assignments)) {
+    /**
+     * Start a replica for the corresponding {@code zoneId} and {@code partId} 
on the local node if {@code localMemberAssignment} is not
+     * null, meaning that the local node is part of the assignment.
+     *
+     * @param zoneId Zone id.
+     * @param partId Partition id.
+     * @param localMemberAssignment Assignment of the local member, or null if 
local member is not part of the assignment.
+     * @param stableAssignments Stable assignments.
+     * @return Future that completes when a replica is started.
+     */
+    private CompletableFuture<Void> createZonePartitionReplicationNode(
+            int zoneId,
+            int partId,
+            @Nullable Assignment localMemberAssignment,
+            Assignments stableAssignments
+    ) {
+        if (localMemberAssignment == null) {
             return nullCompletedFuture();
         }
 
-        PeersAndLearners realConfiguration = 
PeersAndLearners.fromAssignments(assignments.nodes());
+        PeersAndLearners stablePeersAndLearners = 
fromAssignments(stableAssignments.nodes());
 
         ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, partId);
 
         RaftGroupListener raftGroupListener = new ZonePartitionRaftListener();
 
+        ZoneRebalanceRaftGroupEventsListener raftGroupEventsListener = new 
ZoneRebalanceRaftGroupEventsListener(
+                metaStorageMgr,
+                replicaGrpId,
+                busyLock,
+                createPartitionMover(replicaGrpId),
+                rebalanceScheduler,
+                catalogMgr,
+                distributionZoneMgr
+        );
+
+        replicationGroupIds.add(replicaGrpId);
+
         try {
             return replicaMgr.startReplica(
                     replicaGrpId,
                     new ZonePartitionReplicaListener(),
                     new FailFastSnapshotStorageFactory(),
-                    realConfiguration,
+                    stablePeersAndLearners,
                     raftGroupListener,
-                    RaftGroupEventsListener.noopLsnr
-            ).thenRun(() -> replicationGroupIds.add(replicaGrpId));
+                    raftGroupEventsListener
+            ).thenApply(ignored -> null);
         } catch (NodeStoppingException e) {
             return failedFuture(e);
         }
     }
 
+    private PartitionMover createPartitionMover(ZonePartitionId replicaGrpId) {
+        return new PartitionMover(busyLock, () -> {
+            CompletableFuture<Replica> replicaFut = 
replicaMgr.replica(replicaGrpId);
+            if (replicaFut == null) {
+                return failedFuture(new IgniteInternalException("No such 
replica for partition " + replicaGrpId.partitionId()
+                        + " in zone " + replicaGrpId.zoneId()));
+            }
+            return replicaFut.thenApply(Replica::raftClient);
+        });
+    }
+
     private boolean shouldStartLocally(Assignments assignments) {
         return assignments
                 .nodes()
@@ -227,6 +318,10 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
     public void beforeNodeStop() {
         busyLock.block();
 
+        metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener);
+        metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
+        metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
+
         cleanUpPartitionsResources(replicationGroupIds);
     }
 
@@ -360,6 +455,415 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         return assignmentsFuture;
     }
 
+
+    /**
+     * Creates meta storage listener for pending assignments updates.
+     *
+     * @return The watch listener.
+     */
+    private WatchListener createPendingAssignmentsRebalanceListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    return failedFuture(new NodeStoppingException());
+                }
+
+                try {
+                    Entry newEntry = evt.entryEvent().newEntry();
+
+                    return handleChangePendingAssignmentEvent(newEntry, 
evt.revision(), false);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                LOG.warn("Unable to process pending assignments event", e);
+            }
+        };
+    }
+
+    /**
+     * Creates Meta storage listener for stable assignments updates.
+     *
+     * @return The watch listener.
+     */
+    private WatchListener createStableAssignmentsRebalanceListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    return failedFuture(new NodeStoppingException());
+                }
+
+                try {
+                    return handleChangeStableAssignmentEvent(evt);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                LOG.warn("Unable to process stable assignments event", e);
+            }
+        };
+    }
+
+    /** Creates Meta storage listener for switch reduce assignments updates. */
+    private WatchListener createAssignmentsSwitchRebalanceListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+                return inBusyLockAsync(busyLock, () -> {
+                    byte[] key = evt.entryEvent().newEntry().key();
+
+                    int partitionId = extractPartitionNumber(key);
+                    int zoneId = extractZoneId(key, 
ASSIGNMENTS_SWITCH_REDUCE_PREFIX);
+
+                    ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, 
partitionId);
+
+                    // It is safe to get the latest version of the catalog as 
we are in the metastore thread.
+                    int catalogVersion = catalogMgr.latestCatalogVersion();
+
+                    CatalogZoneDescriptor zoneDescriptor = 
catalogMgr.zone(zoneId, catalogVersion);
+
+                    long causalityToken = zoneDescriptor.updateToken();
+
+                    return distributionZoneMgr.dataNodes(causalityToken, 
catalogVersion, zoneId)
+                            .thenCompose(dataNodes -> handleReduceChanged(
+                                    metaStorageMgr,
+                                    dataNodes,
+                                    zoneDescriptor.replicas(),
+                                    replicaGrpId,
+                                    evt
+                            ));
+
+                });
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                LOG.warn("Unable to process switch reduce event", e);
+            }
+        };
+    }
+
+    /**
+     * Handles an update of a key under the
+     * {@code ZoneRebalanceUtil#STABLE_ASSIGNMENTS_PREFIX} prefix.
+     *
+     * @param evt Event.
+     * @return Future that completes when the event has been handled.
+     */
+    protected CompletableFuture<Void> handleChangeStableAssignmentEvent(WatchEvent evt) {
+        // No previous value on any key means this is the initial write of the
+        // zone stable assignments on zone creation.
+        boolean initialWrite = evt.entryEvents().stream().allMatch(e -> {
+            Entry oldEntry = e.oldEntry();
+
+            return oldEntry == null || oldEntry.value() == null;
+        });
+
+        if (initialWrite) {
+            return nullCompletedFuture();
+        }
+
+        if (!evt.single()) {
+            // If there is not a single entry, then all entries must be
+            // tombstones (this happens after zone drop).
+            assert evt.entryEvents().stream().allMatch(entryEvent -> entryEvent.newEntry().tombstone()) : evt;
+
+            return nullCompletedFuture();
+        }
+
+        // From here on the update can only come from the rebalance logic, which
+        // always processes one partition at a time, i.e. one stable
+        // assignments key.
+        Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
+
+        long revision = evt.revision();
+
+        assert stableAssignmentsWatchEvent.revision() == revision : stableAssignmentsWatchEvent;
+
+        if (stableAssignmentsWatchEvent.value() == null) {
+            return nullCompletedFuture();
+        }
+
+        return handleChangeStableAssignmentEvent(stableAssignmentsWatchEvent, revision, false);
+    }
+
+    protected CompletableFuture<Void> handleChangeStableAssignmentEvent(
+            Entry stableAssignmentsWatchEvent,
+            long revision,
+            boolean isRecovery
+    ) {
+        int partitionId = 
extractPartitionNumber(stableAssignmentsWatchEvent.key());
+        int zoneId = extractZoneId(stableAssignmentsWatchEvent.key(), 
STABLE_ASSIGNMENTS_PREFIX);
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 
partitionId);
+
+        Set<Assignment> stableAssignments = 
stableAssignmentsWatchEvent.value() == null
+                ? emptySet()
+                : 
Assignments.fromBytes(stableAssignmentsWatchEvent.value()).nodes();
+
+
+        return supplyAsync(() -> {
+            Entry pendingAssignmentsEntry = 
metaStorageMgr.getLocally(pendingPartAssignmentsKey(zonePartitionId), revision);
+
+            byte[] pendingAssignmentsFromMetaStorage = 
pendingAssignmentsEntry.value();
+
+            Assignments pendingAssignments = pendingAssignmentsFromMetaStorage 
== null
+                    ? Assignments.EMPTY
+                    : Assignments.fromBytes(pendingAssignmentsFromMetaStorage);
+
+            return stopAndDestroyPartitionAndUpdateClients(
+                    zonePartitionId,
+                    stableAssignments,
+                    pendingAssignments,
+                    isRecovery
+            );
+        }, ioExecutor).thenCompose(identity());
+    }
+
+    private CompletableFuture<Void> updatePartitionClients(
+            ZonePartitionId zonePartitionId,
+            Set<Assignment> stableAssignments
+    ) {
+        // Update raft client peers and learners according to the actual 
assignments.
+        if (replicaMgr.isReplicaStarted(zonePartitionId)) {
+            replicaMgr.getReplica(zonePartitionId).join()
+                    
.raftClient().updateConfiguration(fromAssignments(stableAssignments));
+        }
+
+        return nullCompletedFuture();
+    }
+
+    private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
+            ZonePartitionId zonePartitionId,
+            Set<Assignment> stableAssignments,
+            Assignments pendingAssignments,
+            boolean isRecovery
+    ) {
+        CompletableFuture<Void> clientUpdateFuture = isRecovery
+                // Updating clients is not needed on recovery.
+                ? nullCompletedFuture()
+                : updatePartitionClients(zonePartitionId, stableAssignments);
+
+        boolean shouldStopLocalServices = (pendingAssignments.force()
+                ? pendingAssignments.nodes().stream()
+                : Stream.concat(stableAssignments.stream(), 
pendingAssignments.nodes().stream())
+        )
+                .noneMatch(assignment -> 
assignment.consistentId().equals(localNode().name()));
+
+        if (shouldStopLocalServices) {
+            return allOf(
+                    clientUpdateFuture,
+                    stopAndDestroyPartition(zonePartitionId)
+            );
+        } else {
+            return clientUpdateFuture;
+        }
+    }
+
+    private CompletableFuture<?> stopAndDestroyPartition(ReplicationGroupId 
zonePartitionId) {
+        return stopPartition(zonePartitionId);
+    }
+
+    private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+            Entry pendingAssignmentsEntry,
+            long revision,
+            boolean isRecovery
+    ) {
+        if (pendingAssignmentsEntry.value() == null || 
pendingAssignmentsEntry.empty()) {
+            return nullCompletedFuture();
+        }
+
+        int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
+        int zoneId = extractZoneId(pendingAssignmentsEntry.key(), 
PENDING_ASSIGNMENTS_PREFIX);
+
+        var zonePartitionId = new ZonePartitionId(zoneId, partId);
+
+        // Stable assignments from the meta store, whose revision is bounded 
by the current pending event.
+        Assignments stableAssignments = stableAssignments(zonePartitionId, 
revision);
+
+        Assignments pendingAssignments = 
Assignments.fromBytes(pendingAssignmentsEntry.value());
+
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.<Void>failedFuture(new 
NodeStoppingException());
+        }
+
+        try {
+            if (LOG.isInfoEnabled()) {
+                var stringKey = new String(pendingAssignmentsEntry.key(), 
UTF_8);
+
+                LOG.info("Received update on pending assignments. Check if new 
replication node should be started [key={}, "
+                                + "partition={}, zoneId={}, 
localMemberAddress={}, pendingAssignments={}, revision={}]",
+                        stringKey, partId, zoneId, localNode().address(), 
pendingAssignments, revision);
+            }
+
+            return handleChangePendingAssignmentEvent(
+                    zonePartitionId,
+                    stableAssignments,
+                    pendingAssignments
+            ).thenCompose(v -> changePeersOnRebalance(
+                    replicaMgr,
+                    zonePartitionId,
+                    pendingAssignments.nodes(),
+                    stableAssignments == null ? emptySet() : 
stableAssignments.nodes(),
+                    revision)
+            );
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+            ZonePartitionId replicaGrpId,
+            @Nullable Assignments stableAssignments,
+            Assignments pendingAssignments
+    ) {
+        boolean pendingAssignmentsAreForced = pendingAssignments.force();
+        Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
+
+        // Start a new Raft node and Replica if this node has appeared in the 
new assignments.
+        Assignment localMemberAssignment = 
localMemberAssignment(pendingAssignments);
+
+        boolean shouldStartLocalGroupNode = localMemberAssignment != null
+                && (stableAssignments == null || 
!stableAssignments.nodes().contains(localMemberAssignment));
+
+        // This is a set of assignments for nodes that are not part of the 
stable assignments, i.e. the unstable part of the distribution.
+        // For regular pending assignments we use (old) stable set, so that 
none of new nodes would be able to propose itself as a leader.
+        // For forced assignments, we should do the same thing, but only for 
the subset of stable set that is alive right now. Dead nodes
+        // are excluded. It is calculated precisely as an intersection between 
forced assignments and (old) stable assignments.
+        Assignments computedStableAssignments;
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22600 remove the 
second condition
+        //  when we will have a proper handling of empty stable assignments
+        if (stableAssignments == null || stableAssignments.nodes().isEmpty()) {
+            // This condition can only pass if all stable nodes are dead, and 
we start new raft group from scratch.
+            // In this case new initial configuration must match new forced 
assignments.
+            computedStableAssignments = 
Assignments.forced(pendingAssignmentsNodes);
+        } else if (pendingAssignmentsAreForced) {
+            // In case of forced assignments we need to remove nodes that are 
present in the stable set but are missing from the
+            // pending set. Such operation removes dead stable nodes from the 
resulting stable set, which guarantees that we will
+            // have a live majority.
+            Set<Assignment> intersection = 
RebalanceUtil.intersect(stableAssignments.nodes(), pendingAssignmentsNodes);
+
+            computedStableAssignments = intersection.isEmpty() ? 
pendingAssignments : Assignments.forced(intersection);
+        } else {
+            computedStableAssignments = stableAssignments;
+        }
+
+        int partitionId = replicaGrpId.partitionId();
+        int zoneId = replicaGrpId.zoneId();
+
+        CompletableFuture<Void> localServicesStartFuture;
+
+        if (shouldStartLocalGroupNode) {
+            localServicesStartFuture = createZonePartitionReplicationNode(
+                    zoneId,
+                    partitionId,
+                    localMemberAssignment,
+                    computedStableAssignments
+            );
+        } else {
+            localServicesStartFuture = runAsync(() -> {
+                if (pendingAssignmentsAreForced && 
replicaMgr.isReplicaStarted(replicaGrpId)) {
+                    replicaMgr.resetPeers(replicaGrpId, 
fromAssignments(computedStableAssignments.nodes()));
+                }
+            }, ioExecutor);
+        }
+
+        return localServicesStartFuture.thenRunAsync(() -> {
+            if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
+                return;
+            }
+
+            // For forced assignments, we exclude dead stable nodes, and all 
alive stable nodes are already in pending assignments.
+            // Union is not required in such a case.
+            Set<Assignment> newAssignments = pendingAssignmentsAreForced || 
stableAssignments == null
+                    ? pendingAssignmentsNodes
+                    : RebalanceUtil.union(pendingAssignmentsNodes, 
stableAssignments.nodes());
+
+            
replicaMgr.getReplica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
+        }, ioExecutor);
+    }
+
+    /**
+     * Asks the raft group of the given zone partition to switch to the pending
+     * peers, provided the local node belongs to the stable or pending
+     * assignments and is currently the leader of the group.
+     *
+     * @param replicaMgr Replica manager used to look up the local replica.
+     * @param replicaGrpId Zone partition id.
+     * @param pendingAssignments Pending assignments to switch the group to.
+     * @param stableAssignments Current stable assignments.
+     * @param revision Revision of the pending assignments event being handled.
+     * @return Future that completes when the change peers request has been
+     *         processed, or immediately if there is nothing to do.
+     */
+    private CompletableFuture<Void> changePeersOnRebalance(
+            ReplicaManager replicaMgr,
+            ZonePartitionId replicaGrpId,
+            Set<Assignment> pendingAssignments,
+            Set<Assignment> stableAssignments,
+            long revision
+    ) {
+        Set<Assignment> union = new HashSet<>(pendingAssignments);
+        union.addAll(stableAssignments);
+
+        String localName = localNode().name();
+
+        if (union.stream().noneMatch(a -> a.consistentId().equals(localName))) {
+            return nullCompletedFuture();
+        }
+
+        CompletableFuture<Replica> replicaFut = replicaMgr.replica(replicaGrpId);
+
+        // The lookup may yield null, the same case createPartitionMover guards.
+        if (replicaFut == null) {
+            return failedFuture(new IgniteInternalException("No such replica for partition " + replicaGrpId.partitionId()
+                    + " in zone " + replicaGrpId.zoneId()));
+        }
+
+        // Compose on the replica future instead of blocking on join().
+        return replicaFut.thenCompose(replica ->
+                changePeersIfLeader(replica.raftClient(), replicaGrpId, pendingAssignments, revision));
+    }
+
+    /** Issues the change peers request if the local node is the group leader and the event is not stale. */
+    private CompletableFuture<Void> changePeersIfLeader(
+            RaftGroupService partGrpSvc,
+            ZonePartitionId replicaGrpId,
+            Set<Assignment> pendingAssignments,
+            long revision
+    ) {
+        return partGrpSvc.refreshAndGetLeaderWithTerm()
+                .exceptionally(throwable -> {
+                    Throwable cause = unwrapCause(throwable);
+
+                    if (cause instanceof TimeoutException) {
+                        LOG.info("Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].", replicaGrpId);
+
+                        return LeaderWithTerm.NO_LEADER;
+                    }
+
+                    throw new IgniteInternalException(
+                            INTERNAL_ERR,
+                            "Failed to get a leader for the RAFT replication group [grp=" + replicaGrpId + "].",
+                            cause
+                    );
+                })
+                .thenCompose(leaderWithTerm -> {
+                    if (leaderWithTerm.isEmpty() || !isLocalPeer(leaderWithTerm.leader())) {
+                        return nullCompletedFuture();
+                    }
+
+                    // Run the update of the raft configuration only if this node is the leader.
+                    LOG.info("Current node={} is the leader of partition raft group={}. "
+                                    + "Initiate rebalance process for partition={}, zoneId={}",
+                            leaderWithTerm.leader(), replicaGrpId, replicaGrpId.partitionId(), replicaGrpId.zoneId());
+
+                    return metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
+                            .thenCompose(latestPendingAssignmentsEntry -> {
+                                // Do not change peers of the raft group if this is a stale event.
+                                // Note that the raft node is started beforehand for the sake of
+                                // consistency in starting and stopping raft nodes.
+                                if (revision < latestPendingAssignmentsEntry.revision()) {
+                                    return nullCompletedFuture();
+                                }
+
+                                PeersAndLearners newConfiguration = fromAssignments(pendingAssignments);
+
+                                return partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term())
+                                        .exceptionally(e -> {
+                                            // Still best-effort, but leave a trace instead of
+                                            // dropping the failure silently.
+                                            LOG.warn("Unable to change peers of the raft group " + replicaGrpId, e);
+
+                                            return null;
+                                        });
+                            });
+                });
+    }
+
+    private boolean isLocalPeer(Peer peer) {
+        return peer.consistentId().equals(localNode().name());
+    }
+
+    @Nullable
+    private Assignment localMemberAssignment(Assignments assignments) {
+        Assignment localMemberAssignment = 
Assignment.forPeer(localNode().name());
+
+        return assignments.nodes().contains(localMemberAssignment) ? 
localMemberAssignment : null;
+    }
+
     @Override
     public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
         if (!ENABLED) {
@@ -369,6 +873,16 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         return nullCompletedFuture();
     }
 
+    /**
+     * Builds a short human-readable label of a zone in the form {@code <id>/<name>}.
+     *
+     * @param zoneDescriptor Zone descriptor to describe.
+     * @return Zone ID and zone name separated by a slash.
+     */
+    private static String zoneInfo(CatalogZoneDescriptor zoneDescriptor) {
+        StringBuilder label = new StringBuilder();
+
+        label.append(zoneDescriptor.id());
+        label.append("/");
+        label.append(zoneDescriptor.name());
+
+        return label.toString();
+    }
+
+    /**
+     * Reads the stable assignments of a zone partition from the local meta storage as of the given revision.
+     *
+     * @param zonePartitionId Zone partition whose stable assignments key is read.
+     * @param revision Meta storage revision passed to the local read.
+     * @return Assignments deserialized from the entry value; may be {@code null} (NOTE(review): presumably when no
+     *         value is stored under the key - confirm against {@code Assignments.fromBytes}).
+     */
+    private @Nullable Assignments stableAssignments(ZonePartitionId zonePartitionId, long revision) {
+        byte[] stableAssignmentsBytes = metaStorageMgr
+                .getLocally(stablePartAssignmentsKey(zonePartitionId), revision)
+                .value();
+
+        return Assignments.fromBytes(stableAssignmentsBytes);
+    }
+
     /**
      * Stops all resources associated with a given partition, like replicas 
and partition trackers.
      *
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index c243706ea8..ca37e69f04 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.replicator;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
@@ -672,44 +673,56 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
         RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new 
Peer(localNodeConsistentId));
 
-        ((Loza) raftManager).startRaftGroupNodeWithoutService(
+        CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut = 
((Loza) raftManager).startRaftGroupNode(
                 raftNodeId,
                 newConfiguration,
                 raftGroupListener,
                 raftGroupEventsListener,
-                groupOptions
+                groupOptions,
+                raftGroupServiceFactory
         );
 
-        LOG.info("Replica is about to start [replicationGroupId={}].", 
replicaGrpId);
+        return newRaftClientFut.thenComposeAsync(raftClient -> {
+            if (!busyLock.enterBusy()) {
+                return failedFuture(new NodeStoppingException());
+            }
 
-        Replica newReplica = new ZonePartitionReplicaImpl(
-                replicaGrpId,
-                listener
-        );
+            try {
+                LOG.info("Replica is about to start [replicationGroupId={}].", 
replicaGrpId);
 
-        CompletableFuture<Replica> replicaFuture = 
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
-            if (existingReplicaFuture == null || 
existingReplicaFuture.isDone()) {
-                assert existingReplicaFuture == null || 
isCompletedSuccessfully(existingReplicaFuture);
-                LOG.info("Replica is started [replicationGroupId={}].", 
replicaGrpId);
+                Replica newReplica = new ZonePartitionReplicaImpl(
+                        replicaGrpId,
+                        listener,
+                        raftClient
+                );
 
-                return completedFuture(newReplica);
-            } else {
-                existingReplicaFuture.complete(newReplica);
-                LOG.info("Replica is started, existing replica waiter was 
completed [replicationGroupId={}].", replicaGrpId);
+                CompletableFuture<Replica> replicaFuture = 
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
+                    if (existingReplicaFuture == null || 
existingReplicaFuture.isDone()) {
+                        assert existingReplicaFuture == null || 
isCompletedSuccessfully(existingReplicaFuture);
+                        LOG.info("Replica is started 
[replicationGroupId={}].", replicaGrpId);
 
-                return existingReplicaFuture;
-            }
-        });
+                        return completedFuture(newReplica);
+                    } else {
+                        existingReplicaFuture.complete(newReplica);
+                        LOG.info("Replica is started, existing replica waiter 
was completed [replicationGroupId={}].", replicaGrpId);
 
-        var eventParams = new LocalReplicaEventParameters(replicaGrpId);
+                        return existingReplicaFuture;
+                    }
+                });
 
-        return fireEvent(AFTER_REPLICA_STARTED, eventParams)
-                .exceptionally(e -> {
-                    LOG.error("Error when notifying about 
AFTER_REPLICA_STARTED event.", e);
+                var eventParams = new 
LocalReplicaEventParameters(replicaGrpId);
 
-                    return null;
-                })
-                .thenCompose(v -> replicaFuture);
+                return fireEvent(AFTER_REPLICA_STARTED, eventParams)
+                        .exceptionally(e -> {
+                            LOG.error("Error when notifying about 
AFTER_REPLICA_STARTED event.", e);
+
+                            return null;
+                        })
+                        .thenCompose(v -> replicaFuture);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
     }
 
     /**
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
index 4b91767e80..5cd05a8a5e 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
@@ -35,17 +35,28 @@ public class ZonePartitionReplicaImpl implements Replica {
 
     private final ReplicaListener listener;
 
+    /** Raft client of the replication group; assigned once in the constructor and returned by {@code raftClient()}. */
+    private final TopologyAwareRaftGroupService raftClient;
+
+    /**
+     * Constructor.
+     *
+     * @param replicaGrpId  Replication group id.
+     * @param listener Listener for the replica.
+     * @param raftClient Raft client.
+     */
     public ZonePartitionReplicaImpl(
             ReplicationGroupId replicaGrpId,
-            ReplicaListener listener
+            ReplicaListener listener,
+            TopologyAwareRaftGroupService raftClient
     )  {
         this.replicaGrpId = replicaGrpId;
         this.listener = listener;
+        this.raftClient = raftClient;
     }
 
     @Override
     public TopologyAwareRaftGroupService raftClient() {
-        throw new UnsupportedOperationException("raftClient");
+        return raftClient;
     }
 
     @Override
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index adfdac3464..d340c3b9ce 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -740,7 +740,8 @@ public class IgniteImpl implements Ignite {
                 distributionZoneManager,
                 metaStorageMgr,
                 clusterSvc.topologyService(),
-                threadPoolsManager.tableIoExecutor()
+                threadPoolsManager.tableIoExecutor(),
+                rebalanceScheduler
         );
 
         TransactionConfiguration txConfig = 
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);

Reply via email to