alievmirza commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1127397612


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +625,152 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone. If the 
dataNodesAutoAdjustScaleUp or dataNodesAutoAdjustScaleDown
+     * equals to 0 then returned completable future will be completed when 
data nodes will be saved to the meta storage.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> getDataNodes(int zoneId, long 
topVer) {
+        boolean immediateScaleUp0 = false;
+        boolean immediateScaleDown0 = false;
+
+        if (zoneId == DEFAULT_ZONE_ID) {
+            immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+            immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+        } else {
+            NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                    zonesConfiguration.distributionZones();
+
+            for (int i = 0; i < zones.value().size(); i++) {
+                DistributionZoneView zone = zones.value().get(i);
+
+                if (zone.zoneId() == zoneId) {
+                    immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() == 0;
+                    immediateScaleDown0 = zone.dataNodesAutoAdjustScaleDown() 
== 0;
+
+                    break;
+                }
+            }
+        }
+
+        boolean immediateScaleUp = immediateScaleUp0;
+        boolean immediateScaleDown = immediateScaleDown0;
+
+        if (!immediateScaleUp && !immediateScaleDown) {
+
+            DataNodes dataNodes = this.dataNodes.get(zoneId);
+
+            Set<String> nodes;
+
+            if (dataNodes == null) {
+                nodes = emptySet();
+            } else {
+                nodes = this.dataNodes.get(zoneId).nodes();
+            }
+
+            return completedFuture(nodes);
+        }
+
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.get(topVer);
+
+                if (topVerFut == null) {
+                    topVerFut = new CompletableFuture<>();
+
+                    topVerFutures.put(topVer, topVerFut);
+                }
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        topVerFut.thenAcceptAsync(ignored -> {

Review Comment:
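   Using `thenAcceptAsync` without an explicit executor is a bad idea: the
   callback then runs on the shared `ForkJoinPool.commonPool()`. Please pass a
   dedicated executor here.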



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java:
##########
@@ -136,6 +171,10 @@ static ByteArray zonesLogicalTopologyVersionKey() {
         return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY;
     }
 
+    static ByteArray zonesDataNodesPrefix() {

Review Comment:
   Please add Javadoc for this method.
   



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,718 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case 
when dataNodesAutoAdjustScaleUp
+ * or dataNodesAutoAdjustScaleDown equals to 0.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+    private MetaStorageManager metaStorageManager;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private LogicalTopology logicalTopology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterManagementGroupManager cmgManager;
+
+    private WatchListener topologyWatchListener;
+
+    private WatchListener dataNodesWatchListener;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        VaultManager vaultManager = mock(VaultManager.class);
+
+        when(vaultManager.get(any())).thenReturn(completedFuture(null));
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        doAnswer(invocation -> {
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), 
zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            } else if (Arrays.equals(key.bytes(), 
zonesDataNodesPrefix().bytes())) {
+                dataNodesWatchListener = watchListener;
+            }
+
+            return null;
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+        
when(metaStorageManager.invoke(any())).thenReturn(completedFuture(StatementResultImpl.builder().result(new
 byte[] {0}).build()));
+
+        vaultManager.start();
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        TablesConfiguration tablesConfiguration = 
mock(TablesConfiguration.class);
+
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Set.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        clusterCfgMgr.start();
+
+        zonesConfiguration = clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultManager,
+                "node"
+        );
+
+        mockCmgLocalNodes();
+
+        distributionZoneManager.start();
+
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(0).dataNodesAutoAdjustScaleDown(0).build())
+                .get(3, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        distributionZoneManager.stop();
+        clusterCfgMgr.stop();
+        clusterStateStorage.stop();
+    }
+
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUpAndScaleDown() throws 
Exception {
+        LOG.info("Topology with added and removed nodes.");
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        Set<String> dataNodes = Set.of("node0", "node2");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        DistributionZoneManager.DataNodes dataNodesMeta = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+        assertTrue(waitForCondition(() -> 
dataNodesMeta.revisionScaleUpFutures().size() == 1, 3_000));
+
+        
assertTrue(testData.dataNodesMeta.revisionScaleDownFutures().isEmpty());
+
+        CompletableFuture<Void> revisionUpFut = 
testData.dataNodesMeta.revisionScaleUpFutures().get((long) 
testData.dataNodesRevision2);
+
+        assertFalse(revisionUpFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true, 
testData.dataNodesRevision2, testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(() -> revisionUpFut.isDone(), 3000));
+
+        assertTrue(waitForCondition(() -> 
dataNodesMeta.revisionScaleDownFutures().size() == 1, 3_000));
+
+        CompletableFuture<Void> revisionDownFut = 
testData.dataNodesMeta.revisionScaleDownFutures()
+                .get((long) testData.dataNodesRevision2);
+
+        assertFalse(revisionDownFut.isDone());
+
+        assertFalse(testData.dataNodesUpFut3.isDone());
+        assertFalse(testData.dataNodesDownFut3.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false, 
testData.dataNodesRevision2, testData.dataNodesRevision2 + 2);
+
+        assertTrue(waitForCondition(() -> revisionDownFut.isDone(), 3000));
+
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, 
TimeUnit.SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, 
TimeUnit.SECONDS));
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+    }
+
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUp() throws Exception {
+        LOG.info("Topology with added nodes.");
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        Set<String> dataNodes = Set.of("node0", "node1", "node2");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        DistributionZoneManager.DataNodes dataNodesMeta = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+        assertTrue(waitForCondition(() -> 
dataNodesMeta.revisionScaleUpFutures().size() == 1, 3_000));
+
+        
assertTrue(testData.dataNodesMeta.revisionScaleDownFutures().isEmpty());
+
+        CompletableFuture<Void> revisionUpFut = 
testData.dataNodesMeta.revisionScaleUpFutures().get((long) 
testData.dataNodesRevision2);
+
+        assertFalse(revisionUpFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true, 
testData.dataNodesRevision2, testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(() -> revisionUpFut.isDone(), 3000));
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, 
TimeUnit.SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, 
TimeUnit.SECONDS));
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+    }
+
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleDown() throws Exception 
{
+        LOG.info("Topology with removed nodes.");
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        Set<String> dataNodes = Set.of("node0");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        assertTrue(waitForCondition(() -> {
+            DistributionZoneManager.DataNodes dataNodesMeta0 = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+            if (dataNodesMeta0 == null) {
+                return false;
+            }
+
+            return dataNodesMeta0.revisionScaleDownFutures().size() == 1;
+        },
+                3_000));
+
+        assertTrue(testData.dataNodesMeta.revisionScaleUpFutures().isEmpty());
+
+        CompletableFuture<Void> revisionDownFut = 
testData.dataNodesMeta.revisionScaleDownFutures()
+                .get((long) testData.dataNodesRevision2);
+
+        assertFalse(revisionDownFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false, 
testData.dataNodesRevision2, testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(() -> revisionDownFut.isDone(), 3000));
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, 
TimeUnit.SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, 
TimeUnit.SECONDS));
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+    }
+
+    private static class TestSeveralScaleUpAndSeveralScaleDownDataObject {
+        private long topVer2;
+        private long dataNodesRevision2;
+        private CompletableFuture<Void> topVerUpFut2;
+        private CompletableFuture<Void> topVerDownFut2;
+        private DistributionZoneManager.DataNodes dataNodesMeta;
+        private CompletableFuture<Set<String>> dataNodesUpFut3;
+        private CompletableFuture<Set<String>> dataNodesDownFut3;
+
+        public TestSeveralScaleUpAndSeveralScaleDownDataObject(
+                long topVer2,
+                long dataNodesRevision2,
+                CompletableFuture<Void> topVerUpFut2,
+                CompletableFuture<Void> topVerDownFut2,
+                DistributionZoneManager.DataNodes dataNodesMeta,
+                CompletableFuture<Set<String>> dataNodesUpFut3,
+                CompletableFuture<Set<String>> dataNodesDownFut3) {
+            this.topVer2 = topVer2;
+            this.dataNodesRevision2 = dataNodesRevision2;
+            this.topVerUpFut2 = topVerUpFut2;
+            this.topVerDownFut2 = topVerDownFut2;
+            this.dataNodesMeta = dataNodesMeta;
+            this.dataNodesUpFut3 = dataNodesUpFut3;
+            this.dataNodesDownFut3 = dataNodesDownFut3;
+        }
+    }
+
+    private TestSeveralScaleUpAndSeveralScaleDownDataObject 
testSeveralScaleUpAndSeveralScaleDownGeneral() throws Exception {
+        LOG.info("Topology with added nodes.");
+
+        CompletableFuture<Set<String>> dataNodesUpFut0 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 1);
+        CompletableFuture<Set<String>> dataNodesUpFut1 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 1);
+        CompletableFuture<Set<String>> dataNodesUpFut2 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 2);
+        CompletableFuture<Set<String>> dataNodesUpFut3 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 11);
+
+        assertTrue(waitForCondition(() -> 
distributionZoneManager.topVerFutures().size() == 3,
+                3_000));
+
+        int topVer0 = 2;
+
+        int dataNodesRevision0 = 2;
+
+        CompletableFuture<Void> topVerUpFut0 = 
distributionZoneManager.topVerFutures().get(1L);
+        CompletableFuture<Void> topVerUpFut1 = 
distributionZoneManager.topVerFutures().get(2L);
+        CompletableFuture<Void> topVerUpFut2 = 
distributionZoneManager.topVerFutures().get(11L);
+
+        assertFalse(topVerUpFut0.isDone());
+        assertFalse(topVerUpFut1.isDone());
+        assertFalse(topVerUpFut2.isDone());
+
+        Set<String> threeNodes = Set.of("node0", "node1", "node2");
+
+        topologyWatchListenerOnUpdate(threeNodes, topVer0, dataNodesRevision0);
+
+        assertTrue(waitForCondition(() -> topVerUpFut0.isDone(), 3_000));
+        assertTrue(waitForCondition(() -> topVerUpFut1.isDone(), 3_000));
+        assertFalse(topVerUpFut2.isDone());
+
+        assertTrue(waitForCondition(() -> {
+            DistributionZoneManager.DataNodes dataNodesMeta0 = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+            if (dataNodesMeta0 == null) {
+                return false;
+            }
+
+            return dataNodesMeta0.revisionScaleUpFutures().size() == 1;
+        },
+                3_000));
+
+        DistributionZoneManager.DataNodes dataNodesMeta = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+        CompletableFuture<Void> revision2Fut = 
dataNodesMeta.revisionScaleUpFutures().get((long) dataNodesRevision0);
+
+        assertFalse(revision2Fut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, threeNodes, true, 
dataNodesRevision0, dataNodesRevision0 + 1);
+
+        assertTrue(waitForCondition(() -> revision2Fut.isDone(),
+                3_000));
+
+        assertEquals(threeNodes, dataNodesUpFut0.get());
+        assertEquals(threeNodes, dataNodesUpFut1.get());
+        assertEquals(threeNodes, dataNodesUpFut2.get());
+        assertFalse(dataNodesUpFut3.isDone());
+
+        assertTrue(distributionZoneManager.topVerFutures().size() == 1);
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+
+
+        LOG.info("Topology with removed nodes.");
+
+        CompletableFuture<Set<String>> dataNodesDownFut0 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 4);
+        CompletableFuture<Set<String>> dataNodesDownFut1 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 4);
+        CompletableFuture<Set<String>> dataNodesDownFut2 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 5);
+        CompletableFuture<Set<String>> dataNodesDownFut3 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 6);
+
+        assertTrue(waitForCondition(() -> 
distributionZoneManager.topVerFutures().size() == 4,
+                3_000));
+
+        CompletableFuture<Void> topVerDownFut0 = 
distributionZoneManager.topVerFutures().get(4L);
+        CompletableFuture<Void> topVerDownFut1 = 
distributionZoneManager.topVerFutures().get(5L);
+        CompletableFuture<Void> topVerDownFut2 = 
distributionZoneManager.topVerFutures().get(6L);
+
+        assertFalse(topVerDownFut0.isDone());
+        assertFalse(topVerDownFut1.isDone());
+        assertFalse(topVerDownFut2.isDone());
+
+        int topVer1 = 5;
+
+        int dataNodesRevision1 = dataNodesRevision0 + 2;
+
+        Set<String> twoNodes = Set.of("node0", "node1");
+
+        topologyWatchListenerOnUpdate(twoNodes, topVer1, dataNodesRevision1);
+
+        assertTrue(waitForCondition(() -> topVerDownFut0.isDone(), 3_000));
+        assertTrue(waitForCondition(() -> topVerDownFut1.isDone(), 3_000));
+        assertFalse(waitForCondition(() -> topVerDownFut2.isDone(), 3_000));
+
+        assertTrue(waitForCondition(() -> 
dataNodesMeta.revisionScaleDownFutures().size() == 1, 3_000));
+
+        CompletableFuture<Void> revision5Fut = 
dataNodesMeta.revisionScaleDownFutures().get((long) dataNodesRevision1);
+
+        assertFalse(revision5Fut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, twoNodes, false, 
dataNodesRevision1, dataNodesRevision1 + 1);
+
+        assertTrue(waitForCondition(() -> revision5Fut.isDone(), 3_000));
+
+        assertEquals(twoNodes, dataNodesDownFut0.get());
+        assertEquals(twoNodes, dataNodesDownFut1.get());
+        assertEquals(twoNodes, dataNodesDownFut2.get());
+        assertFalse(dataNodesDownFut3.isDone());
+
+        assertEquals(2, distributionZoneManager.topVerFutures().size());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+
+        int topVer2 = 20;
+
+        int dataNodesRevision2 = dataNodesRevision1 + 2;
+
+        return new TestSeveralScaleUpAndSeveralScaleDownDataObject(
+                topVer2,
+                dataNodesRevision2,
+                topVerUpFut2,
+                topVerDownFut2,
+                dataNodesMeta,
+                dataNodesUpFut3,
+                dataNodesDownFut3
+        );
+    }
+
+    @Test
+    void testScaleUpAndThenScaleDown() throws Exception {
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 5);
+
+        AtomicReference<CompletableFuture<Void>> topVerFut = new 
AtomicReference<>();
+
+        assertTrue(waitForCondition(() -> {
+            topVerFut.set(distributionZoneManager.topVerFutures().get(5L));
+
+            return topVerFut.get() != null;
+        },
+                3_000));
+
+        assertFalse(topVerFut.get().isDone());
+
+        topologyWatchListenerOnUpdate(Set.of("node0", "node1"), 100, 10);
+
+        assertTrue(waitForCondition(() -> topVerFut.get().isDone(),
+                3_000));
+
+        AtomicReference<CompletableFuture<Void>> revisionFut = new 
AtomicReference<>();
+
+        assertTrue(waitForCondition(() -> {
+            DistributionZoneManager.DataNodes dataNodes = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+            if (dataNodes == null) {
+                return false;
+            }
+
+            revisionFut.set(dataNodes.revisionScaleUpFutures().get(10L));
+
+            return revisionFut.get() != null;
+        },
+                3_000));
+
+        assertFalse(revisionFut.get().isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, Set.of("node0", 
"node1"), true, 10, 11);
+
+        assertTrue(waitForCondition(() -> revisionFut.get().isDone(),
+                3_000));
+
+        assertEquals(Set.of("node0", "node1"), dataNodesFut.get());
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+
+        dataNodesFut = distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 
106);
+
+        topVerFut.set(null);
+
+        assertTrue(waitForCondition(() -> {
+            topVerFut.set(distributionZoneManager.topVerFutures().get(106L));
+
+            return topVerFut.get() != null;
+        },
+                3_000));
+
+        assertFalse(topVerFut.get().isDone());
+
+        topologyWatchListenerOnUpdate(Set.of("node0"), 200, 12);
+
+        assertTrue(waitForCondition(() -> topVerFut.get().isDone(),
+                3_000));
+
+        revisionFut.set(null);
+
+        assertTrue(waitForCondition(() -> {
+            DistributionZoneManager.DataNodes dataNodes = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+            if (dataNodes == null) {
+                return false;
+            }
+
+            revisionFut.set(dataNodes.revisionScaleDownFutures().get(12L));
+
+            return revisionFut.get() != null;
+        },
+                3_000));
+
+        assertFalse(revisionFut.get().isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, Set.of("node0"), 
false, 12, 13);
+
+        assertTrue(waitForCondition(() -> revisionFut.get().isDone(),
+                3_000));
+
+        assertEquals(Set.of("node0"), dataNodesFut.get());
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+    }
+
+    @Test
+    void testAwaitingScaleUpOnly() throws Exception {
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(Integer.MAX_VALUE).dataNodesAutoAdjustScaleDown(Integer.MAX_VALUE).build())
+                .get(3, TimeUnit.SECONDS);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone1")
+                                .dataNodesAutoAdjustScaleUp(0)
+                                
.dataNodesAutoAdjustScaleDown(Integer.MAX_VALUE)
+                                .build()
+                )
+                .get(3, TimeUnit.SECONDS);
+
+        int zoneId = distributionZoneManager.getZoneId("zone1");
+
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.getDataNodes(zoneId, 1);
+
+        CompletableFuture<Void> topVerUpFut = 
distributionZoneManager.topVerFutures().get(1L);
+
+        assertFalse(topVerUpFut.isDone());
+
+        Set<String> nodes0 = Set.of("node0", "node1");
+
+        topologyWatchListenerOnUpdate(nodes0, 1, 1);
+
+        dataNodesWatchListenerOnUpdate(zoneId, nodes0, true, 1, 2);
+
+        dataNodesFut.get(3, TimeUnit.SECONDS);
+
+        dataNodesFut = distributionZoneManager.getDataNodes(zoneId, 2);
+
+        Set<String> nodes1 = Set.of("node0");
+
+        topologyWatchListenerOnUpdate(nodes1, 2, 2);
+
+        dataNodesFut.get(3, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testAwaitingScaleDownOnly() throws Exception {

Review Comment:
   Given the test name, this test should also check that other zones do not
   wait for data nodes.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +625,152 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone. If the 
dataNodesAutoAdjustScaleUp or dataNodesAutoAdjustScaleDown
+     * equals to 0 then returned completable future will be completed when 
data nodes will be saved to the meta storage.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> getDataNodes(int zoneId, long 
topVer) {
+        boolean immediateScaleUp0 = false;
+        boolean immediateScaleDown0 = false;
+
+        if (zoneId == DEFAULT_ZONE_ID) {
+            immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+            immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+        } else {
+            NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                    zonesConfiguration.distributionZones();
+
+            for (int i = 0; i < zones.value().size(); i++) {
+                DistributionZoneView zone = zones.value().get(i);
+
+                if (zone.zoneId() == zoneId) {
+                    immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() == 0;
+                    immediateScaleDown0 = zone.dataNodesAutoAdjustScaleDown() 
== 0;
+
+                    break;
+                }
+            }
+        }
+
+        boolean immediateScaleUp = immediateScaleUp0;
+        boolean immediateScaleDown = immediateScaleDown0;
+
+        if (!immediateScaleUp && !immediateScaleDown) {
+
+            DataNodes dataNodes = this.dataNodes.get(zoneId);
+
+            Set<String> nodes;
+
+            if (dataNodes == null) {
+                nodes = emptySet();
+            } else {
+                nodes = this.dataNodes.get(zoneId).nodes();
+            }
+
+            return completedFuture(nodes);
+        }
+
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.get(topVer);
+
+                if (topVerFut == null) {
+                    topVerFut = new CompletableFuture<>();
+
+                    topVerFutures.put(topVer, topVerFut);

Review Comment:
   Could we use `putIfAbsent` here instead of all the checks for null?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +625,152 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone. If the 
dataNodesAutoAdjustScaleUp or dataNodesAutoAdjustScaleDown
+     * equals to 0 then returned completable future will be completed when 
data nodes will be saved to the meta storage.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> getDataNodes(int zoneId, long 
topVer) {

Review Comment:
   Please divide this method and extract the part where we react to future 
completion; it will increase readability. Also, please add comments right 
inside the code; currently it's quite hard to understand what's going on.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +625,152 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone. If the 
dataNodesAutoAdjustScaleUp or dataNodesAutoAdjustScaleDown
+     * equals to 0 then returned completable future will be completed when 
data nodes will be saved to the meta storage.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> getDataNodes(int zoneId, long 
topVer) {
+        boolean immediateScaleUp0 = false;
+        boolean immediateScaleDown0 = false;
+
+        if (zoneId == DEFAULT_ZONE_ID) {
+            immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+            immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+        } else {
+            NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                    zonesConfiguration.distributionZones();
+
+            for (int i = 0; i < zones.value().size(); i++) {
+                DistributionZoneView zone = zones.value().get(i);
+
+                if (zone.zoneId() == zoneId) {
+                    immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() == 0;
+                    immediateScaleDown0 = zone.dataNodesAutoAdjustScaleDown() 
== 0;
+
+                    break;
+                }
+            }
+        }
+
+        boolean immediateScaleUp = immediateScaleUp0;
+        boolean immediateScaleDown = immediateScaleDown0;
+
+        if (!immediateScaleUp && !immediateScaleDown) {
+
+            DataNodes dataNodes = this.dataNodes.get(zoneId);
+
+            Set<String> nodes;
+
+            if (dataNodes == null) {
+                nodes = emptySet();
+            } else {
+                nodes = this.dataNodes.get(zoneId).nodes();
+            }
+
+            return completedFuture(nodes);
+        }
+
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.get(topVer);
+
+                if (topVerFut == null) {
+                    topVerFut = new CompletableFuture<>();
+
+                    topVerFutures.put(topVer, topVerFut);
+                }
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        topVerFut.thenAcceptAsync(ignored -> {
+            synchronized (dataNodesMutex) {
+                CompletableFuture<Void> topVerScaleUpFut = null;
+
+                if (immediateScaleUp) {
+                    Map.Entry<Long, Long> scaleUpRevisionEntry = 
topVerAndScaleUpRevision.ceilingEntry(topVer);
+
+                    Long scaleUpRevision = null;
+
+                    if (scaleUpRevisionEntry != null) {
+                        scaleUpRevision = scaleUpRevisionEntry.getValue();
+                    }
+
+                    if (scaleUpRevision != null
+                            && (dataNodes.get(zoneId) == null || 
dataNodes.get(zoneId).scaleUpRevision() < scaleUpRevision)) {
+                        Map.Entry<Long, CompletableFuture<Void>> ceilingEntry =
+                                
dataNodes.get(zoneId).revisionScaleUpFutures().ceilingEntry(scaleUpRevision);
+
+                        if (ceilingEntry != null) {
+                            topVerScaleUpFut = ceilingEntry.getValue();
+                        }
+
+                        if (topVerScaleUpFut == null) {
+                            topVerScaleUpFut = new CompletableFuture<>();
+
+                            
dataNodes.get(zoneId).revisionScaleUpFutures().put(scaleUpRevision, 
topVerScaleUpFut);
+                        }
+                    } else {
+                        topVerScaleUpFut = completedFuture(null);
+                    }
+                } else {
+                    topVerScaleUpFut = completedFuture(null);
+                }
+
+                topVerScaleUpFut.thenAcceptAsync(ignored0 -> {
+                    CompletableFuture<Void> topVerScaleDownFut = null;
+
+                    synchronized (dataNodesMutex) {
+                        if (immediateScaleDown) {
+                            Map.Entry<Long, Long> scaleDownRevisionEntry = 
topVerAndScaleDownRevision.ceilingEntry(topVer);
+
+                            Long scaleDownRevision = null;
+
+                            if (scaleDownRevisionEntry != null) {
+                                scaleDownRevision = 
scaleDownRevisionEntry.getValue();
+                            }
+
+                            if (scaleDownRevision != null
+                                    && (dataNodes.get(zoneId) == null || 
dataNodes.get(zoneId).scaleDownRevision() < scaleDownRevision)) {
+                                Map.Entry<Long, CompletableFuture<Void>> 
ceilingEntry =
+                                        
dataNodes.get(zoneId).revisionScaleDownFutures().ceilingEntry(scaleDownRevision);
+
+                                if (ceilingEntry != null) {
+                                    topVerScaleDownFut = 
ceilingEntry.getValue();
+                                }
+
+                                if (topVerScaleDownFut == null) {
+                                    topVerScaleDownFut = new 
CompletableFuture<>();
+
+                                    
dataNodes.get(zoneId).revisionScaleDownFutures.put(scaleDownRevision, 
topVerScaleDownFut);
+                                }
+                            } else {
+                                topVerScaleDownFut = completedFuture(null);
+                            }
+                        } else {
+                            topVerScaleDownFut = completedFuture(null);
+                        }
+                    }
+
+                    topVerScaleDownFut.thenAcceptAsync(ignored1 -> {

Review Comment:
   Why do we need this code? Why do we do this only for scale down?



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,718 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case 
when dataNodesAutoAdjustScaleUp
+ * or dataNodesAutoAdjustScaleDown equals to 0.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+    private MetaStorageManager metaStorageManager;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private LogicalTopology logicalTopology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterManagementGroupManager cmgManager;
+
+    private WatchListener topologyWatchListener;
+
+    private WatchListener dataNodesWatchListener;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        VaultManager vaultManager = mock(VaultManager.class);
+
+        when(vaultManager.get(any())).thenReturn(completedFuture(null));
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        doAnswer(invocation -> {
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), 
zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            } else if (Arrays.equals(key.bytes(), 
zonesDataNodesPrefix().bytes())) {
+                dataNodesWatchListener = watchListener;
+            }
+
+            return null;
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+        
when(metaStorageManager.invoke(any())).thenReturn(completedFuture(StatementResultImpl.builder().result(new
 byte[] {0}).build()));
+
+        vaultManager.start();
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        TablesConfiguration tablesConfiguration = 
mock(TablesConfiguration.class);
+
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Set.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        clusterCfgMgr.start();
+
+        zonesConfiguration = clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultManager,
+                "node"
+        );
+
+        mockCmgLocalNodes();
+
+        distributionZoneManager.start();
+
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(0).dataNodesAutoAdjustScaleDown(0).build())
+                .get(3, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        distributionZoneManager.stop();
+        clusterCfgMgr.stop();
+        clusterStateStorage.stop();
+    }
+
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUpAndScaleDown() throws 
Exception {

Review Comment:
   This and several tests below test the await logic only for the default zone. 
I would test the same logic, but for several non-default zones.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java:
##########
@@ -266,4 +305,32 @@ public static Map<String, Integer> 
toDataNodesMap(Set<String> dataNodes) {
 
         return dataNodesMap;
     }
+
+    /**
+     * Utility method to check if one byte array starts with a specified 
sequence
+     * of bytes.
+     *
+     * @param array The array to check.
+     * @param prefix The prefix bytes to test for.
+     * @return {@code true} if the array starts with the bytes from the prefix.
+     */
+    public static boolean startsWith(byte[] array, byte[] prefix) {

Review Comment:
   You could extract 
`org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage#startsWith`
 to some common place and reuse it. I prefer variant with `Arrays.equals`



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,718 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case 
when dataNodesAutoAdjustScaleUp
+ * or dataNodesAutoAdjustScaleDown equals to 0.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {

Review Comment:
   I would add test scenarios where we have immediate scale up/down, and in 
the middle of the awaiting logic we change scale up/down to a non-zero value.
   
   Also, I would expect a scenario where the zone is deleted.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -159,6 +167,36 @@ public class DistributionZoneManager implements 
IgniteComponent {
      */
     private final Map<Integer, ZoneState> zonesState;
 
+    /** Data nodes modification mutex. */
+    private final Object dataNodesMutex = new Object();
+
+    /** The last topology version wich was observed by distribution zone 
manager. */
+    private long lastTopVer;
+
+    /**
+     * Contains data nodes and meta info for zones.
+     * Map (zone id -> data nodes).
+     */
+    private final Map<Integer, DataNodes> dataNodes = new 
ConcurrentHashMap<>();

Review Comment:
   If this and the maps below are accessed only in a synchronised block, do we 
really need `ConcurrentHashMap` here?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -183,8 +221,89 @@ public void onTopologyLeap(LogicalTopologySnapshot 
newTopology) {
      */
     private volatile Set<String> logicalTopology;
 
-    /** Watch listener. Needed to unregister it on {@link 
DistributionZoneManager#stop()}. */
-    private final WatchListener watchListener;
+    /** Watch listener for logical topology keys. */
+    private final WatchListener topologyWatchListener;
+
+    /** Watch listener for data nodes keys. */
+    private final WatchListener dataNodesWatchListener;
+
+    /**
+     * Contains data nodes and meta info.
+     */
+    static class DataNodes {
+        /** Data nodes. */
+        private Set<String> nodes;
+
+        /** Scale up metastorage revision of current nodes value. */
+        private long scaleUpRevision;
+
+        /** Scale down metastorage revision of current nodes value. */
+        private long scaleDownRevision;
+
+        /**
+         * The map contains futures which are completed when zone manager 
observe data nodes bound to appropriate scale up revision.
+         * Map (revision -> future).
+         */
+        private final NavigableMap<Long, CompletableFuture<Void>> 
revisionScaleUpFutures = new ConcurrentSkipListMap();
+
+        /**
+         * The map contains futures which are completed when zone manager 
observe data nodes bound to appropriate scale down revision.
+         * Map (revision -> future).
+         */
+        private final NavigableMap<Long, CompletableFuture<Void>> 
revisionScaleDownFutures = new ConcurrentSkipListMap();
+
+        DataNodes() {
+            nodes = emptySet();
+        }
+
+        DataNodes(Set<String> nodes, long scaleUpRevision, long 
scaleDownRevision) {
+            this.nodes = nodes;
+            this.scaleUpRevision = scaleUpRevision;
+            this.scaleDownRevision = scaleDownRevision;
+        }
+
+        Set<String> nodes() {
+            return nodes;
+        }
+
+        void nodes(Set<String> nodes) {
+            this.nodes = nodes;
+        }
+
+        long scaleUpRevision() {
+            return scaleUpRevision;
+        }
+
+        void scaleUpRevision(long scaleUpRevision) {
+            this.scaleUpRevision = scaleUpRevision;
+        }
+
+        long scaleDownRevision() {
+            return scaleDownRevision;
+        }
+
+        void scaleDownRevision(long scaleDownRevision) {
+            this.scaleDownRevision = scaleDownRevision;
+        }
+
+        NavigableMap<Long, CompletableFuture<Void>> revisionScaleUpFutures() {
+            return revisionScaleUpFutures;
+        }
+
+        NavigableMap<Long, CompletableFuture<Void>> revisionScaleDownFutures() 
{
+            return revisionScaleDownFutures;
+        }
+    }
+
+    @TestOnly
+    Map<Integer, DataNodes> dataNodes() {

Review Comment:
   These two `TestOnly` methods could be invoked concurrently.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +625,152 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone. If the 
dataNodesAutoAdjustScaleUp or dataNodesAutoAdjustScaleDown

Review Comment:
   We need links to `dataNodesAutoAdjustScaleUp`, 
`dataNodesAutoAdjustScaleDown`. User must be able to understand what these 
concepts mean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to