This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ff7407fd9a IGNITE-19736 Do not cancel tasks in 
DistributionZoneManager#executor if they were created by immediate 
scaleUp/scaleDown events. Avoid concurrent executing several tasks for the same 
zone. (#2201)
ff7407fd9a is described below

commit ff7407fd9a930bcabbecb40d0f3067320054075a
Author: Sergey Uttsel <utt...@gmail.com>
AuthorDate: Tue Jun 27 11:30:15 2023 +0300

    IGNITE-19736 Do not cancel tasks in DistributionZoneManager#executor if 
they were created by immediate scaleUp/scaleDown events. Avoid concurrent 
executing several tasks for the same zone. (#2201)
---
 .../distributionzones/DistributionZoneManager.java |  61 ++--
 .../distributionzones/DistributionZonesUtil.java   |  22 ++
 .../DistributionZoneManagerScaleUpTest.java        | 312 ---------------------
 .../DistributionZonesSchedulersTest.java           | 275 ++++++++++++++++--
 4 files changed, 312 insertions(+), 358 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 70f082753c..c2fb15a2af 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -21,8 +21,10 @@ import static java.util.Collections.emptySet;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
@@ -73,9 +75,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import org.apache.ignite.configuration.ConfigurationChangeException;
@@ -282,10 +281,8 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         nodesAttributes = new ConcurrentHashMap<>();
 
-        executor = new ScheduledThreadPoolExecutor(
-                Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
-                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, 
DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
-                new ThreadPoolExecutor.DiscardPolicy()
+        executor = createZoneManagerExecutor(
+                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, 
DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG)
         );
 
         // It's safe to leak with partially initialised object here, because 
rebalanceEngine is only accessible through this or by
@@ -361,7 +358,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
         metaStorageManager.unregisterWatch(topologyWatchListener);
         metaStorageManager.unregisterWatch(dataNodesWatchListener);
 
-        shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(executor, 10, SECONDS);
     }
 
     /**
@@ -1610,6 +1607,12 @@ public class DistributionZoneManager implements 
IgniteComponent {
         /** Schedule task for a scale down process. */
         private ScheduledFuture<?> scaleDownTask;
 
+        /** The delay for the scale up task. */
+        private long scaleUpTaskDelay;
+
+        /** The delay for the scale down task. */
+        private long scaleDownTaskDelay;
+
         /**
          * Map that stores pairs revision -> {@link Augmentation} for a zone. 
With this map we can track which nodes
          * should be added or removed in the processes of scale up or scale 
down. Revision helps to track visibility of the events
@@ -1644,56 +1647,64 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
 
         /**
-         * Reschedules existing scale up task, if it is not started yet, or 
schedules new one, if the current task cannot be canceled.
+         * Reschedules existing scale up task, if it is not started yet and 
the delay of this task is not immediate,
+         * or schedules new one, if the current task cannot be canceled.
          *
          * @param delay Delay to start runnable in seconds.
          * @param runnable Custom logic to run.
          */
         synchronized void rescheduleScaleUp(long delay, Runnable runnable) {
-            if (scaleUpTask != null) {
-                scaleUpTask.cancel(false);
-            }
+            stopScaleUp();
 
-            scaleUpTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
+            scaleUpTask = executor.schedule(runnable, delay, SECONDS);
+
+            scaleUpTaskDelay = delay;
         }
 
         /**
-         * Reschedules existing scale down task, if it is not started yet, or 
schedules new one, if the current task cannot be canceled.
+         * Reschedules existing scale down task, if it is not started yet and 
the delay of this task is not immediate,
+         * or schedules new one, if the current task cannot be canceled.
          *
          * @param delay Delay to start runnable in seconds.
          * @param runnable Custom logic to run.
          */
         synchronized void rescheduleScaleDown(long delay, Runnable runnable) {
-            if (scaleDownTask != null) {
-                scaleDownTask.cancel(false);
-            }
+            stopScaleDown();
 
-            scaleDownTask = executor.schedule(runnable, delay, 
TimeUnit.SECONDS);
+            scaleDownTask = executor.schedule(runnable, delay, SECONDS);
+
+            scaleDownTaskDelay = delay;
         }
 
         /**
-         * Cancels task for scale up and scale down.
+         * Cancels task for scale up and scale down. Used on {@link 
ZonesConfigurationListener#onDelete(ConfigurationNotificationEvent)}.
+         * No need to check {@code scaleUpTaskDelay} and {@code 
scaleDownTaskDelay} because after the timers are stopped on a zone delete event
+         * the data nodes value will be updated.
          */
         synchronized void stopTimers() {
-            stopScaleUp();
+            if (scaleUpTask != null) {
+                scaleUpTask.cancel(false);
+            }
 
-            stopScaleDown();
+            if (scaleDownTask != null) {
+                scaleDownTask.cancel(false);
+            }
         }
 
         /**
-         * Cancels task for scale up.
+         * Cancels task for scale up if it is not started yet and the delay of 
this task is not immediate.
          */
         synchronized void stopScaleUp() {
-            if (scaleUpTask != null) {
+            if (scaleUpTask != null && scaleUpTaskDelay > 0) {
                 scaleUpTask.cancel(false);
             }
         }
 
         /**
-         * Cancels task for scale down.
+         * Cancels task for scale down if it is not started yet and the delay 
of this task is not immediate.
          */
         synchronized void stopScaleDown() {
-            if (scaleDownTask != null) {
+            if (scaleDownTask != null && scaleDownTaskDelay > 0) {
                 scaleDownTask.cancel(false);
             }
         }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 2046629c2a..32d3571683 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -40,6 +40,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
 import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.DistributionZoneNotFoundException;
@@ -532,4 +536,22 @@ public class DistributionZonesUtil {
                 .map(Node::nodeName)
                 .collect(toSet());
     }
+
+    /**
+     * Create an executor for the zone manager.
+     * Uses a single-thread executor to avoid concurrent execution of several 
tasks for the same zone.
+     * ScheduledThreadPoolExecutor guarantees that tasks scheduled for exactly 
the same
+     * execution time are enabled in first-in-first-out (FIFO) order of 
submission.
+     * // TODO: IGNITE-19783 Need to use a striped executor.
+     *
+     * @param namedThreadFactory Named thread factory.
+     * @return Executor.
+     */
+    static ScheduledExecutorService 
createZoneManagerExecutor(NamedThreadFactory namedThreadFactory) {
+        return new ScheduledThreadPoolExecutor(
+                1,
+                namedThreadFactory,
+                new ThreadPoolExecutor.DiscardPolicy()
+        );
+    }
 }
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index d6b47b81ea..53f853cad0 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -37,7 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 
@@ -46,15 +45,10 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
-import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
-import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
-import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.server.If;
 import org.apache.ignite.network.NetworkAddress;
@@ -307,312 +301,6 @@ public class DistributionZoneManagerScaleUpTest extends 
BaseDistributionZoneMana
         assertZoneScaleDownChangeTriggerKey(null, ZONE_1_ID, keyValueStorage);
     }
 
-    @Test
-    void testTwoScaleUpTimersSecondTimerRunFirst() throws Exception {
-        preparePrerequisites();
-
-        NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
-                zonesConfiguration.distributionZones();
-
-        DistributionZoneView zoneView = zones.value().get(0);
-
-        CountDownLatch in1 = new CountDownLatch(1);
-        CountDownLatch in2 = new CountDownLatch(1);
-        CountDownLatch out1 = new CountDownLatch(1);
-        CountDownLatch out2 = new CountDownLatch(1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(D),
-                Set.of(),
-                prerequisiteRevision + 1,
-                (zoneId, revision) -> {
-                    try {
-                        in1.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleUp(zoneId, 
revision).thenRun(out1::countDown);
-                },
-                (t1, t2) -> null
-        );
-
-        // Assert that first task was run and event about adding node "D" with 
revision {@code prerequisiteRevision + 1} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, 
prerequisiteRevision + 1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(E),
-                Set.of(),
-                prerequisiteRevision + 2,
-                (zoneId, revision) -> {
-                    try {
-                        in2.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleUp(zoneId, 
revision).thenRun(() -> {
-                        try {
-                            out2.await();
-                        } catch (InterruptedException e) {
-                            fail();
-                        }
-                    });
-                },
-                (t1, t2) -> null
-        );
-
-        // Assert that second task was run and event about adding node "E" 
with revision {@code prerequisiteRevision + 2} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, 
prerequisiteRevision + 2);
-
-        //Second task is propagating data nodes first.
-        in2.countDown();
-
-        assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, 
keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), 
keyValueStorage);
-
-        out2.countDown();
-
-        in1.countDown();
-
-        //Waiting for the first scheduler ends it work.
-        out1.countDown();
-
-        // Assert that nothing has been changed.
-        assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, 
keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), 
keyValueStorage);
-    }
-
-    @Test
-    void testTwoScaleDownTimersSecondTimerRunFirst() throws Exception {
-        preparePrerequisites();
-
-        NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
-                zonesConfiguration.distributionZones();
-
-        DistributionZoneView zoneView = zones.value().get(0);
-
-        CountDownLatch in1 = new CountDownLatch(1);
-        CountDownLatch in2 = new CountDownLatch(1);
-        CountDownLatch out1 = new CountDownLatch(1);
-        CountDownLatch out2 = new CountDownLatch(1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(),
-                Set.of(B),
-                prerequisiteRevision + 1,
-                (t1, t2) -> null,
-                (zoneId, revision) -> {
-                    try {
-                        in1.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleDown(zoneId, 
revision).thenRun(out1::countDown);
-                }
-        );
-
-        // Assert that first task was run and event about removing node "B" 
with revision {@code prerequisiteRevision + 1} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, 
prerequisiteRevision + 1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(),
-                Set.of(C),
-                prerequisiteRevision + 2,
-                (t1, t2) -> null,
-                (zoneId, revision) -> {
-                    try {
-                        in2.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleDown(zoneId, 
revision).thenRun(() -> {
-                        try {
-                            out2.await();
-                        } catch (InterruptedException e) {
-                            fail();
-                        }
-                    });
-                }
-        );
-
-        // Assert that second task was run and event about removing node "C" 
with revision {@code prerequisiteRevision + 2} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, 
prerequisiteRevision + 2);
-
-        //Second task is propagating data nodes first.
-        in2.countDown();
-
-        assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, 
ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), 
keyValueStorage);
-
-        out2.countDown();
-
-        in1.countDown();
-
-        //Waiting for the first scheduler ends it work.
-        out1.countDown();
-
-        // Assert that nothing has been changed.
-        assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, 
ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), 
keyValueStorage);
-    }
-
-    @Test
-    void testTwoScaleUpTimersFirstTimerRunFirst() throws Exception {
-        preparePrerequisites();
-
-        NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
-                zonesConfiguration.distributionZones();
-
-        DistributionZoneView zoneView = zones.value().get(0);
-
-        CountDownLatch in1 = new CountDownLatch(1);
-        CountDownLatch in2 = new CountDownLatch(1);
-        CountDownLatch out1 = new CountDownLatch(1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(D),
-                Set.of(),
-                prerequisiteRevision + 1,
-                (zoneId, revision) -> {
-                    in1.countDown();
-
-                    return testSaveDataNodesOnScaleUp(zoneId, 
revision).thenRun(() -> {
-                        try {
-                            out1.await();
-                        } catch (InterruptedException e) {
-                            fail();
-                        }
-                    });
-                },
-                (t1, t2) -> null
-        );
-
-        // Waiting for the first task to be run. We have to do that to be sure 
that watch events,
-        // which we try to emulate, are handled sequentially.
-        in1.await();
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(E),
-                Set.of(),
-                prerequisiteRevision + 2,
-                (zoneId, revision) -> {
-                    try {
-                        in2.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleUp(zoneId, revision);
-                },
-                (t1, t2) -> null
-        );
-
-        // Assert that second task was run and event about adding node "E" 
with revision {@code prerequisiteRevision + 2} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, 
prerequisiteRevision + 2);
-
-        assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, 
keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), 
keyValueStorage);
-
-        // Second task is run and we await that data nodes will be changed 
from ["A", "B", "C", "D"] to ["A", "B", "C", "D", "E"]
-        in2.countDown();
-
-        assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, 
keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), 
keyValueStorage);
-
-        out1.countDown();
-    }
-
-    @Test
-    void testTwoScaleDownTimersFirstTimerRunFirst() throws Exception {
-        preparePrerequisites();
-
-        NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
-                zonesConfiguration.distributionZones();
-
-        DistributionZoneView zoneView = zones.value().get(0);
-
-        CountDownLatch in1 = new CountDownLatch(1);
-        CountDownLatch in2 = new CountDownLatch(1);
-        CountDownLatch out1 = new CountDownLatch(1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(),
-                Set.of(B),
-                prerequisiteRevision + 1,
-                (t1, t2) -> null,
-                (zoneId, revision) -> {
-                    in1.countDown();
-
-                    return testSaveDataNodesOnScaleDown(zoneId, 
revision).thenRun(() -> {
-                        try {
-                            out1.await();
-                        } catch (InterruptedException e) {
-                            fail();
-                        }
-                    });
-                }
-        );
-
-        // Waiting for the first task to be run. We have to do that to be sure 
that watch events,
-        // which we try to emulate, are handled sequentially.
-        in1.await();
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(),
-                Set.of(C),
-                prerequisiteRevision + 2,
-                (t1, t2) -> null,
-                (zoneId, revision) -> {
-                    try {
-                        in2.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleDown(zoneId, revision);
-                }
-        );
-
-        // Assert that second task was run and event about removing node "C" 
with revision {@code prerequisiteRevision + 2} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, 
prerequisiteRevision + 2);
-
-        assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 1, 
ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), 
keyValueStorage);
-
-        // Second task is run and we await that data nodes will be changed 
from ["A", "C"] to ["A"]
-        in2.countDown();
-
-        assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, 
ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), 
keyValueStorage);
-
-        out1.countDown();
-    }
-
     @Test
     void testEmptyDataNodesOnStart() throws Exception {
         startDistributionZoneManager();
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
index 2e14cc044e..0a6eb584e5 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
@@ -17,21 +17,20 @@
 
 package org.apache.ignite.internal.distributionzones;
 
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.after;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -45,10 +44,8 @@ import org.junit.jupiter.api.Test;
 public class DistributionZonesSchedulersTest {
     private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZonesSchedulersTest.class);
 
-    private static final ScheduledExecutorService executor = new 
ScheduledThreadPoolExecutor(
-            Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
-            new NamedThreadFactory("test-dst-zones-scheduler", LOG),
-            new ThreadPoolExecutor.DiscardPolicy()
+    private static ScheduledExecutorService executor = 
createZoneManagerExecutor(
+            new NamedThreadFactory("test-dst-zones-scheduler", LOG)
     );
 
     @AfterAll
@@ -83,27 +80,118 @@ public class DistributionZonesSchedulersTest {
     }
 
     @Test
-    void testScaleUpReScheduleNotStartedTask() {
+    void testScaleUpReScheduling() throws InterruptedException {
         ZoneState state = new DistributionZoneManager.ZoneState(executor);
 
-        testReScheduleNotStartedTask(state::rescheduleScaleUp);
+        testReScheduling(state::rescheduleScaleUp);
     }
 
     @Test
-    void testScaleDownReScheduleNotStartedTask() {
+    void testScaleDownReScheduling() throws InterruptedException {
         ZoneState state = new DistributionZoneManager.ZoneState(executor);
 
-        testReScheduleNotStartedTask(state::rescheduleScaleDown);
+        testReScheduling(state::rescheduleScaleDown);
     }
 
-    private static void testReScheduleNotStartedTask(BiConsumer<Long, 
Runnable> fn) {
-        Runnable runnable = mock(Runnable.class);
+    /**
+     * Tests that scaleUp/scaleDown tasks with a zero delay will not be 
canceled by other tasks.
+     * Tests that scaleUp/scaleDown tasks with a delay greater than zero will 
be canceled by other tasks.
+     */
+    private static void testReScheduling(BiConsumer<Long, Runnable> fn) throws 
InterruptedException {
+        AtomicInteger counter = new AtomicInteger();
 
-        fn.accept(1L, runnable);
+        CountDownLatch firstTaskLatch = new CountDownLatch(1);
 
-        fn.accept(0L, runnable);
+        CountDownLatch lastTaskLatch = new CountDownLatch(1);
 
-        verify(runnable, after(1200).times(1)).run();
+        fn.accept(0L, () -> {
+            try {
+                assertTrue(firstTaskLatch.await(3, TimeUnit.SECONDS));
+
+                counter.incrementAndGet();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        fn.accept(1L, counter::incrementAndGet);
+
+        fn.accept(0L, counter::incrementAndGet);
+
+        fn.accept(0L, counter::incrementAndGet);
+
+        fn.accept(1L, counter::incrementAndGet);
+
+        fn.accept(1L, counter::incrementAndGet);
+
+        fn.accept(0L, counter::incrementAndGet);
+
+        fn.accept(0L, () -> {
+            counter.incrementAndGet();
+
+            lastTaskLatch.countDown();
+        });
+
+        firstTaskLatch.countDown();
+
+        assertTrue(lastTaskLatch.await(3, TimeUnit.SECONDS));
+
+        assertEquals(5, counter.get());
+    }
+
+    @Test
+    void testScaleUpOrdering() throws InterruptedException {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testOrdering(state::rescheduleScaleUp);
+    }
+
+    @Test
+    void testScaleDownOrdering() throws InterruptedException {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testOrdering(state::rescheduleScaleDown);
+    }
+
+    private static void testOrdering(BiConsumer<Long, Runnable> fn) throws 
InterruptedException {
+        AtomicInteger counter = new AtomicInteger();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        AtomicBoolean sequentialOrder = new AtomicBoolean(true);
+
+        fn.accept(0L, () -> {
+            try {
+                assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+                counter.incrementAndGet();
+
+                if (counter.get() != 1) {
+                    sequentialOrder.set(false);
+                }
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        for (int i = 2; i < 11; i++) {
+            int j = i;
+
+            fn.accept(0L, () -> {
+                counter.incrementAndGet();
+
+                if (counter.get() != j) {
+                    sequentialOrder.set(false);
+                }
+            });
+        }
+
+        latch.countDown();
+
+        waitForCondition(() -> counter.get() == 10, 3000);
+        assertEquals(10, counter.get(), "Not all tasks were executed.");
+
+        assertTrue(sequentialOrder.get(), "The order of tasks execution is not 
sequential.");
     }
 
     @Test
@@ -131,11 +219,156 @@ public class DistributionZonesSchedulersTest {
 
         latch.await(1000, TimeUnit.MILLISECONDS);
 
-        fn.accept(0L, () -> {
-            flag.set(true);
-        });
+        fn.accept(0L, () -> flag.set(true));
 
         assertTrue(waitForCondition(() -> 0L == latch.getCount(), 1500));
         assertTrue(waitForCondition(flag::get, 1500));
     }
+
+    /** Checks that {@link ZoneState#stopScaleUp()} cancels a scale up task that was scheduled with a non-zero delay. */
+    @Test
+    void testCancelScaleUpTaskOnStopScaleUp() {
+        ZoneState state = new ZoneState(executor);
+
+        testCancelTask(state::rescheduleScaleUp, state::stopScaleUp, () -> state.scaleUpTask().isCancelled());
+    }
+
+    /** Checks that {@link ZoneState#stopScaleDown()} cancels a scale down task that was scheduled with a non-zero delay. */
+    @Test
+    void testCancelScaleDownTaskOnStopScaleDown() {
+        ZoneState state = new ZoneState(executor);
+
+        testCancelTask(state::rescheduleScaleDown, state::stopScaleDown, () -> state.scaleDownTask().isCancelled());
+    }
+
+    /** Checks that {@link ZoneState#stopTimers()} cancels a scale up task that was scheduled with a non-zero delay. */
+    @Test
+    void testCancelScaleUpTasksOnStopTimers() {
+        ZoneState state = new ZoneState(executor);
+
+        testCancelTask(state::rescheduleScaleUp, state::stopTimers, () -> state.scaleUpTask().isCancelled());
+    }
+
+    /** Checks that {@link ZoneState#stopTimers()} cancels a scale down task that was scheduled with a non-zero delay. */
+    @Test
+    void testCancelScaleDownTasksOnStopTimers() {
+        ZoneState state = new ZoneState(executor);
+
+        testCancelTask(state::rescheduleScaleDown, state::stopTimers, () -> state.scaleDownTask().isCancelled());
+    }
+
+    /**
+     * {@link ZoneState#stopScaleUp()}, {@link ZoneState#stopScaleDown()} and {@link ZoneState#stopTimers()} cancel a task
+     * if it is not started and has a delay greater than zero.
+     *
+     * @param fn Reschedule method under test, accepts a delay and a task.
+     * @param stopTask Stop method under test.
+     * @param isTaskCancelled Tells whether the most recently rescheduled task is cancelled.
+     */
+    private static void testCancelTask(
+            BiConsumer<Long, Runnable> fn,
+            Runnable stopTask,
+            Supplier<Boolean> isTaskCancelled
+    ) {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        try {
+            // The first task waits on the latch until the end of the check.
+            fn.accept(0L, () -> {
+                try {
+                    assertTrue(latch.await(3, TimeUnit.SECONDS));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            // The task with a non-zero delay is the one that is expected to be cancelled.
+            fn.accept(1L, () -> {});
+
+            assertFalse(isTaskCancelled.get());
+
+            stopTask.run();
+
+            assertTrue(isTaskCancelled.get());
+        } finally {
+            // Release the waiting task even if an assertion above has failed.
+            latch.countDown();
+        }
+    }
+
+    /** Checks that {@link ZoneState#stopScaleUp()} does not cancel a scale up task that was scheduled with a zero delay. */
+    @Test
+    void testNotCancelScaleUpTaskOnStopScaleUp() {
+        ZoneState state = new ZoneState(executor);
+
+        testNotCancelTask(state::rescheduleScaleUp, state::stopScaleUp, () -> state.scaleUpTask().isCancelled());
+    }
+
+    /** Checks that {@link ZoneState#stopScaleDown()} does not cancel a scale down task that was scheduled with a zero delay. */
+    @Test
+    void testNotCancelScaleDownTaskOnStopScaleDown() {
+        ZoneState state = new ZoneState(executor);
+
+        testNotCancelTask(state::rescheduleScaleDown, state::stopScaleDown, () -> state.scaleDownTask().isCancelled());
+    }
+
+    /**
+     * {@link ZoneState#stopScaleUp()} and {@link ZoneState#stopScaleDown()} do not cancel a task
+     * if it is not started and has a delay equal to zero.
+     *
+     * @param fn Reschedule method under test, accepts a delay and a task.
+     * @param stopTask Stop method under test.
+     * @param isTaskCancelled Tells whether the most recently rescheduled task is cancelled.
+     */
+    private static void testNotCancelTask(
+            BiConsumer<Long, Runnable> fn,
+            Runnable stopTask,
+            Supplier<Boolean> isTaskCancelled
+    ) {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        try {
+            // The first task waits on the latch until the end of the check.
+            fn.accept(0L, () -> {
+                try {
+                    assertTrue(latch.await(3, TimeUnit.SECONDS));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            // The task with a zero delay is the one that must survive the stop call.
+            fn.accept(0L, () -> {});
+
+            assertFalse(isTaskCancelled.get());
+
+            stopTask.run();
+
+            assertFalse(isTaskCancelled.get());
+        } finally {
+            // Release the waiting task even if an assertion above has failed.
+            latch.countDown();
+        }
+    }
+
+    /**
+     * {@link ZoneState#stopTimers()} cancels a task if it is not started and has a delay equal to zero.
+     * Both the scale up and the scale down tasks are checked.
+     */
+    @Test
+    public void testCancelTasksOnStopTimersAndImmediateTimerValues() {
+        ZoneState state = new ZoneState(executor);
+
+        CountDownLatch scaleUpTaskLatch = new CountDownLatch(1);
+        CountDownLatch scaleDownTaskLatch = new CountDownLatch(1);
+
+        try {
+            // The first scale up task waits on its latch until the end of the check.
+            state.rescheduleScaleUp(0L, () -> {
+                try {
+                    assertTrue(scaleUpTaskLatch.await(3, TimeUnit.SECONDS));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            state.rescheduleScaleUp(0L, () -> {});
+
+            assertFalse(state.scaleUpTask().isCancelled());
+
+            // The first scale down task waits on its latch until the end of the check.
+            state.rescheduleScaleDown(0L, () -> {
+                try {
+                    assertTrue(scaleDownTaskLatch.await(3, TimeUnit.SECONDS));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            state.rescheduleScaleDown(0L, () -> {});
+
+            assertFalse(state.scaleDownTask().isCancelled());
+
+            state.stopTimers();
+
+            assertTrue(state.scaleUpTask().isCancelled());
+            assertTrue(state.scaleDownTask().isCancelled());
+        } finally {
+            // Release the waiting tasks even if an assertion above has failed.
+            scaleUpTaskLatch.countDown();
+            scaleDownTaskLatch.countDown();
+        }
+    }
 }


Reply via email to