This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
     new ff7407fd9a IGNITE-19736 Do not cancel tasks in DistributionZoneManager#executor if they were created by immediate scaleUp/scaleDown events. Avoid concurrent executing several tasks for the same zone. (#2201)
ff7407fd9a is described below

commit ff7407fd9a930bcabbecb40d0f3067320054075a
Author: Sergey Uttsel <utt...@gmail.com>
AuthorDate: Tue Jun 27 11:30:15 2023 +0300

    IGNITE-19736 Do not cancel tasks in DistributionZoneManager#executor if they were created by immediate scaleUp/scaleDown events. Avoid concurrent executing several tasks for the same zone. (#2201)
---
 .../distributionzones/DistributionZoneManager.java |  61 ++--
 .../distributionzones/DistributionZonesUtil.java   |  22 ++
 .../DistributionZoneManagerScaleUpTest.java        | 312 ---------------------
 .../DistributionZonesSchedulersTest.java           | 275 ++++++++++++++++--
 4 files changed, 312 insertions(+), 358 deletions(-)

diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 70f082753c..c2fb15a2af 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -21,8 +21,10 @@ import static java.util.Collections.emptySet;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
@@ -73,9 +75,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import org.apache.ignite.configuration.ConfigurationChangeException;
@@ -282,10 +281,8 @@ public class DistributionZoneManager implements IgniteComponent {

         nodesAttributes = new ConcurrentHashMap<>();

-        executor = new ScheduledThreadPoolExecutor(
-                Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
-                new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
-                new ThreadPoolExecutor.DiscardPolicy()
+        executor = createZoneManagerExecutor(
+                new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG)
         );

         // It's safe to leak with partially initialised object here, because rebalanceEngine is only accessible through this or by
@@ -361,7 +358,7 @@ public class DistributionZoneManager implements IgniteComponent {
         metaStorageManager.unregisterWatch(topologyWatchListener);
         metaStorageManager.unregisterWatch(dataNodesWatchListener);

-        shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(executor, 10, SECONDS);
     }

     /**
@@ -1610,6 +1607,12 @@ public class DistributionZoneManager implements IgniteComponent {
         /** Schedule task for a scale down process. */
         private ScheduledFuture<?> scaleDownTask;

+        /** The delay for the scale up task. */
+        private long scaleUpTaskDelay;
+
+        /** The delay for the scale down task. */
+        private long scaleDownTaskDelay;
+
         /**
          * Map that stores pairs revision -> {@link Augmentation} for a zone. With this map we can track which nodes
          * should be added or removed in the processes of scale up or scale down. Revision helps to track visibility of the events
@@ -1644,56 +1647,64 @@ public class DistributionZoneManager implements IgniteComponent {
         }

         /**
-         * Reschedules existing scale up task, if it is not started yet, or schedules new one, if the current task cannot be canceled.
+         * Reschedules existing scale up task, if it is not started yet and the delay of this task is not immediate,
+         * or schedules new one, if the current task cannot be canceled.
          *
          * @param delay Delay to start runnable in seconds.
          * @param runnable Custom logic to run.
          */
         synchronized void rescheduleScaleUp(long delay, Runnable runnable) {
-            if (scaleUpTask != null) {
-                scaleUpTask.cancel(false);
-            }
+            stopScaleUp();

-            scaleUpTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
+            scaleUpTask = executor.schedule(runnable, delay, SECONDS);
+
+            scaleUpTaskDelay = delay;
         }

         /**
-         * Reschedules existing scale down task, if it is not started yet, or schedules new one, if the current task cannot be canceled.
+         * Reschedules existing scale down task, if it is not started yet and the delay of this task is not immediate,
+         * or schedules new one, if the current task cannot be canceled.
          *
          * @param delay Delay to start runnable in seconds.
         * @param runnable Custom logic to run.
          */
         synchronized void rescheduleScaleDown(long delay, Runnable runnable) {
-            if (scaleDownTask != null) {
-                scaleDownTask.cancel(false);
-            }
+            stopScaleDown();

-            scaleDownTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
+            scaleDownTask = executor.schedule(runnable, delay, SECONDS);
+
+            scaleDownTaskDelay = delay;
         }

         /**
-         * Cancels task for scale up and scale down.
+         * Cancels task for scale up and scale down. Used on {@link ZonesConfigurationListener#onDelete(ConfigurationNotificationEvent)}.
+         * Not need to check {@code scaleUpTaskDelay} and {@code scaleDownTaskDelay} because after timer stopping on zone delete event
+         * the data nodes value will be updated.
          */
         synchronized void stopTimers() {
-            stopScaleUp();
+            if (scaleUpTask != null) {
+                scaleUpTask.cancel(false);
+            }

-            stopScaleDown();
+            if (scaleDownTask != null) {
+                scaleDownTask.cancel(false);
+            }
         }

         /**
-         * Cancels task for scale up.
+         * Cancels task for scale up if it is not started yet and the delay of this task is not immediate.
          */
         synchronized void stopScaleUp() {
-            if (scaleUpTask != null) {
+            if (scaleUpTask != null && scaleUpTaskDelay > 0) {
                 scaleUpTask.cancel(false);
             }
         }

         /**
-         * Cancels task for scale down.
+         * Cancels task for scale down if it is not started yet and the delay of this task is not immediate.
          */
         synchronized void stopScaleDown() {
-            if (scaleDownTask != null) {
+            if (scaleDownTask != null && scaleDownTaskDelay > 0) {
                 scaleDownTask.cancel(false);
             }
         }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 2046629c2a..32d3571683 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -40,6 +40,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
 import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.DistributionZoneNotFoundException;
@@ -532,4 +536,22 @@ public class DistributionZonesUtil {
                 .map(Node::nodeName)
                 .collect(toSet());
     }
+
+    /**
+     * Create an executor for the zone manager.
+     * Used a single thread executor to avoid concurrent executing several tasks for the same zone.
+     * ScheduledThreadPoolExecutor guarantee that tasks scheduled for exactly the same
+     * execution time are enabled in first-in-first-out (FIFO) order of submission.
+     * // TODO: IGNITE-19783 Need to use a striped executor.
+     *
+     * @param namedThreadFactory Named thread factory.
+     * @return Executor.
+     */
+    static ScheduledExecutorService createZoneManagerExecutor(NamedThreadFactory namedThreadFactory) {
+        return new ScheduledThreadPoolExecutor(
+                1,
+                namedThreadFactory,
+                new ThreadPoolExecutor.DiscardPolicy()
+        );
+    }
 }
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index d6b47b81ea..53f853cad0 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -37,7 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;

@@ -46,15 +45,10 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
-import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
-import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
-import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.server.If;
 import org.apache.ignite.network.NetworkAddress;
@@ -307,312 +301,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         assertZoneScaleDownChangeTriggerKey(null, ZONE_1_ID, keyValueStorage);
     }

-    @Test
-    void testTwoScaleUpTimersSecondTimerRunFirst() throws Exception {
-        preparePrerequisites();
-
-        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
-                zonesConfiguration.distributionZones();
-
-        DistributionZoneView zoneView = zones.value().get(0);
-
-        CountDownLatch in1 = new CountDownLatch(1);
-        CountDownLatch in2 = new CountDownLatch(1);
-        CountDownLatch out1 = new CountDownLatch(1);
-        CountDownLatch out2 = new CountDownLatch(1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(D),
-                Set.of(),
-                prerequisiteRevision + 1,
-                (zoneId, revision) -> {
-                    try {
-                        in1.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(out1::countDown);
-                },
-                (t1, t2) -> null
-        );
-
-        // Assert that first task was run and event about adding node "D" with revision {@code prerequisiteRevision + 1} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(E),
-                Set.of(),
-                prerequisiteRevision + 2,
-                (zoneId, revision) -> {
-                    try {
-                        in2.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(() -> {
-                        try {
-                            out2.await();
-                        } catch (InterruptedException e) {
-                            fail();
-                        }
-                    });
-                },
-                (t1, t2) -> null
-        );
-
-        // Assert that second task was run and event about adding node "E" with revision {@code prerequisiteRevision + 2} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 2);
-
-        //Second task is propagating data nodes first.
-        in2.countDown();
-
-        assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
-
-        out2.countDown();
-
-        in1.countDown();
-
-        //Waiting for the first scheduler ends it work.
-        out1.countDown();
-
-        // Assert that nothing has been changed.
-        assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
-    }
-
-    @Test
-    void testTwoScaleDownTimersSecondTimerRunFirst() throws Exception {
-        preparePrerequisites();
-
-        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
-                zonesConfiguration.distributionZones();
-
-        DistributionZoneView zoneView = zones.value().get(0);
-
-        CountDownLatch in1 = new CountDownLatch(1);
-        CountDownLatch in2 = new CountDownLatch(1);
-        CountDownLatch out1 = new CountDownLatch(1);
-        CountDownLatch out2 = new CountDownLatch(1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(),
-                Set.of(B),
-                prerequisiteRevision + 1,
-                (t1, t2) -> null,
-                (zoneId, revision) -> {
-                    try {
-                        in1.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleDown(zoneId, revision).thenRun(out1::countDown);
-                }
-        );
-
-        // Assert that first task was run and event about removing node "B" with revision {@code prerequisiteRevision + 1} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(),
-                Set.of(C),
-                prerequisiteRevision + 2,
-                (t1, t2) -> null,
-                (zoneId, revision) -> {
-                    try {
-                        in2.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleDown(zoneId, revision).thenRun(() -> {
-                        try {
-                            out2.await();
-                        } catch (InterruptedException e) {
-                            fail();
-                        }
-                    });
-                }
-        );
-
-        // Assert that second task was run and event about removing node "C" with revision {@code prerequisiteRevision + 2} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 2);
-
-        //Second task is propagating data nodes first.
-        in2.countDown();
-
-        assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
-
-        out2.countDown();
-
-        in1.countDown();
-
-        //Waiting for the first scheduler ends it work.
-        out1.countDown();
-
-        // Assert that nothing has been changed.
-        assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
-    }
-
-    @Test
-    void testTwoScaleUpTimersFirstTimerRunFirst() throws Exception {
-        preparePrerequisites();
-
-        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
-                zonesConfiguration.distributionZones();
-
-        DistributionZoneView zoneView = zones.value().get(0);
-
-        CountDownLatch in1 = new CountDownLatch(1);
-        CountDownLatch in2 = new CountDownLatch(1);
-        CountDownLatch out1 = new CountDownLatch(1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(D),
-                Set.of(),
-                prerequisiteRevision + 1,
-                (zoneId, revision) -> {
-                    in1.countDown();
-
-                    return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(() -> {
-                        try {
-                            out1.await();
-                        } catch (InterruptedException e) {
-                            fail();
-                        }
-                    });
-                },
-                (t1, t2) -> null
-        );
-
-        // Waiting for the first task to be run. We have to do that to be sure that watch events,
-        // which we try to emulate, are handled sequentially.
-        in1.await();
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(E),
-                Set.of(),
-                prerequisiteRevision + 2,
-                (zoneId, revision) -> {
-                    try {
-                        in2.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleUp(zoneId, revision);
-                },
-                (t1, t2) -> null
-        );
-
-        // Assert that second task was run and event about adding node "E" with revision {@code prerequisiteRevision + 2} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 2);
-
-        assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
-
-        // Second task is run and we await that data nodes will be changed from ["A", "B", "C", "D"] to ["A", "B", "C", "D", "E"]
-        in2.countDown();
-
-        assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
-
-        out1.countDown();
-    }
-
-    @Test
-    void testTwoScaleDownTimersFirstTimerRunFirst() throws Exception {
-        preparePrerequisites();
-
-        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
-                zonesConfiguration.distributionZones();
-
-        DistributionZoneView zoneView = zones.value().get(0);
-
-        CountDownLatch in1 = new CountDownLatch(1);
-        CountDownLatch in2 = new CountDownLatch(1);
-        CountDownLatch out1 = new CountDownLatch(1);
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(),
-                Set.of(B),
-                prerequisiteRevision + 1,
-                (t1, t2) -> null,
-                (zoneId, revision) -> {
-                    in1.countDown();
-
-                    return testSaveDataNodesOnScaleDown(zoneId, revision).thenRun(() -> {
-                        try {
-                            out1.await();
-                        } catch (InterruptedException e) {
-                            fail();
-                        }
-                    });
-                }
-        );
-
-        // Waiting for the first task to be run. We have to do that to be sure that watch events,
-        // which we try to emulate, are handled sequentially.
-        in1.await();
-
-        distributionZoneManager.scheduleTimers(
-                zoneView,
-                Set.of(),
-                Set.of(C),
-                prerequisiteRevision + 2,
-                (t1, t2) -> null,
-                (zoneId, revision) -> {
-                    try {
-                        in2.await();
-                    } catch (InterruptedException e) {
-                        fail();
-                    }
-
-                    return testSaveDataNodesOnScaleDown(zoneId, revision);
-                }
-        );
-
-        // Assert that second task was run and event about removing node "C" with revision {@code prerequisiteRevision + 2} was added
-        // to the topologyAugmentationMap of the zone.
-        assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 2);
-
-        assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
-
-        // Second task is run and we await that data nodes will be changed from ["A", "C"] to ["A"]
-        in2.countDown();
-
-        assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
-
-        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
-
-        out1.countDown();
-    }
-
     @Test
     void testEmptyDataNodesOnStart() throws Exception {
         startDistributionZoneManager();
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
index 2e14cc044e..0a6eb584e5 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
@@ -17,21 +17,20 @@

 package org.apache.ignite.internal.distributionzones;

+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.after;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;

 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -45,10 +44,8 @@ import org.junit.jupiter.api.Test;
 public class DistributionZonesSchedulersTest {
     private static final IgniteLogger LOG = Loggers.forClass(DistributionZonesSchedulersTest.class);

-    private static final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(
-            Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
-            new NamedThreadFactory("test-dst-zones-scheduler", LOG),
-            new ThreadPoolExecutor.DiscardPolicy()
+    private static ScheduledExecutorService executor = createZoneManagerExecutor(
+            new NamedThreadFactory("test-dst-zones-scheduler", LOG)
     );

     @AfterAll
@@ -83,27 +80,118 @@ public class DistributionZonesSchedulersTest {
     }

     @Test
-    void testScaleUpReScheduleNotStartedTask() {
+    void testScaleUpReScheduling() throws InterruptedException {
         ZoneState state = new DistributionZoneManager.ZoneState(executor);

-        testReScheduleNotStartedTask(state::rescheduleScaleUp);
+        testReScheduling(state::rescheduleScaleUp);
     }

     @Test
-    void testScaleDownReScheduleNotStartedTask() {
+    void testScaleDownReScheduling() throws InterruptedException {
         ZoneState state = new DistributionZoneManager.ZoneState(executor);

-        testReScheduleNotStartedTask(state::rescheduleScaleDown);
+        testReScheduling(state::rescheduleScaleDown);
     }

-    private static void testReScheduleNotStartedTask(BiConsumer<Long, Runnable> fn) {
-        Runnable runnable = mock(Runnable.class);
+    /**
+     * Tests that scaleUp/scaleDown tasks with a zero delay will not be canceled by other tasks.
+     * Tests that scaleUp/scaleDown tasks with a delay grater then zero will be canceled by other tasks.
+     */
+    private static void testReScheduling(BiConsumer<Long, Runnable> fn) throws InterruptedException {
+        AtomicInteger counter = new AtomicInteger();

-        fn.accept(1L, runnable);
+        CountDownLatch firstTaskLatch = new CountDownLatch(1);

-        fn.accept(0L, runnable);
+        CountDownLatch lastTaskLatch = new CountDownLatch(1);

-        verify(runnable, after(1200).times(1)).run();
+        fn.accept(0L, () -> {
+            try {
+                assertTrue(firstTaskLatch.await(3, TimeUnit.SECONDS));
+
+                counter.incrementAndGet();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        fn.accept(1L, counter::incrementAndGet);
+
+        fn.accept(0L, counter::incrementAndGet);
+
+        fn.accept(0L, counter::incrementAndGet);
+
+        fn.accept(1L, counter::incrementAndGet);
+
+        fn.accept(1L, counter::incrementAndGet);
+
+        fn.accept(0L, counter::incrementAndGet);
+
+        fn.accept(0L, () -> {
+            counter.incrementAndGet();
+
+            lastTaskLatch.countDown();
+        });
+
+        firstTaskLatch.countDown();
+
+        assertTrue(lastTaskLatch.await(3, TimeUnit.SECONDS));
+
+        assertEquals(5, counter.get());
+    }
+
+    @Test
+    void testScaleUpOrdering() throws InterruptedException {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testOrdering(state::rescheduleScaleUp);
+    }
+
+    @Test
+    void testScaleDownOrdering() throws InterruptedException {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testOrdering(state::rescheduleScaleDown);
+    }
+
+    private static void testOrdering(BiConsumer<Long, Runnable> fn) throws InterruptedException {
+        AtomicInteger counter = new AtomicInteger();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        AtomicBoolean sequentialOrder = new AtomicBoolean(true);
+
+        fn.accept(0L, () -> {
+            try {
+                assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+                counter.incrementAndGet();
+
+                if (counter.get() != 1) {
+                    sequentialOrder.set(false);
+                }
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        for (int i = 2; i < 11; i++) {
+            int j = i;
+
+            fn.accept(0L, () -> {
+                counter.incrementAndGet();
+
+                if (counter.get() != j) {
+                    sequentialOrder.set(false);
+                }
+            });
+        }
+
+        latch.countDown();
+
+        waitForCondition(() -> counter.get() == 10, 3000);
+        assertEquals(10, counter.get(), "Not all tasks were executed.");
+
+        assertTrue(sequentialOrder.get(), "The order of tasks execution is not sequential.");
     }

     @Test
@@ -131,11 +219,156 @@ public class DistributionZonesSchedulersTest {

         latch.await(1000, TimeUnit.MILLISECONDS);

-        fn.accept(0L, () -> {
-            flag.set(true);
-        });
+        fn.accept(0L, () -> flag.set(true));

         assertTrue(waitForCondition(() -> 0L == latch.getCount(), 1500));

         assertTrue(waitForCondition(flag::get, 1500));
     }
+
+    @Test
+    void testCancelScaleUpTaskOnStopScaleUp() {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testCancelTask(state::rescheduleScaleUp, state::stopScaleUp, () -> state.scaleUpTask().isCancelled());
+    }
+
+    @Test
+    void testCancelScaleDownTaskOnStopScaleDown() {
+        ZoneState state = new ZoneState(executor);
+
+        testCancelTask(state::rescheduleScaleDown, state::stopScaleDown, () -> state.scaleDownTask().isCancelled());
+    }
+
+    @Test
+    void testCancelScaleUpTasksOnStopTimers() {
+        ZoneState state = new ZoneState(executor);
+
+        testCancelTask(state::rescheduleScaleUp, state::stopTimers, () -> state.scaleUpTask().isCancelled());
+    }
+
+    @Test
+    void testCancelScaleDownTasksOnStopTimers() {
+        ZoneState state = new ZoneState(executor);
+
+        testCancelTask(state::rescheduleScaleDown, state::stopTimers, () -> state.scaleDownTask().isCancelled());
+    }
+
+    /**
+     * {@link ZoneState#stopScaleUp()}, {@link ZoneState#stopScaleDown()} and {@link ZoneState#stopTimers()} cancel task
+     * if it is not started and has a delay greater than zero.
+     */
+    private static void testCancelTask(
+            BiConsumer<Long, Runnable> fn,
+            Runnable stopTask,
+            Supplier<Boolean> isTaskCancelled
+    ) {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        fn.accept(0L, () -> {
+            try {
+                assertTrue(latch.await(3, TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        fn.accept(1L, () -> {});
+
+        assertFalse(isTaskCancelled.get());
+
+        stopTask.run();
+
+        assertTrue(isTaskCancelled.get());
+
+        latch.countDown();
+    }
+
+    @Test
+    void testNotCancelScaleUpTaskOnStopScaleUp() {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testNotCancelTask(state::rescheduleScaleUp, state::stopScaleUp, () -> state.scaleUpTask().isCancelled());
+    }
+
+    @Test
+    void testNotCancelScaleDownTaskOnStopScaleDown() {
+        ZoneState state = new ZoneState(executor);
+
+        testNotCancelTask(state::rescheduleScaleDown, state::stopScaleDown, () -> state.scaleDownTask().isCancelled());
+
+    }
+
+    /**
+     * {@link ZoneState#stopScaleUp()} and {@link ZoneState#stopScaleDown()} doesn't cancel task
+     * if it is not started and has a delay equal to zero.
+     */
+    private static void testNotCancelTask(
+            BiConsumer<Long, Runnable> fn,
+            Runnable stopTask,
+            Supplier<Boolean> isTaskCancelled
+    ) {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        fn.accept(0L, () -> {
+            try {
+                assertTrue(latch.await(3, TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        fn.accept(0L, () -> {});
+
+        assertFalse(isTaskCancelled.get());
+
+        stopTask.run();
+
+        assertFalse(isTaskCancelled.get());
+
+        latch.countDown();
+    }
+
+    /**
+     * {@link ZoneState#stopTimers()} cancel task if it is not started and has a delay equal to zero.
+     */
+    @Test
+    public void testCancelTasksOnStopTimersAndImmediateTimerValues() {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        CountDownLatch scaleUpTaskLatch = new CountDownLatch(1);
+
+        state.rescheduleScaleUp(0L, () -> {
+            try {
+                assertTrue(scaleUpTaskLatch.await(3, TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        state.rescheduleScaleUp(0L, () -> {});
+
+        assertFalse(state.scaleUpTask().isCancelled());
+
+        CountDownLatch scaleDownTaskLatch = new CountDownLatch(1);
+
+        state.rescheduleScaleDown(0L, () -> {
+            try {
+                assertTrue(scaleDownTaskLatch.await(3, TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        state.rescheduleScaleDown(0L, () -> {});
+
+        assertFalse(state.scaleDownTask().isCancelled());
+
+        state.stopTimers();
+
+        assertTrue(state.scaleUpTask().isCancelled());
+        assertTrue(state.scaleDownTask().isCancelled());
+
+        scaleUpTaskLatch.countDown();
+        scaleDownTaskLatch.countDown();
+    }
 }
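Below is a minimal, self-contained sketch (not part of the commit) of the scheduling rule this patch introduces: a single-threaded ScheduledThreadPoolExecutor with a DiscardPolicy, where a pending task is cancelled on reschedule only when it was scheduled with a non-zero delay. The class name ZoneStateSketch and its main method are hypothetical illustrations; the cancellation check mirrors the patched ZoneState#rescheduleScaleUp / stopScaleUp logic shown in the diff above.

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; a hypothetical stand-in for the real ZoneState. */
public class ZoneStateSketch {
    /** Single thread: tasks for the zone never run concurrently, and same-time tasks run in FIFO submission order. */
    private final ScheduledExecutorService executor =
            new ScheduledThreadPoolExecutor(1, new ThreadPoolExecutor.DiscardPolicy());

    private ScheduledFuture<?> scaleUpTask;

    private long scaleUpTaskDelay;

    synchronized void rescheduleScaleUp(long delaySeconds, Runnable runnable) {
        // Mirror of the patched behaviour: only a pending task that was scheduled with a
        // non-zero delay is cancelled; an immediate (zero-delay) task is left to run.
        if (scaleUpTask != null && scaleUpTaskDelay > 0) {
            scaleUpTask.cancel(false);
        }

        scaleUpTask = executor.schedule(runnable, delaySeconds, TimeUnit.SECONDS);
        scaleUpTaskDelay = delaySeconds;
    }

    public static void main(String[] args) throws InterruptedException {
        ZoneStateSketch state = new ZoneStateSketch();

        // The immediate task is not cancelled by the later reschedule, so both lines print.
        state.rescheduleScaleUp(0, () -> System.out.println("immediate task ran"));
        state.rescheduleScaleUp(1, () -> System.out.println("delayed task ran"));

        Thread.sleep(2_000);

        state.executor.shutdown();
    }
}

With delays of 0 and 1 second as above, both runnables execute; with the pre-patch behaviour of always cancelling the previous future, the zero-delay task could be cancelled by the later reschedule and its work lost.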