This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26043
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 1172aa78561ff4d16ee5b9c660702f76288a79a8
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Jul 29 08:30:21 2025 +0300

    IGNITE-26043 wip
---
 .../internal/lowwatermark/LowWatermarkImpl.java    | 57 ++++++++++++++++----
 .../ScheduledUpdateLowWatermarkTask.java           | 63 ++++++++++++++++++++++
 .../lowwatermark/LowWatermarkImplTest.java         | 17 +++++-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  1 +
 4 files changed, 125 insertions(+), 13 deletions(-)

diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index 2a95a8b977d..7e014414f31 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.lowwatermark.ScheduledUpdateLowWatermarkTask.State;
 import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
 import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEventParameters;
@@ -124,6 +125,8 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
     private final Map<UUID, LowWatermarkLock> locks = new 
ConcurrentHashMap<>();
 
+    private final AtomicReference<ScheduledUpdateLowWatermarkTask> 
lastScheduledUpdateLowWatermarkTask = new AtomicReference<>();
+
     /**
      * Constructor.
      *
@@ -212,12 +215,44 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
     }
 
     private void scheduleUpdateLowWatermarkBusy() {
-        ScheduledFuture<?> previousScheduledFuture = 
this.lastScheduledTaskFuture.get();
+        while (true) {
+            ScheduledUpdateLowWatermarkTask lastTask = 
lastScheduledUpdateLowWatermarkTask.get();
+            ScheduledUpdateLowWatermarkTask newTask = new 
ScheduledUpdateLowWatermarkTask(this, State.NEW);
+
+            State lastTaskState = lastTask == null ? State.COMPLETED : 
lastTask.state();
+
+            switch (lastTaskState) {
+                case NEW:
+                    if (lastTask.tryCancel() && 
lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask)) {
+                        scheduleUpdateLowWatermarkTaskBusy(newTask);
+
+                        return;
+                    }
+
+                    break;
+                case IN_PROGRESS:
+                case CANCELLED:
+                    // The new task will be rescheduled successfully.
+                    return;
+                case COMPLETED:
+                    if 
(lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask)) {
+                        scheduleUpdateLowWatermarkTaskBusy(newTask);
+
+                        return;
+                    }
+
+                    break;
+                default:
+                    throw new AssertionError("Unknown state: " + 
lastTaskState);
+            }
+        }
+    }
 
-        assert previousScheduledFuture == null || 
previousScheduledFuture.isDone() : "previous scheduled task has not finished";
+    private void 
scheduleUpdateLowWatermarkTaskBusy(ScheduledUpdateLowWatermarkTask task) {
+        ScheduledFuture<?> previousScheduledFuture = 
this.lastScheduledTaskFuture.get();
 
         ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
-                () -> updateLowWatermark(createNewLowWatermarkCandidate()),
+                task,
                 lowWatermarkConfig.updateIntervalMillis().value(),
                 TimeUnit.MILLISECONDS
         );
@@ -270,7 +305,11 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
     @Override
     public void updateLowWatermark(HybridTimestamp newLowWatermark) {
-        inBusyLock(busyLock, () -> {
+        updateLowWatermarkAsync(newLowWatermark);
+    }
+
+    CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp 
newLowWatermark) {
+        return inBusyLockAsync(busyLock, () -> {
             LowWatermarkCandidate newLowWatermarkCandidate = new 
LowWatermarkCandidate(newLowWatermark, new CompletableFuture<>());
             LowWatermarkCandidate oldLowWatermarkCandidate;
 
@@ -279,12 +318,12 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
                 // If another candidate contains a higher low watermark, then 
there is no need to update.
                 if 
(oldLowWatermarkCandidate.lowWatermark().compareTo(newLowWatermark) >= 0) {
-                    return;
+                    return nullCompletedFuture();
                 }
             } while 
(!lowWatermarkCandidate.compareAndSet(oldLowWatermarkCandidate, 
newLowWatermarkCandidate));
 
             // We will start the update as soon as the previous one finishes.
-            oldLowWatermarkCandidate.updateFuture()
+            return oldLowWatermarkCandidate.updateFuture()
                     .thenComposeAsync(unused -> 
updateAndNotify(newLowWatermark), scheduledThreadPool)
                     .whenComplete((unused, throwable) -> {
                         if (throwable != null) {
@@ -339,16 +378,12 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
                             .whenCompleteAsync((unused, throwable) -> {
                                 if (throwable != null) {
                                     if (!(hasCause(throwable, 
NodeStoppingException.class))) {
-                                        LOG.error("Failed to update low 
watermark, will schedule again: {}", throwable, newLowWatermark);
+                                        LOG.error("Failed to update low 
watermark: {}", throwable, newLowWatermark);
 
                                         failureManager.process(new 
FailureContext(CRITICAL_ERROR, throwable));
-
-                                        inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
                                     }
                                 } else {
                                     LOG.info("Successful low watermark update: 
{}", newLowWatermark);
-
-                                    inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
                                 }
                             }, scheduledThreadPool);
                 }
diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/ScheduledUpdateLowWatermarkTask.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/ScheduledUpdateLowWatermarkTask.java
new file mode 100644
index 00000000000..3772e192052
--- /dev/null
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/ScheduledUpdateLowWatermarkTask.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.lowwatermark;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfigurationSchema;
+
+/**
+ * Scheduled task to {@link LowWatermarkImpl#updateLowWatermark} at {@link 
LowWatermarkConfigurationSchema#updateIntervalMillis}.
+ */
+class ScheduledUpdateLowWatermarkTask implements Runnable {
+    private final LowWatermarkImpl lowWatermarkImpl;
+
+    private final AtomicReference<State> state;
+
+    ScheduledUpdateLowWatermarkTask(LowWatermarkImpl lowWatermarkImpl, State 
state) {
+        this.lowWatermarkImpl = lowWatermarkImpl;
+
+        this.state = new AtomicReference<>(state);
+    }
+
+    @Override
+    public void run() {
+        if (state.compareAndSet(State.NEW, State.IN_PROGRESS)) {
+            
lowWatermarkImpl.updateLowWatermarkAsync(lowWatermarkImpl.createNewLowWatermarkCandidate())
+                    .whenCompleteAsync((unused, throwable) -> {
+                        if (state.compareAndSet(State.IN_PROGRESS, 
State.COMPLETED) && !hasCause(throwable, NodeStoppingException.class)) {
+                            lowWatermarkImpl.scheduleUpdates();
+                        }
+                    });
+        }
+    }
+
+    boolean tryCancel() {
+        return state.compareAndSet(State.NEW, State.CANCELLED);
+    }
+
+    State state() {
+        return state.get();
+    }
+
+    enum State {
+        NEW, IN_PROGRESS, COMPLETED, CANCELLED
+    }
+}
diff --git 
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
 
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
index 3311f5c012d..b0df267ea45 100644
--- 
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
+++ 
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.lowwatermark.LowWatermarkImpl.LOW_WATERMARK_VAULT_KEY;
 import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -34,7 +35,6 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -210,12 +210,16 @@ public class LowWatermarkImplTest extends 
