alievmirza commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1161731849


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version or 
greater version in topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing data nodes triggered
+     * by started nodes and corresponding to the passed topology version or 
greater topology version
+     * to the data nodes into the meta storage.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing data nodes
+     * triggered by stopped nodes and corresponding to the passed topology 
version or greater topology version
+     * to the data nodes into the meta storage.
+     * After waiting it completes the future with data nodes of the specified 
zone.
+     * This method must be invoked in a change configuration closure to 
guarantee that the zone is exists and values of scaleUp/scaleDown
+     * timers are up to date.
+     * The returned future can be completed with {@link 
DistributionZoneNotFoundException} and {@link 
DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int 
zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = 
awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> 
scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> 
scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for DistributionZoneManager observe passed topology version or 
greater version in topologyWatchListener.

Review Comment:
   waits for observing, please add {@link } to topologyWatchListener 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version or 
greater version in topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing data nodes triggered
+     * by started nodes and corresponding to the passed topology version or 
greater topology version
+     * to the data nodes into the meta storage.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing data nodes
+     * triggered by stopped nodes and corresponding to the passed topology 
version or greater topology version
+     * to the data nodes into the meta storage.
+     * After waiting it completes the future with data nodes of the specified 
zone.
+     * This method must be invoked in a change configuration closure to 
guarantee that the zone is exists and values of scaleUp/scaleDown
+     * timers are up to date.
+     * The returned future can be completed with {@link 
DistributionZoneNotFoundException} and {@link 
DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int 
zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = 
awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> 
scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> 
scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for DistributionZoneManager observe passed topology version or 
greater version in topologyWatchListener.
+     *
+     * @param topVer Topology version.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+        return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+    }
+
+    /**
+     * Transforms {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * and {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to 
boolean values.
+     * True if it equals to zero and false if it greater than zero. Zero means 
that data nodes changing must be started immediate.

Review Comment:
   immediately 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.

Review Comment:
   Comment from Alex is still actual, you have added implementation details of 
mechanism of awaiting, which is ok (but this details are too complicated and 
with grammar mistakes, please try to simplify it, maybe explain only one 
scenario with scale up, and mention that scale down logic is the same), but at 
the first place we must add some short explanation of the method purpose. 
   
   > The method for obtaining data nodes of the specified zone.
   
   This is not just a method for obtaining data nodes, but it takes into 
account provided {@code topVer} and compares it with the explored by DZM 
topology version, please add short javadoc in the beginning so it could be 
clear what this method actually does
   
   > @return The data nodes future.
   
   This is also quite short, we should say that the completion of this future 
depends on the explored by DZM topology version etc.
   
   Also please fix typos, like `than` -> `then`



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case 
when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} 
are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+    private MetaStorageManager metaStorageManager;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private LogicalTopology logicalTopology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultManager;
+
+    private WatchListener topologyWatchListener;
+
+    private WatchListener dataNodesWatchListener;
+
+    @BeforeEach
+    void setUp() {
+        vaultManager = mock(VaultManager.class);
+
+        
when(vaultManager.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new
 VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(vaultManager.get(zonesLogicalTopologyVersionKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        doAnswer(invocation -> {
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), 
zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            } else if (Arrays.equals(key.bytes(), 
zonesDataNodesPrefix().bytes())) {
+                dataNodesWatchListener = watchListener;
+            }
+
+            return null;
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+        when(metaStorageManager.invoke(any()))
+                
.thenReturn(completedFuture(StatementResultImpl.builder().result(new byte[] 
{0}).build()));
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        TablesConfiguration tablesConfiguration = 
mock(TablesConfiguration.class);
+
+        NamedConfigurationTree<TableConfiguration, TableView, TableChange> 
tables = mock(NamedConfigurationTree.class);
+
+        when(tablesConfiguration.tables()).thenReturn(tables);
+
+        NamedListView<TableView> tablesValue = mock(NamedListView.class);
+
+        when(tables.value()).thenReturn(tablesValue);
+
+        when(tablesValue.size()).thenReturn(0);
+
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Set.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        clusterCfgMgr.start();
+
+        zonesConfiguration = clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultManager,
+                "node"
+        );
+
+        mockCmgLocalNodes();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        distributionZoneManager.stop();
+        clusterCfgMgr.stop();
+        clusterStateStorage.stop();
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with 
added and removed nodes.
+     */
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUpAndScaleDown() throws 
Exception {
+        startZoneManager(0);
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        LOG.info("Topology with added and removed nodes.");
+
+        Set<String> dataNodes = Set.of("node0", "node2");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        CompletableFuture<Void> revisionUpFut = 
distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+                .scaleUpRevisionTracker().waitFor(testData.dataNodesRevision2);
+
+        assertFalse(revisionUpFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true, 
testData.dataNodesRevision2,
+                testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(revisionUpFut::isDone, 3000));
+
+        CompletableFuture<Void> revisionDownFut = 
distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID).scaleDownRevisionTracker()
+                    .waitFor(testData.dataNodesRevision2);
+
+        assertFalse(revisionDownFut.isDone());
+
+        assertFalse(testData.dataNodesUpFut3.isDone());
+        assertFalse(testData.dataNodesDownFut3.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false, 
testData.dataNodesRevision2,
+                testData.dataNodesRevision2 + 2);
+
+        assertTrue(waitForCondition(revisionDownFut::isDone, 3000));
+
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with 
added nodes.
+     */
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUp() throws Exception {
+        startZoneManager(0);
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        LOG.info("Topology with added nodes.");
+
+        Set<String> dataNodes = Set.of("node0", "node1", "node2");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        CompletableFuture<Void> revisionUpFut = 
distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+                    
.scaleUpRevisionTracker().waitFor(testData.dataNodesRevision2);
+
+        assertFalse(revisionUpFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true, 
testData.dataNodesRevision2,
+                testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(revisionUpFut::isDone, 3000));
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with 
removed nodes.
+     */
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleDown() throws Exception 
{
+        startZoneManager(0);
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        LOG.info("Topology with removed nodes.");
+
+        Set<String> dataNodes = Set.of("node0");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        CompletableFuture<Void> revisionDownFut = 
distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+                .scaleDownRevisionTracker().waitFor((long) 
testData.dataNodesRevision2);
+
+        assertFalse(revisionDownFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false, 
testData.dataNodesRevision2,
+                testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(revisionDownFut::isDone, 3000));
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+    }
+
+    private static class TestSeveralScaleUpAndSeveralScaleDownDataObject {
+        private final long topVer2;
+        private final long dataNodesRevision2;
+        private final CompletableFuture<Void> topVerUpFut2;
+        private final CompletableFuture<Void> topVerDownFut2;
+        private final CompletableFuture<Set<String>> dataNodesUpFut3;
+        private final CompletableFuture<Set<String>> dataNodesDownFut3;
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject(
+                long topVer2,
+                long dataNodesRevision2,
+                CompletableFuture<Void> topVerUpFut2,
+                CompletableFuture<Void> topVerDownFut2,
+                CompletableFuture<Set<String>> dataNodesUpFut3,
+                CompletableFuture<Set<String>> dataNodesDownFut3) {
+            this.topVer2 = topVer2;
+            this.dataNodesRevision2 = dataNodesRevision2;
+            this.topVerUpFut2 = topVerUpFut2;
+            this.topVerDownFut2 = topVerDownFut2;
+            this.dataNodesUpFut3 = dataNodesUpFut3;
+            this.dataNodesDownFut3 = dataNodesDownFut3;
+        }
+    }
+
+    /**
+     * This method invokes {@link 
DistributionZoneManager#topologyVersionedDataNodes(int, long)} with default and 
non-default zone id
+     * and different logical topology version. Collects data nodes futures.
+     * Simulates new logical topology with new nodes. Check that some of data 
nodes futures are completed.
+     * Simulates new logical topology with removed nodes. Check that some of 
data nodes futures are completed.
+     *
+     * @return Structure with data for continue testing.
+     */
+    private TestSeveralScaleUpAndSeveralScaleDownDataObject 
testSeveralScaleUpAndSeveralScaleDownGeneral()

Review Comment:
   I do not understand this concept with 
`TestSeveralScaleUpAndSeveralScaleDownDataObject`, many things are done in one 
method, it is hard to read this code, please do something with that.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version or 
greater version in topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing data nodes triggered
+     * by started nodes and corresponding to the passed topology version or 
greater topology version
+     * to the data nodes into the meta storage.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing data nodes
+     * triggered by stopped nodes and corresponding to the passed topology 
version or greater topology version
+     * to the data nodes into the meta storage.
+     * After waiting it completes the future with data nodes of the specified 
zone.
+     * This method must be invoked in a change configuration closure to 
guarantee that the zone is exists and values of scaleUp/scaleDown
+     * timers are up to date.
+     * The returned future can be completed with {@link 
DistributionZoneNotFoundException} and {@link 
DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int 
zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = 
awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> 
scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> 
scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for DistributionZoneManager observe passed topology version or 
greater version in topologyWatchListener.
+     *
+     * @param topVer Topology version.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+        return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+    }
+
+    /**
+     * Transforms {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * and {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to 
boolean values.
+     * True if it equals to zero and false if it greater than zero. Zero means 
that data nodes changing must be started immediate.
+     *
+     * @param zoneId Zone id.
+     * @return Future with the result.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Boolean>> 
getImmediateTimers(int zoneId) {
+        return inBusyLock(busyLock, () -> {
+            DistributionZoneConfiguration zoneCfg = 
getZoneById(zonesConfiguration, zoneId);
+
+            return completedFuture(new IgniteBiTuple<>(
+                    zoneCfg.dataNodesAutoAdjustScaleUp().value() == 
IMMEDIATE_TIMER_VALUE,
+                    zoneCfg.dataNodesAutoAdjustScaleDown().value() == 
IMMEDIATE_TIMER_VALUE
+            ));
+        });
+    }
+
+    /**
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than waits for writing data nodes triggered

Review Comment:
   Also it could be useful to mention case with 
`DistributionZoneWasRemovedException`



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -898,45 +1083,62 @@ private void initDataNodesFromVaultManager() {
         }
 
         try {
-            vaultMgr.get(zonesLogicalTopologyKey())
-                    .thenAccept(vaultEntry -> {
-                        if (!busyLock.enterBusy()) {
-                            throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
-                        }
+            long appliedRevision = metaStorageManager.appliedRevision();
 
-                        try {
-                            long appliedRevision = 
metaStorageManager.appliedRevision();
+            lastScaleUpRevision = appliedRevision;
 
-                            if (vaultEntry != null && vaultEntry.value() != 
null) {
-                                logicalTopology = 
fromBytes(vaultEntry.value());
+            lastScaleDownRevision = appliedRevision;
 
-                                // init keys and data nodes for default zone
-                                saveDataNodesAndUpdateTriggerKeysInMetaStorage(
-                                        DEFAULT_ZONE_ID,
-                                        appliedRevision,
-                                        logicalTopology
-                                );
+            VaultEntry topVerEntry = 
vaultMgr.get(zonesLogicalTopologyVersionKey()).join();
 
-                                
zonesConfiguration.distributionZones().value().forEach(zone -> {
-                                    int zoneId = zone.zoneId();
+            if (topVerEntry != null && topVerEntry.value() != null) {
+                topVerTracker.update(bytesToLong(topVerEntry.value()));
+            }
 
-                                    
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
-                                            zoneId,
-                                            appliedRevision,
-                                            logicalTopology
-                                    );
-                                });
-                            }
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
-                    });
+            VaultEntry topologyEntry = 
vaultMgr.get(zonesLogicalTopologyKey()).join();
+
+            if (topologyEntry != null && topologyEntry.value() != null) {
+                logicalTopology = fromBytes(topologyEntry.value());
+
+                // init keys and data nodes for default zone
+                saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                        DEFAULT_ZONE_ID,
+                        appliedRevision,
+                        logicalTopology
+                );
+
+                zonesConfiguration.distributionZones().value().forEach(zone -> 
{
+                    int zoneId = zone.zoneId();
+
+                    saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                            zoneId,
+                            appliedRevision,
+                            logicalTopology
+                    );
+                });
+            }
+
+            zonesState.values().forEach(zoneState -> {
+                zoneState.scaleUpRevisionTracker().update(lastScaleUpRevision);
+
+                
zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision);
+
+                zoneState.nodes(logicalTopology);
+            });
+
+            assert topologyEntry == null || topologyEntry.value() == null || 
logicalTopology.equals(fromBytes(topologyEntry.value()))
+                    : "DistributionZoneManager.logicalTopology was changed 
after initialization from the vault manager.";

Review Comment:
   Please, remove `DistributionZoneManager.logicalTopology` from this message 
and try to rephrase it. This code can seen by user



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version or 
greater version in topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing data nodes triggered
+     * by started nodes and corresponding to the passed topology version or 
greater topology version
+     * to the data nodes into the meta storage.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing data nodes
+     * triggered by stopped nodes and corresponding to the passed topology 
version or greater topology version
+     * to the data nodes into the meta storage.
+     * After waiting it completes the future with data nodes of the specified 
zone.
+     * This method must be invoked in a change configuration closure to 
guarantee that the zone is exists and values of scaleUp/scaleDown
+     * timers are up to date.
+     * The returned future can be completed with {@link 
DistributionZoneNotFoundException} and {@link 
DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int 
zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = 
awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> 
scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> 
scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for DistributionZoneManager observe passed topology version or 
greater version in topologyWatchListener.
+     *
+     * @param topVer Topology version.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+        return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+    }
+
+    /**
+     * Transforms {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * and {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to 
boolean values.
+     * True if it equals to zero and false if it greater than zero. Zero means 
that data nodes changing must be started immediate.
+     *
+     * @param zoneId Zone id.
+     * @return Future with the result.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Boolean>> 
getImmediateTimers(int zoneId) {
+        return inBusyLock(busyLock, () -> {
+            DistributionZoneConfiguration zoneCfg = 
getZoneById(zonesConfiguration, zoneId);
+
+            return completedFuture(new IgniteBiTuple<>(
+                    zoneCfg.dataNodesAutoAdjustScaleUp().value() == 
IMMEDIATE_TIMER_VALUE,
+                    zoneCfg.dataNodesAutoAdjustScaleDown().value() == 
IMMEDIATE_TIMER_VALUE
+            ));
+        });
+    }
+
+    /**
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than waits for writing data nodes triggered

Review Comment:
   Please rephrase this javadoc and one below, it is really hard to understand 
what is going on here 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,24 +1147,57 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 }
 
                 try {
-                    assert evt.single() : "Expected an event with one entry 
but was an event with several entries with keys: "
+                    assert evt.entryEvents().size() == 2 :
+                            "Expected an event with logical topology and 
logical topology version entries but was events with keys: "
                             + evt.entryEvents().stream().map(entry -> 
entry.newEntry() == null ? "null" : entry.newEntry().key())
                             .collect(toList());
 
-                    Entry newEntry = evt.entryEvent().newEntry();
+                    long topVer = 0;
+
+                    byte[] newLogicalTopologyBytes = null;
+
+                    Set<String> newLogicalTopology = null;
+
+                    long revision = 0;
+
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
 
-                    long revision = newEntry.revision();
+                        if (Arrays.equals(e.key(), 
zonesLogicalTopologyVersionKey().bytes())) {
+                            topVer = bytesToLong(e.value());
 
-                    byte[] newLogicalTopologyBytes = newEntry.value();
+                            revision = e.revision();
+                        } else if (Arrays.equals(e.key(), 
zonesLogicalTopologyKey().bytes())) {
+                            newLogicalTopologyBytes = e.value();
 
-                    Set<String> newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                            newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                        }
+                    }
+
+                    assert newLogicalTopology != null;
+                    assert revision > 0;
+
+                    Set<String> newLogicalTopology0 = newLogicalTopology;
 
                     Set<String> removedNodes =
-                            logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toSet());
+                            logicalTopology.stream().filter(node -> 
!newLogicalTopology0.contains(node)).collect(toSet());
 
                     Set<String> addedNodes =
                             newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toSet());
 
+                    if (!addedNodes.isEmpty()) {
+                        lastScaleUpRevision = revision;
+                    }
+
+                    if (!removedNodes.isEmpty()) {
+                        lastScaleDownRevision = revision;
+                    }
+
+                    //The topology version must be updated after the 
lastScaleUpRevision and lastScaleDownRevision are updated.
+                    //This is necessary in order to when topology version 
waiters will be notified that topology version is updated

Review Comment:
   please try to rephrase the second sentence, it is unclear what you wanted to 
say



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java:
##########
@@ -92,6 +93,8 @@ protected IgniteSql igniteSql() {
         return CLUSTER_NODES.get(0).sql();
     }
 
+
+    @Disabled

Review Comment:
   Disabled without a ticket. Are this issues still valid? 



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case 
when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} 
are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+    private MetaStorageManager metaStorageManager;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private LogicalTopology logicalTopology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultManager;
+
+    private WatchListener topologyWatchListener;
+
+    private WatchListener dataNodesWatchListener;
+
+    @BeforeEach
+    void setUp() {
+        vaultManager = mock(VaultManager.class);
+
+        
when(vaultManager.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new
 VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(vaultManager.get(zonesLogicalTopologyVersionKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        doAnswer(invocation -> {
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), 
zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            } else if (Arrays.equals(key.bytes(), 
zonesDataNodesPrefix().bytes())) {
+                dataNodesWatchListener = watchListener;
+            }
+
+            return null;
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+        when(metaStorageManager.invoke(any()))
+                
.thenReturn(completedFuture(StatementResultImpl.builder().result(new byte[] 
{0}).build()));
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        TablesConfiguration tablesConfiguration = 
mock(TablesConfiguration.class);

Review Comment:
   Please rewrite it to a `@InjectConfiguration`



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,24 +1147,57 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 }
 
                 try {
-                    assert evt.single() : "Expected an event with one entry 
but was an event with several entries with keys: "
+                    assert evt.entryEvents().size() == 2 :
+                            "Expected an event with logical topology and 
logical topology version entries but was events with keys: "
                             + evt.entryEvents().stream().map(entry -> 
entry.newEntry() == null ? "null" : entry.newEntry().key())
                             .collect(toList());
 
-                    Entry newEntry = evt.entryEvent().newEntry();
+                    long topVer = 0;
+
+                    byte[] newLogicalTopologyBytes = null;
+
+                    Set<String> newLogicalTopology = null;
+
+                    long revision = 0;
+
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
 
-                    long revision = newEntry.revision();
+                        if (Arrays.equals(e.key(), 
zonesLogicalTopologyVersionKey().bytes())) {
+                            topVer = bytesToLong(e.value());
 
-                    byte[] newLogicalTopologyBytes = newEntry.value();
+                            revision = e.revision();
+                        } else if (Arrays.equals(e.key(), 
zonesLogicalTopologyKey().bytes())) {
+                            newLogicalTopologyBytes = e.value();
 
-                    Set<String> newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                            newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                        }
+                    }
+
+                    assert newLogicalTopology != null;

Review Comment:
   asserts without a message



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case 
when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} 
are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {

Review Comment:
   Please do something with the readability of this class, it is hard to 
understand scenarios, javadocs and test names do not help, 
`TestSeveralScaleUpAndSeveralScaleDownDataObject` looks too complicated 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to