This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 30ec21459a IGNITE-18088 Trigger rebalance on zone.dataNodes change. (#1572)
30ec21459a is described below

commit 30ec21459ad179646060ed6ff32f4aa346c8a8aa
Author: Sergey Uttsel <utt...@gmail.com>
AuthorDate: Fri Feb 3 14:51:37 2023 +0300

    IGNITE-18088 Trigger rebalance on zone.dataNodes change. (#1572)
---
 .../ignite/internal/affinity/AffinityUtils.java    |  26 +-
 .../affinity/RendezvousAffinityFunction.java       | 109 +++--
 .../internal/affinity/AffinityServiceTest.java     |  14 +-
 .../affinity/RendezvousAffinityFunctionTest.java   |  37 +-
 .../distributionzones/DistributionZonesUtil.java   |  33 +-
 .../ignite/internal/rebalance/ItRebalanceTest.java | 246 +++++++++++
 modules/table/build.gradle                         |   3 +
 .../distributed/ItTxDistributedTestSingleNode.java |   2 +-
 .../internal/table/distributed/TableManager.java   | 148 ++++++-
 .../ignite/internal/utils/RebalanceUtil.java       |  13 +-
 .../TableManagerDistributionZonesTest.java         | 462 +++++++++++++++++++++
 .../table/distributed/TableManagerTest.java        |   8 +-
 12 files changed, 965 insertions(+), 136 deletions(-)
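
For context, the central API change in this commit is that affinity assignments are now calculated from a zone's data nodes, passed as plain consistent node names (Collection<String>) instead of ClusterNode instances; that is what lets a meta storage update of zone.dataNodes drive rebalance directly. A minimal usage sketch of the new AffinityUtils shape (the class name and the partition/replica counts below are illustrative, not part of the commit):

    import java.util.List;
    import java.util.Set;

    import org.apache.ignite.internal.affinity.AffinityUtils;
    import org.apache.ignite.internal.affinity.Assignment;

    class ZoneAssignmentsSketch {
        static List<Set<Assignment>> assignmentsFor(Set<String> zoneDataNodes) {
            // Example values: 10 partitions, 3 replicas; the real values come from the table configuration.
            return AffinityUtils.calculateAssignments(zoneDataNodes, 10, 3);
        }
    }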

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
index ec15cedc02..cf64ba7d6e 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.ignite.network.ClusterNode;
 
 /**
  * Stateless affinity utils that produces helper methods for an affinity 
assignments calculation.
@@ -34,13 +33,14 @@ public class AffinityUtils {
     /**
      * Calculates affinity assignments.
      *
+     * @param dataNodes Data nodes.
      * @param partitions Partitions count.
      * @param replicas Replicas count.
      * @return List assignments by partition.
      */
-    public static List<Set<Assignment>> 
calculateAssignments(Collection<ClusterNode> baselineNodes, int partitions, int 
replicas) {
-        List<Set<ClusterNode>> affinityNodes = 
RendezvousAffinityFunction.assignPartitions(
-                baselineNodes,
+    public static List<Set<Assignment>> 
calculateAssignments(Collection<String> dataNodes, int partitions, int 
replicas) {
+        List<Set<String>> affinityNodes = 
RendezvousAffinityFunction.assignPartitions(
+                dataNodes,
                 partitions,
                 replicas,
                 false,
@@ -48,21 +48,21 @@ public class AffinityUtils {
                 HashSet::new
         );
 
-        return 
affinityNodes.stream().map(AffinityUtils::clusterNodesToAssignments).collect(toList());
+        return 
affinityNodes.stream().map(AffinityUtils::dataNodesToAssignments).collect(toList());
     }
 
     /**
      * Calculates affinity assignments for a single partition.
      *
-     * @param baselineNodes Nodes.
+     * @param dataNodes Data nodes.
      * @param partition Partition id.
      * @param replicas Replicas count.
-     * @return List of assignments.
+     * @return Set of assignments.
      */
-    public static Set<Assignment> 
calculateAssignmentForPartition(Collection<ClusterNode> baselineNodes, int 
partition, int replicas) {
-        Set<ClusterNode> affinityNodes = 
RendezvousAffinityFunction.assignPartition(
+    public static Set<Assignment> 
calculateAssignmentForPartition(Collection<String> dataNodes, int partition, 
int replicas) {
+        Set<String> affinityNodes = RendezvousAffinityFunction.assignPartition(
                 partition,
-                new ArrayList<>(baselineNodes),
+                new ArrayList<>(dataNodes),
                 replicas,
                 null,
                 false,
@@ -70,10 +70,10 @@ public class AffinityUtils {
                 HashSet::new
         );
 
-        return clusterNodesToAssignments(affinityNodes);
+        return dataNodesToAssignments(affinityNodes);
     }
 
-    private static Set<Assignment> 
clusterNodesToAssignments(Collection<ClusterNode> nodes) {
-        return nodes.stream().map(node -> 
Assignment.forPeer(node.name())).collect(toSet());
+    private static Set<Assignment> dataNodesToAssignments(Collection<String> 
nodes) {
+        return nodes.stream().map(Assignment::forPeer).collect(toSet());
     }
 }
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
index 91adae62fd..0cc1adf545 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
@@ -33,7 +33,6 @@ import java.util.function.IntFunction;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.network.ClusterNode;
 
 /**
  * Affinity function for partitioned table based on Highest Random Weight 
algorithm. This function supports the following configuration:
@@ -59,7 +58,7 @@ public class RendezvousAffinityFunction {
     private static final IgniteLogger LOG = 
Loggers.forClass(RendezvousAffinityFunction.class);
 
     /** Comparator. */
-    private static final Comparator<IgniteBiTuple<Long, ClusterNode>> 
COMPARATOR = new HashComparator();
+    private static final Comparator<IgniteBiTuple<Long, String>> COMPARATOR = 
new HashComparator();
 
     /** Maximum number of partitions. */
     public static final int MAX_PARTITIONS_COUNT = 65000;
@@ -67,16 +66,6 @@ public class RendezvousAffinityFunction {
     /** Exclude neighbors warning. */
     private static boolean exclNeighborsWarn;
 
-    /**
-     * Resolves node hash.
-     *
-     * @param node Cluster node;
-     * @return Node hash.
-     */
-    public static Object resolveNodeHash(ClusterNode node) {
-        return node.name();
-    }
-
     /**
      * Returns collection of nodes for specified partition.
      *
@@ -89,13 +78,13 @@ public class RendezvousAffinityFunction {
      * @param aggregator        Function that creates a collection for the 
partition assignments.
      * @return Assignment.
      */
-    public static <T extends Collection<ClusterNode>> T assignPartition(
+    public static <T extends Collection<String>> T assignPartition(
             int part,
-            List<ClusterNode> nodes,
+            List<String> nodes,
             int replicas,
-            Map<String, Collection<ClusterNode>> neighborhoodCache,
+            Map<String, Collection<String>> neighborhoodCache,
             boolean exclNeighbors,
-            BiPredicate<ClusterNode, T> nodeFilter,
+            BiPredicate<String, T> nodeFilter,
             IntFunction<T> aggregator
     ) {
         if (nodes.size() <= 1) {
@@ -106,58 +95,56 @@ public class RendezvousAffinityFunction {
             return res;
         }
 
-        IgniteBiTuple<Long, ClusterNode>[] hashArr =
-                (IgniteBiTuple<Long, ClusterNode>[]) new 
IgniteBiTuple[nodes.size()];
+        IgniteBiTuple<Long, String>[] hashArr =
+                (IgniteBiTuple<Long, String>[]) new 
IgniteBiTuple[nodes.size()];
 
         for (int i = 0; i < nodes.size(); i++) {
-            ClusterNode node = nodes.get(i);
-
-            Object nodeHash = resolveNodeHash(node);
+            String node = nodes.get(i);
 
-            long hash = hash(nodeHash.hashCode(), part);
+            long hash = hash(node.hashCode(), part);
 
             hashArr[i] = new IgniteBiTuple<>(hash, node);
         }
 
         final int effectiveReplicas = replicas == Integer.MAX_VALUE ? 
nodes.size() : Math.min(replicas, nodes.size());
 
-        Iterable<ClusterNode> sortedNodes = new 
LazyLinearSortedContainer(hashArr, effectiveReplicas);
+        Iterable<String> sortedNodes = new LazyLinearSortedContainer(hashArr, 
effectiveReplicas);
 
         // REPLICATED cache case
         if (replicas == Integer.MAX_VALUE) {
             return replicatedAssign(nodes, sortedNodes, aggregator);
         }
 
-        Iterator<ClusterNode> it = sortedNodes.iterator();
+        Iterator<String> it = sortedNodes.iterator();
 
         T res = aggregator.apply(effectiveReplicas);
 
-        Collection<ClusterNode> allNeighbors = new HashSet<>();
+        Collection<String> allNeighbors = new HashSet<>();
 
-        ClusterNode first = it.next();
+        String first = it.next();
 
         res.add(first);
 
         if (exclNeighbors) {
-            allNeighbors.addAll(neighborhoodCache.get(first.id()));
+            allNeighbors.addAll(neighborhoodCache.get(first));
         }
 
         // Select another replicas.
         if (replicas > 1) {
             while (it.hasNext() && res.size() < effectiveReplicas) {
-                ClusterNode node = it.next();
+                String node = it.next();
 
                 if (exclNeighbors) {
                     if (!allNeighbors.contains(node)) {
                         res.add(node);
 
-                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
+                        allNeighbors.addAll(neighborhoodCache.get(node));
                     }
                 } else if (nodeFilter == null || nodeFilter.test(node, res)) {
                     res.add(node);
 
                     if (exclNeighbors) {
-                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
+                        allNeighbors.addAll(neighborhoodCache.get(node));
                     }
                 }
             }
@@ -170,7 +157,7 @@ public class RendezvousAffinityFunction {
             it.next();
 
             while (it.hasNext() && res.size() < effectiveReplicas) {
-                ClusterNode node = it.next();
+                String node = it.next();
 
                 if (!res.contains(node)) {
                     res.add(node);
@@ -198,15 +185,15 @@ public class RendezvousAffinityFunction {
      * @param aggregator  Function that creates a collection for the partition 
assignments.
      * @return Assignment.
      */
-    private static <T extends Collection<ClusterNode>> T 
replicatedAssign(List<ClusterNode> nodes,
-            Iterable<ClusterNode> sortedNodes, IntFunction<T> aggregator) {
-        ClusterNode first = sortedNodes.iterator().next();
+    private static <T extends Collection<String>> T 
replicatedAssign(List<String> nodes,
+            Iterable<String> sortedNodes, IntFunction<T> aggregator) {
+        String first = sortedNodes.iterator().next();
 
         T res = aggregator.apply(nodes.size());
 
         res.add(first);
 
-        for (ClusterNode n : nodes) {
+        for (String n : nodes) {
             if (!n.equals(first)) {
                 res.add(n);
             }
@@ -250,12 +237,12 @@ public class RendezvousAffinityFunction {
      * @param nodeFilter              Filter for nodes.
      * @return List nodes by partition.
      */
-    public static List<List<ClusterNode>> assignPartitions(
-            Collection<ClusterNode> currentTopologySnapshot,
+    public static List<List<String>> assignPartitions(
+            Collection<String> currentTopologySnapshot,
             int partitions,
             int replicas,
             boolean exclNeighbors,
-            BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
+            BiPredicate<String, List<String>> nodeFilter
     ) {
         return assignPartitions(currentTopologySnapshot, partitions, replicas, 
exclNeighbors, nodeFilter, ArrayList::new);
     }
@@ -271,12 +258,12 @@ public class RendezvousAffinityFunction {
      * @param aggregator              Function that creates a collection for 
the partition assignments.
      * @return List nodes by partition.
      */
-    public static <T extends Collection<ClusterNode>> List<T> assignPartitions(
-            Collection<ClusterNode> currentTopologySnapshot,
+    public static <T extends Collection<String>> List<T> assignPartitions(
+            Collection<String> currentTopologySnapshot,
             int partitions,
             int replicas,
             boolean exclNeighbors,
-            BiPredicate<ClusterNode, T> nodeFilter,
+            BiPredicate<String, T> nodeFilter,
             IntFunction<T> aggregator
     ) {
         assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " + 
MAX_PARTITIONS_COUNT;
@@ -285,9 +272,9 @@ public class RendezvousAffinityFunction {
 
         List<T> assignments = new ArrayList<>(partitions);
 
-        Map<String, Collection<ClusterNode>> neighborhoodCache = exclNeighbors 
? neighbors(currentTopologySnapshot) : null;
+        Map<String, Collection<String>> neighborhoodCache = exclNeighbors ? 
neighbors(currentTopologySnapshot) : null;
 
-        List<ClusterNode> nodes = new ArrayList<>(currentTopologySnapshot);
+        List<String> nodes = new ArrayList<>(currentTopologySnapshot);
 
         for (int i = 0; i < partitions; i++) {
             T partAssignment = assignPartition(i, nodes, replicas, 
neighborhoodCache, exclNeighbors, nodeFilter, aggregator);
@@ -304,15 +291,15 @@ public class RendezvousAffinityFunction {
      * @param topSnapshot Topology snapshot.
      * @return Neighbors map.
      */
-    public static Map<String, Collection<ClusterNode>> 
neighbors(Collection<ClusterNode> topSnapshot) {
-        Map<String, Collection<ClusterNode>> macMap = new 
HashMap<>(topSnapshot.size(), 1.0f);
+    public static Map<String, Collection<String>> neighbors(Collection<String> 
topSnapshot) {
+        Map<String, Collection<String>> macMap = new 
HashMap<>(topSnapshot.size(), 1.0f);
 
         // Group by mac addresses.
-        for (ClusterNode node : topSnapshot) {
+        for (String node : topSnapshot) {
             String macs = String.valueOf(node.hashCode());
             //node.attribute(IgniteNodeAttributes.ATTR_MACS);
 
-            Collection<ClusterNode> nodes = macMap.get(macs);
+            Collection<String> nodes = macMap.get(macs);
 
             if (nodes == null) {
                 macMap.put(macs, nodes = new HashSet<>());
@@ -321,11 +308,11 @@ public class RendezvousAffinityFunction {
             nodes.add(node);
         }
 
-        Map<String, Collection<ClusterNode>> neighbors = new 
HashMap<>(topSnapshot.size(), 1.0f);
+        Map<String, Collection<String>> neighbors = new 
HashMap<>(topSnapshot.size(), 1.0f);
 
-        for (Collection<ClusterNode> group : macMap.values()) {
-            for (ClusterNode node : group) {
-                neighbors.put(node.id(), group);
+        for (Collection<String> group : macMap.values()) {
+            for (String node : group) {
+                neighbors.put(node, group);
             }
         }
 
@@ -335,24 +322,24 @@ public class RendezvousAffinityFunction {
     /**
      * Hash comparator.
      */
-    private static class HashComparator implements 
Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+    private static class HashComparator implements 
Comparator<IgniteBiTuple<Long, String>>, Serializable {
         /** Serial version uid. */
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
         @Override
-        public int compare(IgniteBiTuple<Long, ClusterNode> o1, 
IgniteBiTuple<Long, ClusterNode> o2) {
+        public int compare(IgniteBiTuple<Long, String> o1, IgniteBiTuple<Long, 
String> o2) {
             return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
-                    o1.get2().name().compareTo(o2.get2().name());
+                    o1.get2().compareTo(o2.get2());
         }
     }
 
     /**
      * Sorts the initial array with linear sort algorithm array.
      */
-    private static class LazyLinearSortedContainer implements 
Iterable<ClusterNode> {
+    private static class LazyLinearSortedContainer implements Iterable<String> 
{
         /** Initial node-hash array. */
-        private final IgniteBiTuple<Long, ClusterNode>[] arr;
+        private final IgniteBiTuple<Long, String>[] arr;
 
         /** Count of the sorted elements. */
         private int sorted;
@@ -363,7 +350,7 @@ public class RendezvousAffinityFunction {
          * @param arr                Node / partition hash list.
          * @param needFirstSortedCnt Estimate count of elements to return by 
iterator.
          */
-        LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int 
needFirstSortedCnt) {
+        LazyLinearSortedContainer(IgniteBiTuple<Long, String>[] arr, int 
needFirstSortedCnt) {
             this.arr = arr;
 
             if (needFirstSortedCnt > (int) Math.log(arr.length)) {
@@ -375,14 +362,14 @@ public class RendezvousAffinityFunction {
 
         /** {@inheritDoc} */
         @Override
-        public Iterator<ClusterNode> iterator() {
+        public Iterator<String> iterator() {
             return new SortIterator();
         }
 
         /**
          * Sorting iterator.
          */
-        private class SortIterator implements Iterator<ClusterNode> {
+        private class SortIterator implements Iterator<String> {
             /** Index of the first unsorted element. */
             private int cur;
 
@@ -394,7 +381,7 @@ public class RendezvousAffinityFunction {
 
             /** {@inheritDoc} */
             @Override
-            public ClusterNode next() {
+            public String next() {
                 if (!hasNext()) {
                     throw new NoSuchElementException();
                 }
@@ -403,7 +390,7 @@ public class RendezvousAffinityFunction {
                     return arr[cur++].get2();
                 }
 
-                IgniteBiTuple<Long, ClusterNode> min = arr[cur];
+                IgniteBiTuple<Long, String> min = arr[cur];
 
                 int minIdx = cur;
 
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
index 5310ab1dcc..0d0012d4bf 100644
--- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
@@ -23,9 +23,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -36,16 +33,7 @@ public class AffinityServiceTest {
     @Test
     public void testCalculatedAssignmentHappyPath() {
         List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(
-                Arrays.asList(
-                        new ClusterNode(
-                                UUID.randomUUID().toString(), "node0",
-                                new NetworkAddress("localhost", 8080)
-                        ),
-                        new ClusterNode(
-                                UUID.randomUUID().toString(), "node1",
-                                new NetworkAddress("localhost", 8081)
-                        )
-                ),
+                Arrays.asList("node0", "node1"),
                 10,
                 3
         );
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
index 6866d9a0fb..187bfecd89 100644
--- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
@@ -27,7 +27,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -35,7 +34,6 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Test;
 
@@ -51,32 +49,32 @@ public class RendezvousAffinityFunctionTest {
 
     @Test
     public void testPartitionDistribution() {
-        int nodes = 50;
+        int nodeCount = 50;
 
         int parts = 10_000;
 
         int replicas = 4;
 
-        List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+        List<String> nodes = prepareNetworkTopology(nodeCount);
 
-        assertTrue(parts > nodes, "Partitions should be more that nodes");
+        assertTrue(parts > nodeCount, "Partitions should be more than nodes");
 
-        int ideal = (parts * replicas) / nodes;
+        int ideal = (parts * replicas) / nodeCount;
 
-        List<List<ClusterNode>> assignment = 
RendezvousAffinityFunction.assignPartitions(
-                clusterNodes,
+        List<List<String>> assignment = 
RendezvousAffinityFunction.assignPartitions(
+                nodes,
                 parts,
                 replicas,
                 false,
                 null
         );
 
-        HashMap<ClusterNode, ArrayList<Integer>> assignmentByNode = new 
HashMap<>(nodes);
+        HashMap<String, ArrayList<Integer>> assignmentByNode = new 
HashMap<>(nodeCount);
 
         int part = 0;
 
-        for (List<ClusterNode> partNodes : assignment) {
-            for (ClusterNode node : partNodes) {
+        for (List<String> partNodes : assignment) {
+            for (String node : partNodes) {
                 ArrayList<Integer> nodeParts = assignmentByNode.get(node);
 
                 if (nodeParts == null) {
@@ -89,7 +87,7 @@ public class RendezvousAffinityFunctionTest {
             part++;
         }
 
-        for (ClusterNode node : clusterNodes) {
+        for (String node : nodes) {
             ArrayList<Integer> nodeParts = assignmentByNode.get(node);
 
             assertNotNull(nodeParts);
@@ -104,29 +102,26 @@ public class RendezvousAffinityFunctionTest {
     }
 
     @NotNull
-    private List<ClusterNode> prepareNetworkTopology(int nodes) {
-        var addr = new NetworkAddress("127.0.0.1", 121212);
-
+    private List<String> prepareNetworkTopology(int nodes) {
         return IntStream.range(0, nodes)
                 .mapToObj(i -> "Node " + i)
-                .map(name -> new ClusterNode(UUID.randomUUID().toString(), 
name, addr))
                 .collect(Collectors.toUnmodifiableList());
     }
 
     @Test
     public void serializeAssignment() {
-        int nodes = 50;
+        int nodeCount = 50;
 
         int parts = 10_000;
 
         int replicas = 4;
 
-        List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+        List<String> nodes = prepareNetworkTopology(nodeCount);
 
-        assertTrue(parts > nodes, "Partitions should be more that nodes");
+        assertTrue(parts > nodeCount, "Partitions should be more than nodes");
 
-        List<List<ClusterNode>> assignment = 
RendezvousAffinityFunction.assignPartitions(
-                clusterNodes,
+        List<List<String>> assignment = 
RendezvousAffinityFunction.assignPartitions(
+                nodes,
                 parts,
                 replicas,
                 false,
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index ea1d8e6177..ac9cb9905d 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -25,6 +25,7 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Set;
 import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
 import org.apache.ignite.internal.metastorage.dsl.Update;
@@ -34,7 +35,7 @@ import org.apache.ignite.lang.ByteArray;
 /**
  * Util class for Distribution Zones flow.
  */
-class DistributionZonesUtil {
+public class DistributionZonesUtil {
     /** Key prefix for zone's data nodes. */
     private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = 
"distributionZone.dataNodes.";
 
@@ -60,11 +61,37 @@ class DistributionZonesUtil {
     private static final ByteArray 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
             new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
 
-    /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
-    static ByteArray zoneDataNodesKey(int zoneId) {
+    /**
+     * ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}.
+     *
+     * @param zoneId Zone id.
+     * @return ByteArray representation.
+     */
+    public static ByteArray zoneDataNodesKey(int zoneId) {
         return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX + zoneId);
     }
 
+    /**
+     * ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}.
+     *
+     * @return ByteArray representation.
+     */
+    public static ByteArray zoneDataNodesPrefix() {
+        return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX);
+    }
+
+    /**
+     * Extract zone id from a distribution zone data nodes key.
+     *
+     * @param key Key.
+     * @return Zone id.
+     */
+    public static int extractZoneId(byte[] key) {
+        var strKey = new String(key, StandardCharsets.UTF_8);
+
+        return 
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_PREFIX.length()));
+    }
+
     /**
      * The key needed for processing an event about zone's creation and 
deletion.
      * With this key we can be sure that event was triggered only once.
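
For orientation, the two helpers added above let a watcher cover the whole "distributionZone.dataNodes." keyspace via zoneDataNodesPrefix() and recover the zone id of any updated key with extractZoneId(). A hedged round-trip sketch (zone id 7 is an example value, and it assumes ByteArray#bytes() exposes the underlying key bytes):

    import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
    import org.apache.ignite.lang.ByteArray;

    class ZoneKeySketch {
        static int roundTrip() {
            // Builds the meta storage key "distributionZone.dataNodes.7".
            ByteArray key = DistributionZonesUtil.zoneDataNodesKey(7);

            // Parses the numeric suffix back out of the raw key bytes, yielding 7.
            return DistributionZonesUtil.extractZoneId(key.bytes());
        }
    }
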
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
new file mode 100644
index 0000000000..665cac7338
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rebalance;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItRebalanceTest extends BaseIgniteAbstractTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ItRebalanceTest.class);
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    /**
+     * The test checks that data is rebalanced after a node holding a replica leaves and rejoins the cluster.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18692")
+    void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
+        cluster.startAndInit(4);
+
+        // Creates a table with 1 partition and 3 replicas.
+        createTestTable();
+
+        assertTrue(waitAssignments(List.of(
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2)
+        )));
+
+        TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
+
+        BinaryRowEx row = new 
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 
1).set("value", "value1"));
+        BinaryRowEx key = new 
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
+
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(0).node()).get());
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(1).node()).get());
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(2).node()).get());
+
+        table.internalTable().insert(row, null).get();
+
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        try {
+            table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(3).node()).get();
+
+            fail();
+        } catch (Exception e) {
+            assertInstanceOf(ExecutionException.class, e);
+
+            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+        }
+
+        cluster.knockOutNode(2, PARTITION_NETWORK);
+
+        assertTrue(waitAssignments(List.of(
+                Set.of(0, 1, 3),
+                Set.of(0, 1, 3),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 3)
+        )));
+
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(3).node()).get());
+
+        cluster.reanimateNode(2, PARTITION_NETWORK);
+
+        assertTrue(waitAssignments(List.of(
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2)
+        )));
+
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        try {
+            table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(3).node()).get();
+
+            fail();
+        } catch (Exception e) {
+            assertInstanceOf(ExecutionException.class, e);
+
+            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+        }
+    }
+
+    /**
+     * Waits for the expected assignments to appear on the nodes.
+     *
+     * @param nodes Expected assignments.
+     * @return {@code true} if the expected and actual assignments are the same.
+     * @throws InterruptedException If interrupted.
+     */
+    private boolean waitAssignments(List<Set<Integer>> nodes) throws 
InterruptedException {
+        return waitForCondition(() -> {
+            for (int i = 0; i < nodes.size(); i++) {
+                Set<Integer> expectedAssignments = nodes.get(i);
+
+                ExtendedTableConfiguration table =
+                        (ExtendedTableConfiguration) cluster.node(i)
+                                
.clusterConfiguration().getConfiguration(TablesConfiguration.KEY).tables().get("TEST");
+
+                byte[] assignmentsBytes = table.assignments().value();
+
+                Set<String> assignments;
+
+                if (assignmentsBytes != null) {
+                    assignments = ((List<Set<Assignment>>) 
ByteUtils.fromBytes(assignmentsBytes)).get(0)
+                            .stream().map(assignment -> 
assignment.consistentId()).collect(Collectors.toSet());
+                } else {
+                    assignments = Collections.emptySet();
+                }
+
+                LOG.info("Assignments for node " + i + ": " + assignments);
+
+                if (!(expectedAssignments.size() == assignments.size())
+                        || !expectedAssignments.stream().allMatch(node -> 
assignments.contains(cluster.node(node).name()))) {
+                    return false;
+                }
+            }
+
+            return true;
+        },
+                5000);
+    }
+
+    private void createTestTable() throws InterruptedException {
+        String sql1 = "create zone test_zone with "
+                + "data_nodes_auto_adjust_scale_up=0, "
+                + "data_nodes_auto_adjust_scale_down=0";
+        String sql2 = "create table test (id int primary key, value 
varchar(20))"
+                + " with partitions=1, replicas=3, primary_zone='TEST_ZONE'";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(sql1, session);
+            executeUpdate(sql2, session);
+        });
+
+        waitForTableToStart();
+    }
+
+    private void waitForTableToStart() throws InterruptedException {
+        // TODO: IGNITE-18203 - remove this wait because when a table creation 
query is executed, the table must be fully ready.
+
+        BooleanSupplier tableStarted = () -> {
+            int numberOfStartedRaftNodes = cluster.runningNodes()
+                    .map(ItRebalanceTest::tablePartitionIds)
+                    .mapToInt(List::size)
+                    .sum();
+            return numberOfStartedRaftNodes == 3;
+        };
+
+        assertTrue(waitForCondition(tableStarted, 10_000), "Did not see all 
table RAFT nodes started");
+    }
+
+    /**
+     * Returns the IDs of all table partitions that exist on the given node.
+     */
+    private static List<TablePartitionId> tablePartitionIds(IgniteImpl node) {
+        return node.raftManager().localNodes().stream()
+                .map(RaftNodeId::groupId)
+                .filter(TablePartitionId.class::isInstance)
+                .map(TablePartitionId.class::cast)
+                .collect(toList());
+    }
+}
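
The waitAssignments helper above decodes the table's stored assignments before comparing them; as a standalone illustration of just that decoding step (assuming, as the test does, that the configuration value is a serialized List<Set<Assignment>>):

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.ignite.internal.affinity.Assignment;
    import org.apache.ignite.internal.util.ByteUtils;

    class AssignmentsDecodingSketch {
        static Set<String> partition0ConsistentIds(byte[] assignmentsBytes) {
            List<Set<Assignment>> assignments = ByteUtils.fromBytes(assignmentsBytes);

            // Collect the consistent node ids hosting partition 0.
            return assignments.get(0).stream()
                    .map(Assignment::consistentId)
                    .collect(Collectors.toSet());
        }
    }
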
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index e2d59f5bc3..54a04a315e 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -59,6 +59,7 @@ dependencies {
     testImplementation(testFixtures(project(':ignite-configuration')))
     testImplementation(testFixtures(project(':ignite-transactions')))
     testImplementation(testFixtures(project(':ignite-storage-api')))
+    testImplementation(testFixtures(project(':ignite-metastorage')))
     testImplementation libs.mockito.core
     testImplementation libs.mockito.inline
     testImplementation libs.mockito.junit
@@ -80,6 +81,7 @@ dependencies {
     testFixturesImplementation(testFixtures(project(':ignite-core')))
     testFixturesImplementation(testFixtures(project(':ignite-storage-api')))
     testFixturesImplementation(testFixtures(project(':ignite-transactions')))
+    
testFixturesImplementation(testFixtures(project(':ignite-cluster-management')))
     testFixturesImplementation libs.jetbrains.annotations
     testFixturesImplementation libs.fastutil.core
     testFixturesImplementation libs.mockito.core
@@ -96,6 +98,7 @@ dependencies {
     integrationTestImplementation(testFixtures(project(':ignite-raft')))
     integrationTestImplementation(testFixtures(project(':ignite-storage-api')))
     
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
+    
integrationTestImplementation(testFixtures(project(':ignite-cluster-management')))
     integrationTestImplementation libs.fastutil.core
 }
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 80e0f957ca..7a368f6021 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -371,7 +371,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
      */
     private Int2ObjectOpenHashMap<RaftGroupService> startTable(UUID tblId, 
SchemaDescriptor schemaDescriptor) throws Exception {
         List<Set<Assignment>> calculatedAssignments = 
AffinityUtils.calculateAssignments(
-                cluster.stream().map(node -> 
node.topologyService().localMember()).collect(toList()),
+                cluster.stream().map(node -> 
node.topologyService().localMember().name()).collect(toList()),
                 1,
                 replicas()
         );
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index d1a483094f..6bfa418150 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -24,6 +24,8 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesPrefix;
 import static 
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -70,6 +72,7 @@ import java.util.function.IntSupplier;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.configuration.ConfigurationProperty;
+import org.apache.ignite.configuration.NamedConfigurationTree;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -293,6 +296,18 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
     private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
 
+    /** Meta storage listener for changes in the distribution zones data 
nodes. */
+    private final WatchListener distributionZonesDataNodesListener;
+
+    /** Meta storage listener for pending assignments. */
+    private final WatchListener pendingAssignmentsRebalanceListener;
+
+    /** Meta storage listener for stable assignments. */
+    private final WatchListener stableAssignmentsRebalanceListener;
+
+    /** Meta storage listener for switch reduce assignments. */
+    private final WatchListener assignmentsSwitchRebalanceListener;
+
     /**
      * Creates a new table manager.
      *
@@ -427,6 +442,14 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, "incoming-raft-snapshot", 
LOG)
         );
+
+        distributionZonesDataNodesListener = 
createDistributionZonesDataNodesListener();
+
+        pendingAssignmentsRebalanceListener = 
createPendingAssignmentsRebalanceListener();
+
+        stableAssignmentsRebalanceListener = 
createStableAssignmentsRebalanceListener();
+
+        assignmentsSwitchRebalanceListener = 
createAssignmentsSwitchRebalanceListener();
     }
 
     /** {@inheritDoc} */
@@ -434,7 +457,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     public void start() {
         tablesCfg.tables().any().replicas().listen(this::onUpdateReplicas);
 
-        registerRebalanceListeners();
+        // TODO: IGNITE-18694 - Recovery for the case when zones watch 
listener processed event but assignments were not updated.
+        metaStorageMgr.registerPrefixWatch(zoneDataNodesPrefix(), 
distributionZonesDataNodesListener);
+
+        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
+        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
+        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
 
         ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().listen(this::onUpdateAssignments);
 
@@ -606,7 +634,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 for (int i = 0; i < partCnt; i++) {
                     TablePartitionId replicaGrpId = new 
TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i);
 
-                    futures[i] = 
updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId, 
baselineMgr.nodes(), newReplicas,
+                    futures[i] = 
updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId,
+                            
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), 
newReplicas,
                             replicasCtx.storageRevision(), metaStorageMgr, i);
                 }
 
@@ -953,6 +982,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             return;
         }
 
+        metaStorageMgr.unregisterWatch(distributionZonesDataNodesListener);
+
+        metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener);
+        metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
+        metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
+
         busyLock.block();
 
         Map<UUID, TableImpl> tables = tablesByIdVv.latest();
@@ -1248,7 +1283,11 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     }
 
     private Set<Assignment> calculateAssignments(TableConfiguration tableCfg, 
int partNum) {
-        return 
AffinityUtils.calculateAssignmentForPartition(baselineMgr.nodes(), partNum, 
tableCfg.value().replicas());
+        return AffinityUtils.calculateAssignmentForPartition(
+                
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
+                partNum,
+                tableCfg.value().replicas()
+        );
     }
 
     /**
@@ -1308,7 +1347,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                         // Affinity assignments calculation.
                         
extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
-                                baselineMgr.nodes(),
+                                
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
                                 tableChange.partitions(),
                                 tableChange.replicas())));
                     });
@@ -1789,10 +1828,79 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     }
 
     /**
-     * Register the new meta storage listener for changes in the 
rebalance-specific keys.
+     * Creates meta storage listener for distribution zones data nodes updates.
+     *
+     * @return The watch listener.
+     */
+    private WatchListener createDistributionZonesDataNodesListener() {
+        return new WatchListener() {
+            @Override
+            public void onUpdate(WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteInternalException(new 
NodeStoppingException());
+                }
+
+                try {
+                    byte[] dataNodesBytes = 
evt.entryEvent().newEntry().value();
+
+                    if (dataNodesBytes == null) {
+                        // The zone was removed, so its data nodes were removed too.
+                        return;
+                    }
+
+                    NamedConfigurationTree<TableConfiguration, TableView, 
TableChange> tables = tablesCfg.tables();
+
+                    int zoneId = 
extractZoneId(evt.entryEvent().newEntry().key());
+
+                    Set<String> dataNodes = 
ByteUtils.fromBytes(dataNodesBytes);
+
+                    for (int i = 0; i < tables.value().size(); i++) {
+                        TableView tableView = tables.value().get(i);
+
+                        int tableZoneId = tableView.zoneId();
+
+                        if (zoneId == tableZoneId) {
+                            TableConfiguration tableCfg = 
tables.get(tableView.name());
+
+                            for (int part = 0; part < tableView.partitions(); 
part++) {
+                                UUID tableId = ((ExtendedTableConfiguration) 
tableCfg).id().value();
+
+                                TablePartitionId replicaGrpId = new 
TablePartitionId(tableId, part);
+
+                                int partId = part;
+
+                                updatePendingAssignmentsKeys(
+                                        tableView.name(), replicaGrpId, 
dataNodes, tableView.replicas(),
+                                        
evt.entryEvent().newEntry().revision(), metaStorageMgr, part
+                                ).exceptionally(e -> {
+                                    LOG.error(
+                                            "Exception on updating assignments 
for [table={}, partition={}]", e, tableView.name(), partId
+                                    );
+
+                                    return null;
+                                });
+                            }
+                        }
+                    }
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                LOG.warn("Unable to process data nodes event", e);
+            }
+        };
+    }
+
+    /**
+     * Creates meta storage listener for pending assignments updates.
+     *
+     * @return The watch listener.
      */
-    private void registerRebalanceListeners() {
-        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 new WatchListener() {
+    private WatchListener createPendingAssignmentsRebalanceListener() {
+        return new WatchListener() {
             @Override
             public void onUpdate(WatchEvent evt) {
                 if (!busyLock.enterBusy()) {
@@ -1960,9 +2068,16 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             public void onError(Throwable e) {
                 LOG.warn("Unable to process pending assignments event", e);
             }
-        });
+        };
+    }
 
-        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 new WatchListener() {
+    /**
+     * Creates meta storage listener for stable assignments updates.
+     *
+     * @return The watch listener.
+     */
+    private WatchListener createStableAssignmentsRebalanceListener() {
+        return new WatchListener() {
             @Override
             public void onUpdate(WatchEvent evt) {
                 handleChangeStableAssignmentEvent(evt);
@@ -1972,9 +2087,16 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             public void onError(Throwable e) {
                 LOG.warn("Unable to process stable assignments event", e);
             }
-        });
+        };
+    }
 
-        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 new WatchListener() {
+    /**
+     * Creates meta storage listener for switch reduce assignments updates.
+     *
+     * @return The watch listener.
+     */
+    private WatchListener createAssignmentsSwitchRebalanceListener() {
+        return new WatchListener() {
             @Override
             public void onUpdate(WatchEvent evt) {
                 byte[] key = evt.entryEvent().newEntry().key();
@@ -1990,7 +2112,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 RebalanceUtil.handleReduceChanged(
                         metaStorageMgr,
-                        baselineMgr.nodes(),
+                        
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
                         tblCfg.value().replicas(),
                         partitionNumber,
                         replicaGrpId,
@@ -2002,7 +2124,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             public void onError(Throwable e) {
                 LOG.warn("Unable to process switch reduce event", e);
             }
-        });
+        };
     }
 
     private PartitionMover createPartitionMover(InternalTable internalTable, 
int partId) {
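
A brief sketch of the value round trip the new data nodes listener above relies on (inferred from the ByteUtils.fromBytes call in the listener; the writer side shown here is an assumption, and the node names are examples): zone data nodes arrive as a serialized Set<String> of consistent node names.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.ignite.internal.util.ByteUtils;

    class DataNodesValueSketch {
        static Set<String> roundTrip() {
            Set<String> dataNodes = new HashSet<>(List.of("node0", "node1", "node2"));

            // What the distribution zones side would write under zoneDataNodesKey(zoneId)...
            byte[] value = ByteUtils.toBytes(dataNodes);

            // ...and what the watch listener deserializes before recalculating assignments.
            return ByteUtils.fromBytes(value);
        }
    }
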
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 0b0b5f36c4..a3dea5ffa2 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.metastorage.dsl.Operations;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -82,14 +81,14 @@ public class RebalanceUtil {
      *
      * @param tableName Table name.
      * @param partId Unique identifier of a partition.
-     * @param baselineNodes Nodes in baseline.
+     * @param dataNodes Data nodes.
      * @param replicas Number of replicas for a table.
      * @param revision Revision of Meta Storage that is specific for the 
assignment update.
      * @param metaStorageMgr Meta Storage manager.
      * @return Future representing result of updating keys in {@code 
metaStorageMgr}
      */
     public static @NotNull CompletableFuture<Void> 
updatePendingAssignmentsKeys(
-            String tableName, TablePartitionId partId, Collection<ClusterNode> 
baselineNodes,
+            String tableName, TablePartitionId partId, Collection<String> 
dataNodes,
             int replicas, long revision, MetaStorageManager metaStorageMgr, 
int partNum) {
         ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
 
@@ -99,7 +98,7 @@ public class RebalanceUtil {
 
         ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
 
-        Set<Assignment> partAssignments = 
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+        Set<Assignment> partAssignments = 
AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
 
         byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments);
 
@@ -341,14 +340,14 @@ public class RebalanceUtil {
      * If there is rebalancing in progress, then new assignments will be 
applied when rebalance finishes.
      *
      * @param metaStorageMgr MetaStorage manager.
-     * @param baselineNodes Baseline nodes.
+     * @param dataNodes Data nodes.
      * @param replicas Replicas count.
      * @param partNum Number of the partition.
      * @param partId Partition's raft group id.
      * @param event Assignments switch reduce change event.
      * @return Completable future that signifies the completion of this 
operation.
      */
-    public static CompletableFuture<Void> 
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode> 
baselineNodes,
+    public static CompletableFuture<Void> 
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<String> 
dataNodes,
             int replicas, int partNum, TablePartitionId partId, WatchEvent 
event) {
         Entry entry = event.entryEvent().newEntry();
         byte[] eventData = entry.value();
@@ -359,7 +358,7 @@ public class RebalanceUtil {
             return CompletableFuture.completedFuture(null);
         }
 
-        Set<Assignment> assignments = 
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+        Set<Assignment> assignments = 
AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
 
         ByteArray pendingKey = pendingPartAssignmentsKey(partId);
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
new file mode 100644
index 0000000000..1645d99ebe
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.emptySet;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.SchemaManager;
+import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.utils.RebalanceUtil;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Tests the distribution zone watch listener in {@link TableManager}.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class TableManagerDistributionZonesTest extends IgniteAbstractTest {
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    @Mock
+    private ClusterService clusterService;
+
+    private TablesConfiguration tablesConfiguration;
+
+    private WatchListener watchListener;
+
+    private TableManager tableManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Set.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
+
+        doAnswer(invocation -> {
+            WatchListener listener = invocation.getArgument(1);
+
+            if (watchListener == null) {
+                watchListener = listener;
+            }
+
+            return null;
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+        tablesConfiguration = mock(TablesConfiguration.class);
+
+        clusterCfgMgr.start();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new 
CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new 
IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new 
MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            Iif iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = 
commandsFactory.multiInvokeCommand().iif(iif).build();
+
+            return metaStorageService.run(multiInvokeCommand);
+        }).when(metaStorageManager).invoke(any());
+
+        when(clusterService.messagingService()).thenAnswer(invocation -> {
+            MessagingService ret = mock(MessagingService.class);
+
+            return ret;
+        });
+
+        tableManager = new TableManager(
+                "node1",
+                (x) -> {},
+                tablesConfiguration,
+                clusterService,
+                null,
+                null,
+                null,
+                null,
+                null,
+                mock(TopologyService.class),
+                null,
+                null,
+                null,
+                metaStorageManager,
+                mock(SchemaManager.class),
+                null,
+                null,
+                mock(OutgoingSnapshotsManager.class)
+        );
+    }
+
+    @Test
+    void dataNodesTriggersAssignmentsChanging() {
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table0 = 
mockTable(0, 1, 0);
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table1 = 
mockTable(1, 2, 0);
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table2 = 
mockTable(2, 1, 1);
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table3 = 
mockTable(3, 2, 1);
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table4 = 
mockTable(4, 1, 1);
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table5 = 
mockTable(5, 2, 1);
+
+        List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> 
mockedTables =
+                List.of(table0, table1, table2, table3, table4, table5);
+
+        mockTables(mockedTables);
+
+        tableManager.start();
+
+        Set<String> nodes = Set.of("node0", "node1", "node2");
+
+        watchListenerOnUpdate(1, nodes, 1);
+
+        Map<Integer, Set<String>> zoneNodes = new HashMap<>();
+
+        zoneNodes.put(1, nodes);
+
+        checkAssignments(mockedTables, zoneNodes, 
RebalanceUtil::pendingPartAssignmentsKey);
+
+        verify(keyValueStorage, timeout(1000).times(6)).invoke(any());
+    }
+
+    @Test
+    void sequentialAssignmentsChanging() {
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table = 
mockTable(0, 1, 1);
+
+        List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> 
mockedTables = List.of(table);
+
+        mockTables(mockedTables);
+
+        tableManager.start();
+
+        Set<String> nodes = Set.of("node0", "node1", "node2");
+
+        watchListenerOnUpdate(1, nodes, 1);
+
+        Map<Integer, Set<String>> zoneNodes = new HashMap<>();
+
+        zoneNodes.put(1, nodes);
+
+        checkAssignments(mockedTables, zoneNodes, 
RebalanceUtil::pendingPartAssignmentsKey);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        nodes = Set.of("node3", "node4", "node5");
+
+        watchListenerOnUpdate(1, nodes, 2);
+
+        zoneNodes.clear();
+        zoneNodes.put(1, nodes);
+
+        checkAssignments(mockedTables, zoneNodes, 
RebalanceUtil::plannedPartAssignmentsKey);
+
+        verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+    }
+
+    @Test
+    void sequentialEmptyAssignmentsChanging() {
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table = 
mockTable(0, 1, 1);
+
+        List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> 
mockedTables = List.of(table);
+
+        mockTables(mockedTables);
+
+        tableManager.start();
+
+        watchListenerOnUpdate(1, null, 1);
+
+        Set<String> nodes = Set.of("node0", "node1", "node2");
+
+        watchListenerOnUpdate(1, nodes, 2);
+
+        Map<Integer, Set<String>> zoneNodes = new HashMap<>();
+
+        zoneNodes.put(1, nodes);
+
+        checkAssignments(mockedTables, zoneNodes, 
RebalanceUtil::pendingPartAssignmentsKey);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        nodes = emptySet();
+
+        watchListenerOnUpdate(1, nodes, 3);
+
+        zoneNodes.clear();
+        zoneNodes.put(1, nodes);
+
+        checkAssignments(mockedTables, zoneNodes, 
RebalanceUtil::plannedPartAssignmentsKey);
+
+        verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+    }
+
+    @Test
+    void staleDataNodesEvent() {
+        IgniteBiTuple<TableView, ExtendedTableConfiguration> table = 
mockTable(0, 1, 1);
+
+        List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> 
mockedTables = List.of(table);
+
+        mockTables(mockedTables);
+
+        tableManager.start();
+
+        Set<String> nodes = Set.of("node0", "node1", "node2");
+
+        watchListenerOnUpdate(1, nodes, 1);
+
+        Map<Integer, Set<String>> zoneNodes = new HashMap<>();
+
+        zoneNodes.put(1, nodes);
+
+        checkAssignments(mockedTables, zoneNodes, 
RebalanceUtil::pendingPartAssignmentsKey);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Set<String> nodes2 = Set.of("node3", "node4", "node5");
+
+        watchListenerOnUpdate(1, nodes2, 1);
+
+        checkAssignments(mockedTables, zoneNodes, 
RebalanceUtil::pendingPartAssignmentsKey);
+
+        TablePartitionId partId = new TablePartitionId(new UUID(0, 0), 0);
+
+        
assertNull(keyValueStorage.get(RebalanceUtil.plannedPartAssignmentsKey(partId).bytes()).value());
+
+        verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+    }
+
+    private void checkAssignments(
+            List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> 
mockedTables,
+            Map<Integer, Set<String>> zoneNodes,
+            Function<TablePartitionId, ByteArray> assignmentFunction
+    ) {
+        for (int i = 0; i < mockedTables.size(); i++) {
+            TableView tableView = mockedTables.get(i).get1();
+
+            for (int j = 0; j < tableView.partitions(); j++) {
+                TablePartitionId partId = new TablePartitionId(new UUID(0, i), 
j);
+
+                byte[] actualAssignmentsBytes = 
keyValueStorage.get(assignmentFunction.apply(partId).bytes()).value();
+
+                Set<String> expectedNodes = zoneNodes.get(tableView.zoneId());
+
+                if (expectedNodes != null) {
+                    Set<String> expectedAssignments =
+                            calculateAssignmentForPartition(expectedNodes, j, 
tableView.replicas())
+                                    .stream().map(assignment -> 
assignment.consistentId()).collect(Collectors.toSet());
+
+                    assertNotNull(actualAssignmentsBytes);
+
+                    Set<String> actualAssignments = ((Set<Assignment>) 
fromBytes(actualAssignmentsBytes))
+                            .stream().map(assignment -> 
assignment.consistentId()).collect(Collectors.toSet());
+
+                    
assertTrue(expectedAssignments.containsAll(actualAssignments));
+
+                    assertEquals(expectedAssignments.size(), 
actualAssignments.size());
+                } else {
+                    assertNull(actualAssignmentsBytes);
+                }
+            }
+        }
+    }
+
+    private void watchListenerOnUpdate(int zoneId, Set<String> nodes, long 
rev) {
+        byte[] newLogicalTopology;
+
+        if (nodes != null) {
+            newLogicalTopology = toBytes(nodes);
+        } else {
+            newLogicalTopology = null;
+        }
+
+        Entry newEntry = new EntryImpl(zoneDataNodesKey(zoneId).bytes(), 
newLogicalTopology, rev, 1);
+
+        EntryEvent entryEvent = new EntryEvent(null, newEntry);
+
+        WatchEvent evt = new WatchEvent(entryEvent);
+
+        watchListener.onUpdate(evt);
+    }
+
+    private IgniteBiTuple<TableView, ExtendedTableConfiguration> mockTable(int 
tableNum, int partNum, int zoneId) {
+        TableView tableView = mock(TableView.class);
+        when(tableView.zoneId()).thenReturn(zoneId);
+        when(tableView.name()).thenReturn("table" + tableNum);
+        when(tableView.replicas()).thenReturn(1);
+        when(tableView.partitions()).thenReturn(partNum);
+
+        ExtendedTableConfiguration tableCfg = 
mock(ExtendedTableConfiguration.class);
+        ConfigurationValue valueId = mock(ConfigurationValue.class);
+        when(valueId.value()).thenReturn(new UUID(0, tableNum));
+        when(tableCfg.id()).thenReturn(valueId);
+
+        return new IgniteBiTuple<>(tableView, tableCfg);
+
+    }
+
+    private void mockTables(List<IgniteBiTuple<TableView, 
ExtendedTableConfiguration>> mockedTables) {
+        NamedListView<TableView> valueList = mock(NamedListView.class);
+
+        when(valueList.namedListKeys()).thenReturn(new ArrayList<>());
+
+        NamedConfigurationTree<TableConfiguration, TableView, TableChange> 
tables = mock(NamedConfigurationTree.class);
+
+        when(valueList.size()).thenReturn(mockedTables.size());
+        when(tables.value()).thenReturn(valueList);
+
+        for (int i = 0; i < mockedTables.size(); i++) {
+            IgniteBiTuple<TableView, ExtendedTableConfiguration> mockedTable = 
mockedTables.get(i);
+
+            TableView tableView = mockedTable.get1();
+
+            when(valueList.get(i)).thenReturn(tableView);
+
+            when(tables.get(tableView.name())).thenReturn(mockedTable.get2());
+        }
+
+        ExtendedTableConfiguration tableCfg = 
mock(ExtendedTableConfiguration.class);
+        when(tables.any()).thenReturn(tableCfg);
+        when(tableCfg.replicas()).thenReturn(mock(ConfigurationValue.class));
+        
when(tableCfg.assignments()).thenReturn(mock(ConfigurationValue.class));
+
+        when(tablesConfiguration.tables()).thenReturn(tables);
+    }
+}
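
The test above feeds zone dataNodes updates straight into the captured watch listener and then inspects the in-memory meta storage: the first update for a zone produces pending assignments, a further update while a rebalance is pending produces planned assignments, and an event with a stale revision leaves the planned key untouched. The checkAssignments helper is the read side of that flow; a condensed, illustrative version for a single partition (zone and node values invented for the example) could look like this:

    // Read back what the listener wrote for partition 0 of table 0 and compare it
    // with a locally computed assignment set.
    Set<String> dataNodes = Set.of("node0", "node1", "node2");
    TablePartitionId partId = new TablePartitionId(new UUID(0, 0), 0);

    byte[] raw = keyValueStorage
            .get(RebalanceUtil.pendingPartAssignmentsKey(partId).bytes())
            .value();

    Set<String> actual = ((Set<Assignment>) fromBytes(raw)).stream()
            .map(Assignment::consistentId)
            .collect(Collectors.toSet());

    Set<String> expected = calculateAssignmentForPartition(dataNodes, 0, /* replicas */ 1).stream()
            .map(Assignment::consistentId)
            .collect(Collectors.toSet());

    assertEquals(expected, actual);
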
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index a4520d5f1c..64e5fa0708 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -255,6 +255,8 @@ public class TableManagerTest extends IgniteAbstractTest {
     public void testPreconfiguredTable() throws Exception {
         when(rm.startRaftGroupService(any(), any())).thenAnswer(mock -> 
completedFuture(mock(RaftGroupService.class)));
 
+        mockMetastore();
+
         TableManager tableManager = createTableManager(tblManagerFut, false);
 
         tblManagerFut.complete(tableManager);
@@ -264,8 +266,6 @@ public class TableManagerTest extends IgniteAbstractTest {
                 SchemaBuilders.column("val", 
ColumnType.INT64).asNullable(true).build()
         ).withPrimaryKey("key").build();
 
-        mockMetastore();
-
         tblsCfg.tables().change(tablesChange -> {
             tablesChange.create(scmTbl.name(), tableChange -> {
                 (SchemaConfigurationConverter.convert(scmTbl, tableChange))
@@ -650,6 +650,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                     .thenReturn(assignment);
         }
 
+        mockMetastore();
+
         TableManager tableManager = createTableManager(tblManagerFut, true);
 
         final int tablesBeforeCreation = tableManager.tables().size();
@@ -687,8 +689,6 @@ public class TableManagerTest extends IgniteAbstractTest {
                         .changePartitions(PARTITIONS)
         );
 
-        mockMetastore();
-
         assertTrue(createTblLatch.await(10, TimeUnit.SECONDS));
 
         TableImpl tbl2 = (TableImpl) tbl2Fut.get();
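
The TableManagerTest hunks only reorder the setup: mockMetastore() now runs before createTableManager(...), presumably because the manager touches the meta storage earlier now that assignments are computed from data nodes. The resulting order, using the helpers already defined in that test, is:

    // Prepare the metastore mock first, then create and publish the manager under test.
    mockMetastore();

    TableManager tableManager = createTableManager(tblManagerFut, false);

    tblManagerFut.complete(tableManager);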