BaseIgniteAbstractTest {
 
         var onLwmChangedFinishFuture = new CompletableFuture<>();
 
+        var firstOnLwmChangedFuture = new CompletableFuture<>();
+
         try {
             assertThat(lowWatermarkConfig.updateIntervalMillis().update(100L), 
willSucceedFast());
 
             when(lwmChangedListener.notify(any())).then(invocation -> {
                 onLwmChangedLatch.countDown();
 
+                firstOnLwmChangedFuture.complete(null);
+
                 return onLwmChangedFinishFuture;
             });
 
@@ -223,7 +227,7 @@ public class LowWatermarkImplTest extends 
BaseIgniteAbstractTest {
             lowWatermark.scheduleUpdates();
 
             // Let's check that it hasn't been called more than once.
-            assertFalse(onLwmChangedLatch.await(1, TimeUnit.SECONDS));
+            assertThat(firstOnLwmChangedFuture, willCompleteSuccessfully());
 
             // Let's check that it was called only once.
             assertEquals(2, onLwmChangedLatch.getCount());
@@ -336,6 +340,15 @@ public class LowWatermarkImplTest extends 
BaseIgniteAbstractTest {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void testParallelScheduleUpdates() {
+        // TODO: IGNITE-26043 продолжить
+        runRace(
+                () -> lowWatermark.scheduleUpdates(),
+                () -> lowWatermark.scheduleUpdates()
+        );
+    }
+
     private CompletableFuture<HybridTimestamp> listenUpdateLowWatermark() {
         var future = new CompletableFuture<HybridTimestamp>();
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f383fb7c249..3625e8daa5b 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1601,6 +1601,7 @@ public class IgniteImpl implements Ignite {
                 .thenCompose(ignored -> 
systemViewManager.completeRegistration())
                 .thenRunAsync(() -> {
                     try {
+                        // TODO: IGNITE-26043 вот тут надо будет подумать 
вниматьлно!
                         // Enable watermark events.
                         lowWatermark.scheduleUpdates();
 

Reply via email to