This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26043
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 0639e9a4dea07e814eb2d0c9df92019f137b4822
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Jul 29 09:45:44 2025 +0300

    IGNITE-26043 wip
---
 .../notifications/ConfigurationListener.java       | 17 ++++++++++++
 .../internal/lowwatermark/LowWatermarkImpl.java    | 31 +++++++++++++++-------
 .../lowwatermark/LowWatermarkImplTest.java         | 27 +++++++++++++++++--
 .../org/apache/ignite/internal/app/IgniteImpl.java |  1 -
 4 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
index c955b81da30..5b85423e001 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
@@ -17,7 +17,11 @@
 
 package org.apache.ignite.configuration.notifications;
 
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * Configuration property change listener.
@@ -33,4 +37,17 @@ public interface ConfigurationListener<VIEWT> {
      * @return Future that signifies the end of the listener execution.
      */
     CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<VIEWT> ctx);
+
+    /** Creates an adapter for a given callback. */
+    static <T> ConfigurationListener<T> fromConsumer(Consumer<ConfigurationNotificationEvent<T>> callback) {
+        return ctx -> {
+            try {
+                callback.accept(ctx);
+            } catch (Throwable e) {
+                return failedFuture(e);
+            }
+
+            return nullCompletedFuture();
+        };
+    }
 }
diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index 7e014414f31..084db3125ad 100644
--- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.lowwatermark;
 
+import static org.apache.ignite.configuration.notifications.ConfigurationListener.fromConsumer;
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
@@ -36,7 +37,9 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.event.AbstractEventProducer;
@@ -127,6 +130,8 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
 
     private final AtomicReference<ScheduledUpdateLowWatermarkTask> lastScheduledUpdateLowWatermarkTask = new AtomicReference<>();
 
+    private final Lock scheduleUpdateLowWatermarkTaskLock = new ReentrantLock();
+
     /**
      * Constructor.
      *
@@ -162,6 +167,8 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
 
             messagingService.addMessageHandler(LowWatermarkMessageGroup.class, this::onReceiveNetworkMessage);
 
+            lowWatermarkConfig.updateIntervalMillis().listen(fromConsumer(ctx -> scheduleUpdates()));
+
             return nullCompletedFuture();
         });
     }
@@ -215,7 +222,9 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
     }
 
     private void scheduleUpdateLowWatermarkBusy() {
-        while (true) {
+        scheduleUpdateLowWatermarkTaskLock.lock();
+
+        try {
             ScheduledUpdateLowWatermarkTask lastTask = lastScheduledUpdateLowWatermarkTask.get();
             ScheduledUpdateLowWatermarkTask newTask = new ScheduledUpdateLowWatermarkTask(this, State.NEW);
 
@@ -223,28 +232,32 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
 
             switch (lastTaskState) {
                 case NEW:
-                    if (lastTask.tryCancel() && lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask)) {
-                        scheduleUpdateLowWatermarkTaskBusy(newTask);
+                    if (lastTask.tryCancel()) {
+                        boolean casResult = lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask);
+
+                        assert casResult : "It is forbidden to set a task in parallel";
 
-                        return;
+                        scheduleUpdateLowWatermarkTaskBusy(newTask);
                     }
 
                     break;
                 case IN_PROGRESS:
                 case CANCELLED:
                     // The new task will be rescheduled successfully.
-                    return;
+                    break;
                 case COMPLETED:
-                    if (lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask)) {
-                        scheduleUpdateLowWatermarkTaskBusy(newTask);
+                    boolean casResult = lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask);
 
-                        return;
-                    }
+                    assert casResult : "It is forbidden to set a task in parallel";
+
+                    scheduleUpdateLowWatermarkTaskBusy(newTask);
 
                     break;
                 default:
                     throw new AssertionError("Unknown state: " + lastTaskState);
             }
+        } finally {
+            scheduleUpdateLowWatermarkTaskLock.unlock();
         }
     }
 
diff --git a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
index b0df267ea45..923a8f99930 100644
--- a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
+++ b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
@@ -41,6 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -341,12 +343,33 @@ public class LowWatermarkImplTest extends BaseIgniteAbstractTest {
     }
 
     @Test
-    void testParallelScheduleUpdates() {
-        // TODO: IGNITE-26043 продолжить
+    void testParallelScheduleUpdates() throws Exception {
+        assertThat(lowWatermarkConfig.updateIntervalMillis().update(300L), willCompleteSuccessfully());
+
+        assertThat(lowWatermark.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
         runRace(
+                () -> lowWatermark.scheduleUpdates(),
+                () -> lowWatermark.scheduleUpdates(),
                 () -> lowWatermark.scheduleUpdates(),
                 () -> lowWatermark.scheduleUpdates()
         );
+
+        Thread.sleep(1_000);
+
+        verify(lwmChangedListener, atLeast(2)).notify(any());
+        verify(lwmChangedListener, atMost(4)).notify(any());
+    }
+
+    @Test
+    void testScheduleUpdatesAfterUpdateIntervalInConfig() {
+        assertThat(lowWatermarkConfig.updateIntervalMillis().update(50_000L), willCompleteSuccessfully());
+
+        assertThat(lowWatermark.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
+        assertThat(lowWatermarkConfig.updateIntervalMillis().update(100L), willCompleteSuccessfully());
+
+        verify(lwmChangedListener, timeout(1_000).atLeast(1)).notify(any());
     }
 
     private CompletableFuture<HybridTimestamp> listenUpdateLowWatermark() {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3625e8daa5b..f383fb7c249 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1601,7 +1601,6 @@ public class IgniteImpl implements Ignite {
                 .thenCompose(ignored -> systemViewManager.completeRegistration())
                 .thenRunAsync(() -> {
                     try {
-                        // TODO: IGNITE-26043 вот тут надо будет подумать 
вниматьлно!
                         // Enable watermark events.
                         lowWatermark.scheduleUpdates();
 

Reply via email to