This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26043 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 0639e9a4dea07e814eb2d0c9df92019f137b4822 Author: Kirill Tkalenko <[email protected]> AuthorDate: Tue Jul 29 09:45:44 2025 +0300 IGNITE-26043 wip --- .../notifications/ConfigurationListener.java | 17 ++++++++++++ .../internal/lowwatermark/LowWatermarkImpl.java | 31 +++++++++++++++------- .../lowwatermark/LowWatermarkImplTest.java | 27 +++++++++++++++++-- .../org/apache/ignite/internal/app/IgniteImpl.java | 1 - 4 files changed, 64 insertions(+), 12 deletions(-) diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java index c955b81da30..5b85423e001 100644 --- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java +++ b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java @@ -17,7 +17,11 @@ package org.apache.ignite.configuration.notifications; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; /** * Configuration property change listener. @@ -33,4 +37,17 @@ public interface ConfigurationListener<VIEWT> { * @return Future that signifies the end of the listener execution. */ CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<VIEWT> ctx); + + /** Creates an adapter for a given callback. */ + static <T> ConfigurationListener<T> fromConsumer(Consumer<ConfigurationNotificationEvent<T>> callback) { + return ctx -> { + try { + callback.accept(ctx); + } catch (Throwable e) { + return failedFuture(e); + } + + return nullCompletedFuture(); + }; + } } diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java index 7e014414f31..084db3125ad 100644 --- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java +++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.lowwatermark; +import static org.apache.ignite.configuration.notifications.ConfigurationListener.fromConsumer; import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; @@ -36,7 +37,9 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import org.apache.ignite.internal.event.AbstractEventProducer; @@ -127,6 +130,8 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L private final AtomicReference<ScheduledUpdateLowWatermarkTask> lastScheduledUpdateLowWatermarkTask = new AtomicReference<>(); + private final Lock scheduleUpdateLowWatermarkTaskLock = new ReentrantLock(); + /** * Constructor. * @@ -162,6 +167,8 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L messagingService.addMessageHandler(LowWatermarkMessageGroup.class, this::onReceiveNetworkMessage); + lowWatermarkConfig.updateIntervalMillis().listen(fromConsumer(ctx -> scheduleUpdates())); + return nullCompletedFuture(); }); } @@ -215,7 +222,9 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L } private void scheduleUpdateLowWatermarkBusy() { - while (true) { + scheduleUpdateLowWatermarkTaskLock.lock(); + + try { ScheduledUpdateLowWatermarkTask lastTask = lastScheduledUpdateLowWatermarkTask.get(); ScheduledUpdateLowWatermarkTask newTask = new ScheduledUpdateLowWatermarkTask(this, State.NEW); @@ -223,28 +232,32 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L switch (lastTaskState) { case NEW: - if (lastTask.tryCancel() && lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask)) { - scheduleUpdateLowWatermarkTaskBusy(newTask); + if (lastTask.tryCancel()) { + boolean casResult = lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask); + + assert casResult : "It is forbidden to set a task in parallel"; - return; + scheduleUpdateLowWatermarkTaskBusy(newTask); } break; case IN_PROGRESS: case CANCELLED: // The new task will be rescheduled successfully. - return; + break; case COMPLETED: - if (lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask)) { - scheduleUpdateLowWatermarkTaskBusy(newTask); + boolean casResult = lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask); - return; - } + assert casResult : "It is forbidden to set a task in parallel"; + + scheduleUpdateLowWatermarkTaskBusy(newTask); break; default: throw new AssertionError("Unknown state: " + lastTaskState); } + } finally { + scheduleUpdateLowWatermarkTaskLock.unlock(); } } diff --git a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java index b0df267ea45..923a8f99930 100644 --- a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java +++ b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java @@ -41,6 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -341,12 +343,33 @@ public class LowWatermarkImplTest extends BaseIgniteAbstractTest { } @Test - void testParallelScheduleUpdates() { - // TODO: IGNITE-26043 продолжить + void testParallelScheduleUpdates() throws Exception { + assertThat(lowWatermarkConfig.updateIntervalMillis().update(300L), willCompleteSuccessfully()); + + assertThat(lowWatermark.startAsync(new ComponentContext()), willCompleteSuccessfully()); + runRace( + () -> lowWatermark.scheduleUpdates(), + () -> lowWatermark.scheduleUpdates(), () -> lowWatermark.scheduleUpdates(), () -> lowWatermark.scheduleUpdates() ); + + Thread.sleep(1_000); + + verify(lwmChangedListener, atLeast(2)).notify(any()); + verify(lwmChangedListener, atMost(4)).notify(any()); + } + + @Test + void testScheduleUpdatesAfterUpdateIntervalInConfig() { + assertThat(lowWatermarkConfig.updateIntervalMillis().update(50_000L), willCompleteSuccessfully()); + + assertThat(lowWatermark.startAsync(new ComponentContext()), willCompleteSuccessfully()); + + assertThat(lowWatermarkConfig.updateIntervalMillis().update(100L), willCompleteSuccessfully()); + + verify(lwmChangedListener, timeout(1_000).atLeast(1)).notify(any()); } private CompletableFuture<HybridTimestamp> listenUpdateLowWatermark() { diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 3625e8daa5b..f383fb7c249 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -1601,7 +1601,6 @@ public class IgniteImpl implements Ignite { .thenCompose(ignored -> systemViewManager.completeRegistration()) .thenRunAsync(() -> { try { - // TODO: IGNITE-26043 вот тут надо будет подумать вниматьлно! // Enable watermark events. lowWatermark.scheduleUpdates();
