alievmirza commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1161731849
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
+ * The flow for the future completion:
+ * Waiting for DistributionZoneManager observe passed topology version or
greater version in topologyWatchListener.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0
than wait for writing data nodes triggered
+ * by started nodes and corresponding to the passed topology version or
greater topology version
+ * to the data nodes into the meta storage.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0
than wait for writing data nodes
+ * triggered by stopped nodes and corresponding to the passed topology
version or greater topology version
+ * to the data nodes into the meta storage.
+ * After waiting it completes the future with data nodes of the specified
zone.
+ * This method must be invoked in a change configuration closure to
guarantee that the zone is exists and values of scaleUp/scaleDown
+ * timers are up to date.
+ * The returned future can be completed with {@link
DistributionZoneNotFoundException} and {@link
DistributionZoneWasRemovedException}
+ * in case when the distribution zone was removed during method execution.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> topologyVersionedDataNodes(int
zoneId, long topVer) {
+ CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut =
awaitTopologyVersion(topVer)
+ .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+ return allOf(
+ timerValuesFut.thenCompose(timerValues ->
scaleUpAwaiting(zoneId, timerValues.get1())),
+ timerValuesFut.thenCompose(timerValues ->
scaleDownAwaiting(zoneId, timerValues.get2()))
+ ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+ }
+
+ /**
+ * Waits for DistributionZoneManager observe passed topology version or
greater version in topologyWatchListener.
Review Comment:
waits for observing, please add {@link } to topologyWatchListener
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
+ * The flow for the future completion:
+ * Waiting for DistributionZoneManager observe passed topology version or
greater version in topologyWatchListener.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0
than wait for writing data nodes triggered
+ * by started nodes and corresponding to the passed topology version or
greater topology version
+ * to the data nodes into the meta storage.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0
than wait for writing data nodes
+ * triggered by stopped nodes and corresponding to the passed topology
version or greater topology version
+ * to the data nodes into the meta storage.
+ * After waiting it completes the future with data nodes of the specified
zone.
+ * This method must be invoked in a change configuration closure to
guarantee that the zone is exists and values of scaleUp/scaleDown
+ * timers are up to date.
+ * The returned future can be completed with {@link
DistributionZoneNotFoundException} and {@link
DistributionZoneWasRemovedException}
+ * in case when the distribution zone was removed during method execution.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> topologyVersionedDataNodes(int
zoneId, long topVer) {
+ CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut =
awaitTopologyVersion(topVer)
+ .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+ return allOf(
+ timerValuesFut.thenCompose(timerValues ->
scaleUpAwaiting(zoneId, timerValues.get1())),
+ timerValuesFut.thenCompose(timerValues ->
scaleDownAwaiting(zoneId, timerValues.get2()))
+ ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+ }
+
+ /**
+ * Waits for DistributionZoneManager observe passed topology version or
greater version in topologyWatchListener.
+ *
+ * @param topVer Topology version.
+ * @return Future for chaining.
+ */
+ private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+ return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+ }
+
+ /**
+ * Transforms {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * and {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to
boolean values.
+ * True if it equals to zero and false if it greater than zero. Zero means
that data nodes changing must be started immediate.
Review Comment:
immediately
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
Review Comment:
Comment from Alex is still actual, you have added implementation details of
mechanism of awaiting, which is ok (but this details are too complicated and
with grammar mistakes, please try to simplify it, maybe explain only one
scenario with scale up, and mention that scale down logic is the same), but at
the first place we must add some short explanation of the method purpose.
> The method for obtaining data nodes of the specified zone.
This is not just a method for obtaining data nodes, but it takes into
account provided {@code topVer} and compares it with the explored by DZM
topology version, please add short javadoc in the beginning so it could be
clear what this method actually does
> @return The data nodes future.
This is also quite short, we should say that the completion of this future
depends on the explored by DZM topology version etc.
Also please fix typos, like `than` -> `then`
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case
when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+ private MetaStorageManager metaStorageManager;
+
+ private DistributionZoneManager distributionZoneManager;
+
+ private LogicalTopology logicalTopology;
+
+ private ClusterStateStorage clusterStateStorage;
+
+ private DistributionZonesConfiguration zonesConfiguration;
+
+ private ConfigurationManager clusterCfgMgr;
+
+ private ClusterManagementGroupManager cmgManager;
+
+ private VaultManager vaultManager;
+
+ private WatchListener topologyWatchListener;
+
+ private WatchListener dataNodesWatchListener;
+
+ @BeforeEach
+ void setUp() {
+ vaultManager = mock(VaultManager.class);
+
+
when(vaultManager.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), null)));
+
+ when(vaultManager.get(zonesLogicalTopologyVersionKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
+ cmgManager = mock(ClusterManagementGroupManager.class);
+
+ metaStorageManager = mock(MetaStorageManager.class);
+
+ doAnswer(invocation -> {
+ ByteArray key = invocation.getArgument(0);
+
+ WatchListener watchListener = invocation.getArgument(1);
+
+ if (Arrays.equals(key.bytes(),
zoneLogicalTopologyPrefix().bytes())) {
+ topologyWatchListener = watchListener;
+ } else if (Arrays.equals(key.bytes(),
zonesDataNodesPrefix().bytes())) {
+ dataNodesWatchListener = watchListener;
+ }
+
+ return null;
+ }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+ when(metaStorageManager.invoke(any()))
+
.thenReturn(completedFuture(StatementResultImpl.builder().result(new byte[]
{0}).build()));
+
+ clusterStateStorage = new TestClusterStateStorage();
+
+ logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+ LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+ TablesConfiguration tablesConfiguration =
mock(TablesConfiguration.class);
+
+ NamedConfigurationTree<TableConfiguration, TableView, TableChange>
tables = mock(NamedConfigurationTree.class);
+
+ when(tablesConfiguration.tables()).thenReturn(tables);
+
+ NamedListView<TableView> tablesValue = mock(NamedListView.class);
+
+ when(tables.value()).thenReturn(tablesValue);
+
+ when(tablesValue.size()).thenReturn(0);
+
+ clusterCfgMgr = new ConfigurationManager(
+ List.of(DistributionZonesConfiguration.KEY),
+ Set.of(),
+ new TestConfigurationStorage(DISTRIBUTED),
+ List.of(),
+ List.of()
+ );
+
+ clusterCfgMgr.start();
+
+ zonesConfiguration = clusterCfgMgr.configurationRegistry()
+ .getConfiguration(DistributionZonesConfiguration.KEY);
+
+ distributionZoneManager = new DistributionZoneManager(
+ zonesConfiguration,
+ tablesConfiguration,
+ metaStorageManager,
+ logicalTopologyService,
+ vaultManager,
+ "node"
+ );
+
+ mockCmgLocalNodes();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ distributionZoneManager.stop();
+ clusterCfgMgr.stop();
+ clusterStateStorage.stop();
+ }
+
+ /**
+ * Test checks that data nodes futures are completed on topology with
added and removed nodes.
+ */
+ @Test
+ void testSeveralScaleUpAndSeveralScaleDownThenScaleUpAndScaleDown() throws
Exception {
+ startZoneManager(0);
+
+ TestSeveralScaleUpAndSeveralScaleDownDataObject testData =
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+ LOG.info("Topology with added and removed nodes.");
+
+ Set<String> dataNodes = Set.of("node0", "node2");
+
+ topologyWatchListenerOnUpdate(dataNodes, testData.topVer2,
testData.dataNodesRevision2);
+
+ assertTrue(testData.topVerUpFut2.isDone());
+ assertTrue(testData.topVerDownFut2.isDone());
+
+ CompletableFuture<Void> revisionUpFut =
distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+ .scaleUpRevisionTracker().waitFor(testData.dataNodesRevision2);
+
+ assertFalse(revisionUpFut.isDone());
+
+ dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true,
testData.dataNodesRevision2,
+ testData.dataNodesRevision2 + 1);
+
+ assertTrue(waitForCondition(revisionUpFut::isDone, 3000));
+
+ CompletableFuture<Void> revisionDownFut =
distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID).scaleDownRevisionTracker()
+ .waitFor(testData.dataNodesRevision2);
+
+ assertFalse(revisionDownFut.isDone());
+
+ assertFalse(testData.dataNodesUpFut3.isDone());
+ assertFalse(testData.dataNodesDownFut3.isDone());
+
+ dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false,
testData.dataNodesRevision2,
+ testData.dataNodesRevision2 + 2);
+
+ assertTrue(waitForCondition(revisionDownFut::isDone, 3000));
+
+ assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+ assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+ }
+
+ /**
+ * Test checks that data nodes futures are completed on topology with
added nodes.
+ */
+ @Test
+ void testSeveralScaleUpAndSeveralScaleDownThenScaleUp() throws Exception {
+ startZoneManager(0);
+
+ TestSeveralScaleUpAndSeveralScaleDownDataObject testData =
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+ LOG.info("Topology with added nodes.");
+
+ Set<String> dataNodes = Set.of("node0", "node1", "node2");
+
+ topologyWatchListenerOnUpdate(dataNodes, testData.topVer2,
testData.dataNodesRevision2);
+
+ assertTrue(testData.topVerUpFut2.isDone());
+ assertTrue(testData.topVerDownFut2.isDone());
+
+ CompletableFuture<Void> revisionUpFut =
distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+
.scaleUpRevisionTracker().waitFor(testData.dataNodesRevision2);
+
+ assertFalse(revisionUpFut.isDone());
+
+ dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true,
testData.dataNodesRevision2,
+ testData.dataNodesRevision2 + 1);
+
+ assertTrue(waitForCondition(revisionUpFut::isDone, 3000));
+ assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+ assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+ }
+
+ /**
+ * Test checks that data nodes futures are completed on topology with
removed nodes.
+ */
+ @Test
+ void testSeveralScaleUpAndSeveralScaleDownThenScaleDown() throws Exception
{
+ startZoneManager(0);
+
+ TestSeveralScaleUpAndSeveralScaleDownDataObject testData =
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+ LOG.info("Topology with removed nodes.");
+
+ Set<String> dataNodes = Set.of("node0");
+
+ topologyWatchListenerOnUpdate(dataNodes, testData.topVer2,
testData.dataNodesRevision2);
+
+ assertTrue(testData.topVerUpFut2.isDone());
+ assertTrue(testData.topVerDownFut2.isDone());
+
+ CompletableFuture<Void> revisionDownFut =
distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+ .scaleDownRevisionTracker().waitFor((long)
testData.dataNodesRevision2);
+
+ assertFalse(revisionDownFut.isDone());
+
+ dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false,
testData.dataNodesRevision2,
+ testData.dataNodesRevision2 + 1);
+
+ assertTrue(waitForCondition(revisionDownFut::isDone, 3000));
+ assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+ assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+ }
+
+ private static class TestSeveralScaleUpAndSeveralScaleDownDataObject {
+ private final long topVer2;
+ private final long dataNodesRevision2;
+ private final CompletableFuture<Void> topVerUpFut2;
+ private final CompletableFuture<Void> topVerDownFut2;
+ private final CompletableFuture<Set<String>> dataNodesUpFut3;
+ private final CompletableFuture<Set<String>> dataNodesDownFut3;
+
+ TestSeveralScaleUpAndSeveralScaleDownDataObject(
+ long topVer2,
+ long dataNodesRevision2,
+ CompletableFuture<Void> topVerUpFut2,
+ CompletableFuture<Void> topVerDownFut2,
+ CompletableFuture<Set<String>> dataNodesUpFut3,
+ CompletableFuture<Set<String>> dataNodesDownFut3) {
+ this.topVer2 = topVer2;
+ this.dataNodesRevision2 = dataNodesRevision2;
+ this.topVerUpFut2 = topVerUpFut2;
+ this.topVerDownFut2 = topVerDownFut2;
+ this.dataNodesUpFut3 = dataNodesUpFut3;
+ this.dataNodesDownFut3 = dataNodesDownFut3;
+ }
+ }
+
+ /**
+ * This method invokes {@link
DistributionZoneManager#topologyVersionedDataNodes(int, long)} with default and
non-default zone id
+ * and different logical topology version. Collects data nodes futures.
+ * Simulates new logical topology with new nodes. Check that some of data
nodes futures are completed.
+ * Simulates new logical topology with removed nodes. Check that some of
data nodes futures are completed.
+ *
+ * @return Structure with data for continue testing.
+ */
+ private TestSeveralScaleUpAndSeveralScaleDownDataObject
testSeveralScaleUpAndSeveralScaleDownGeneral()
Review Comment:
I do not understand this concept with
`TestSeveralScaleUpAndSeveralScaleDownDataObject`, many things are done in one
method, it is hard to read this code, please do something with that.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
+ * The flow for the future completion:
+ * Waiting for DistributionZoneManager observe passed topology version or
greater version in topologyWatchListener.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0
than wait for writing data nodes triggered
+ * by started nodes and corresponding to the passed topology version or
greater topology version
+ * to the data nodes into the meta storage.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0
than wait for writing data nodes
+ * triggered by stopped nodes and corresponding to the passed topology
version or greater topology version
+ * to the data nodes into the meta storage.
+ * After waiting it completes the future with data nodes of the specified
zone.
+ * This method must be invoked in a change configuration closure to
guarantee that the zone is exists and values of scaleUp/scaleDown
+ * timers are up to date.
+ * The returned future can be completed with {@link
DistributionZoneNotFoundException} and {@link
DistributionZoneWasRemovedException}
+ * in case when the distribution zone was removed during method execution.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> topologyVersionedDataNodes(int
zoneId, long topVer) {
+ CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut =
awaitTopologyVersion(topVer)
+ .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+ return allOf(
+ timerValuesFut.thenCompose(timerValues ->
scaleUpAwaiting(zoneId, timerValues.get1())),
+ timerValuesFut.thenCompose(timerValues ->
scaleDownAwaiting(zoneId, timerValues.get2()))
+ ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+ }
+
+ /**
+ * Waits for DistributionZoneManager observe passed topology version or
greater version in topologyWatchListener.
+ *
+ * @param topVer Topology version.
+ * @return Future for chaining.
+ */
+ private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+ return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+ }
+
+ /**
+ * Transforms {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * and {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to
boolean values.
+ * True if it equals to zero and false if it greater than zero. Zero means
that data nodes changing must be started immediate.
+ *
+ * @param zoneId Zone id.
+ * @return Future with the result.
+ */
+ private CompletableFuture<IgniteBiTuple<Boolean, Boolean>>
getImmediateTimers(int zoneId) {
+ return inBusyLock(busyLock, () -> {
+ DistributionZoneConfiguration zoneCfg =
getZoneById(zonesConfiguration, zoneId);
+
+ return completedFuture(new IgniteBiTuple<>(
+ zoneCfg.dataNodesAutoAdjustScaleUp().value() ==
IMMEDIATE_TIMER_VALUE,
+ zoneCfg.dataNodesAutoAdjustScaleDown().value() ==
IMMEDIATE_TIMER_VALUE
+ ));
+ });
+ }
+
+ /**
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0
than waits for writing data nodes triggered
Review Comment:
Also it could be useful to mention case with
`DistributionZoneWasRemovedException`
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -898,45 +1083,62 @@ private void initDataNodesFromVaultManager() {
}
try {
- vaultMgr.get(zonesLogicalTopologyKey())
- .thenAccept(vaultEntry -> {
- if (!busyLock.enterBusy()) {
- throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
- }
+ long appliedRevision = metaStorageManager.appliedRevision();
- try {
- long appliedRevision =
metaStorageManager.appliedRevision();
+ lastScaleUpRevision = appliedRevision;
- if (vaultEntry != null && vaultEntry.value() !=
null) {
- logicalTopology =
fromBytes(vaultEntry.value());
+ lastScaleDownRevision = appliedRevision;
- // init keys and data nodes for default zone
- saveDataNodesAndUpdateTriggerKeysInMetaStorage(
- DEFAULT_ZONE_ID,
- appliedRevision,
- logicalTopology
- );
+ VaultEntry topVerEntry =
vaultMgr.get(zonesLogicalTopologyVersionKey()).join();
-
zonesConfiguration.distributionZones().value().forEach(zone -> {
- int zoneId = zone.zoneId();
+ if (topVerEntry != null && topVerEntry.value() != null) {
+ topVerTracker.update(bytesToLong(topVerEntry.value()));
+ }
-
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
- zoneId,
- appliedRevision,
- logicalTopology
- );
- });
- }
- } finally {
- busyLock.leaveBusy();
- }
- });
+ VaultEntry topologyEntry =
vaultMgr.get(zonesLogicalTopologyKey()).join();
+
+ if (topologyEntry != null && topologyEntry.value() != null) {
+ logicalTopology = fromBytes(topologyEntry.value());
+
+ // init keys and data nodes for default zone
+ saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+ DEFAULT_ZONE_ID,
+ appliedRevision,
+ logicalTopology
+ );
+
+ zonesConfiguration.distributionZones().value().forEach(zone ->
{
+ int zoneId = zone.zoneId();
+
+ saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+ zoneId,
+ appliedRevision,
+ logicalTopology
+ );
+ });
+ }
+
+ zonesState.values().forEach(zoneState -> {
+ zoneState.scaleUpRevisionTracker().update(lastScaleUpRevision);
+
+
zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision);
+
+ zoneState.nodes(logicalTopology);
+ });
+
+ assert topologyEntry == null || topologyEntry.value() == null ||
logicalTopology.equals(fromBytes(topologyEntry.value()))
+ : "DistributionZoneManager.logicalTopology was changed
after initialization from the vault manager.";
Review Comment:
Please, remove `DistributionZoneManager.logicalTopology` from this message
and try to rephrase it. This code can seen by user
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
+ * The flow for the future completion:
+ * Waiting for DistributionZoneManager observe passed topology version or
greater version in topologyWatchListener.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0
than wait for writing data nodes triggered
+ * by started nodes and corresponding to the passed topology version or
greater topology version
+ * to the data nodes into the meta storage.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0
than wait for writing data nodes
+ * triggered by stopped nodes and corresponding to the passed topology
version or greater topology version
+ * to the data nodes into the meta storage.
+ * After waiting it completes the future with data nodes of the specified
zone.
+ * This method must be invoked in a change configuration closure to
guarantee that the zone is exists and values of scaleUp/scaleDown
+ * timers are up to date.
+ * The returned future can be completed with {@link
DistributionZoneNotFoundException} and {@link
DistributionZoneWasRemovedException}
+ * in case when the distribution zone was removed during method execution.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> topologyVersionedDataNodes(int
zoneId, long topVer) {
+ CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut =
awaitTopologyVersion(topVer)
+ .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+ return allOf(
+ timerValuesFut.thenCompose(timerValues ->
scaleUpAwaiting(zoneId, timerValues.get1())),
+ timerValuesFut.thenCompose(timerValues ->
scaleDownAwaiting(zoneId, timerValues.get2()))
+ ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+ }
+
+ /**
+ * Waits for DistributionZoneManager observe passed topology version or
greater version in topologyWatchListener.
+ *
+ * @param topVer Topology version.
+ * @return Future for chaining.
+ */
+ private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+ return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+ }
+
+ /**
+ * Transforms {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * and {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to
boolean values.
+ * True if it equals to zero and false if it greater than zero. Zero means
that data nodes changing must be started immediate.
+ *
+ * @param zoneId Zone id.
+ * @return Future with the result.
+ */
+ private CompletableFuture<IgniteBiTuple<Boolean, Boolean>>
getImmediateTimers(int zoneId) {
+ return inBusyLock(busyLock, () -> {
+ DistributionZoneConfiguration zoneCfg =
getZoneById(zonesConfiguration, zoneId);
+
+ return completedFuture(new IgniteBiTuple<>(
+ zoneCfg.dataNodesAutoAdjustScaleUp().value() ==
IMMEDIATE_TIMER_VALUE,
+ zoneCfg.dataNodesAutoAdjustScaleDown().value() ==
IMMEDIATE_TIMER_VALUE
+ ));
+ });
+ }
+
+ /**
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0
than waits for writing data nodes triggered
Review Comment:
Please rephrase this javadoc and one below, it is really hard to understand
what is going on here
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,24 +1147,57 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
}
try {
- assert evt.single() : "Expected an event with one entry
but was an event with several entries with keys: "
+ assert evt.entryEvents().size() == 2 :
+ "Expected an event with logical topology and
logical topology version entries but was events with keys: "
+ evt.entryEvents().stream().map(entry ->
entry.newEntry() == null ? "null" : entry.newEntry().key())
.collect(toList());
- Entry newEntry = evt.entryEvent().newEntry();
+ long topVer = 0;
+
+ byte[] newLogicalTopologyBytes = null;
+
+ Set<String> newLogicalTopology = null;
+
+ long revision = 0;
+
+ for (EntryEvent event : evt.entryEvents()) {
+ Entry e = event.newEntry();
- long revision = newEntry.revision();
+ if (Arrays.equals(e.key(),
zonesLogicalTopologyVersionKey().bytes())) {
+ topVer = bytesToLong(e.value());
- byte[] newLogicalTopologyBytes = newEntry.value();
+ revision = e.revision();
+ } else if (Arrays.equals(e.key(),
zonesLogicalTopologyKey().bytes())) {
+ newLogicalTopologyBytes = e.value();
- Set<String> newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ }
+ }
+
+ assert newLogicalTopology != null;
+ assert revision > 0;
+
+ Set<String> newLogicalTopology0 = newLogicalTopology;
Set<String> removedNodes =
- logicalTopology.stream().filter(node ->
!newLogicalTopology.contains(node)).collect(toSet());
+ logicalTopology.stream().filter(node ->
!newLogicalTopology0.contains(node)).collect(toSet());
Set<String> addedNodes =
newLogicalTopology.stream().filter(node ->
!logicalTopology.contains(node)).collect(toSet());
+ if (!addedNodes.isEmpty()) {
+ lastScaleUpRevision = revision;
+ }
+
+ if (!removedNodes.isEmpty()) {
+ lastScaleDownRevision = revision;
+ }
+
+ //The topology version must be updated after the
lastScaleUpRevision and lastScaleDownRevision are updated.
+ //This is necessary in order to when topology version
waiters will be notified that topology version is updated
Review Comment:
please try to rephrase the second sentence, it is unclear what you wanted to
say
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java:
##########
@@ -92,6 +93,8 @@ protected IgniteSql igniteSql() {
return CLUSTER_NODES.get(0).sql();
}
+
+ @Disabled
Review Comment:
Disabled without a ticket. Are this issues still valid?
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case
when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+ private MetaStorageManager metaStorageManager;
+
+ private DistributionZoneManager distributionZoneManager;
+
+ private LogicalTopology logicalTopology;
+
+ private ClusterStateStorage clusterStateStorage;
+
+ private DistributionZonesConfiguration zonesConfiguration;
+
+ private ConfigurationManager clusterCfgMgr;
+
+ private ClusterManagementGroupManager cmgManager;
+
+ private VaultManager vaultManager;
+
+ private WatchListener topologyWatchListener;
+
+ private WatchListener dataNodesWatchListener;
+
+ @BeforeEach
+ void setUp() {
+ vaultManager = mock(VaultManager.class);
+
+
when(vaultManager.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), null)));
+
+ when(vaultManager.get(zonesLogicalTopologyVersionKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
+ cmgManager = mock(ClusterManagementGroupManager.class);
+
+ metaStorageManager = mock(MetaStorageManager.class);
+
+ doAnswer(invocation -> {
+ ByteArray key = invocation.getArgument(0);
+
+ WatchListener watchListener = invocation.getArgument(1);
+
+ if (Arrays.equals(key.bytes(),
zoneLogicalTopologyPrefix().bytes())) {
+ topologyWatchListener = watchListener;
+ } else if (Arrays.equals(key.bytes(),
zonesDataNodesPrefix().bytes())) {
+ dataNodesWatchListener = watchListener;
+ }
+
+ return null;
+ }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+ when(metaStorageManager.invoke(any()))
+
.thenReturn(completedFuture(StatementResultImpl.builder().result(new byte[]
{0}).build()));
+
+ clusterStateStorage = new TestClusterStateStorage();
+
+ logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+ LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+ TablesConfiguration tablesConfiguration =
mock(TablesConfiguration.class);
Review Comment:
Please rewrite it to a `@InjectConfiguration`
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,24 +1147,57 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
}
try {
- assert evt.single() : "Expected an event with one entry
but was an event with several entries with keys: "
+ assert evt.entryEvents().size() == 2 :
+ "Expected an event with logical topology and
logical topology version entries but was events with keys: "
+ evt.entryEvents().stream().map(entry ->
entry.newEntry() == null ? "null" : entry.newEntry().key())
.collect(toList());
- Entry newEntry = evt.entryEvent().newEntry();
+ long topVer = 0;
+
+ byte[] newLogicalTopologyBytes = null;
+
+ Set<String> newLogicalTopology = null;
+
+ long revision = 0;
+
+ for (EntryEvent event : evt.entryEvents()) {
+ Entry e = event.newEntry();
- long revision = newEntry.revision();
+ if (Arrays.equals(e.key(),
zonesLogicalTopologyVersionKey().bytes())) {
+ topVer = bytesToLong(e.value());
- byte[] newLogicalTopologyBytes = newEntry.value();
+ revision = e.revision();
+ } else if (Arrays.equals(e.key(),
zonesLogicalTopologyKey().bytes())) {
+ newLogicalTopologyBytes = e.value();
- Set<String> newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ }
+ }
+
+ assert newLogicalTopology != null;
Review Comment:
asserts without a message
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case
when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
Review Comment:
Please do something with the readability of this class, it is hard to
understand scenarios, javadocs and test names do not help,
`TestSeveralScaleUpAndSeveralScaleDownDataObject` looks too complicated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]