This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26043 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 1172aa78561ff4d16ee5b9c660702f76288a79a8 Author: Kirill Tkalenko <[email protected]> AuthorDate: Tue Jul 29 08:30:21 2025 +0300 IGNITE-26043 wip --- .../internal/lowwatermark/LowWatermarkImpl.java | 57 ++++++++++++++++---- .../ScheduledUpdateLowWatermarkTask.java | 63 ++++++++++++++++++++++ .../lowwatermark/LowWatermarkImplTest.java | 17 +++++- .../org/apache/ignite/internal/app/IgniteImpl.java | 1 + 4 files changed, 125 insertions(+), 13 deletions(-) diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java index 2a95a8b977d..7e014414f31 100644 --- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java +++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.lowwatermark.ScheduledUpdateLowWatermarkTask.State; import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters; import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent; import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEventParameters; @@ -124,6 +125,8 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L private final Map<UUID, LowWatermarkLock> locks = new ConcurrentHashMap<>(); + private final AtomicReference<ScheduledUpdateLowWatermarkTask> lastScheduledUpdateLowWatermarkTask = new AtomicReference<>(); + /** * Constructor. * @@ -212,12 +215,44 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L } private void scheduleUpdateLowWatermarkBusy() { - ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get(); + while (true) { + ScheduledUpdateLowWatermarkTask lastTask = lastScheduledUpdateLowWatermarkTask.get(); + ScheduledUpdateLowWatermarkTask newTask = new ScheduledUpdateLowWatermarkTask(this, State.NEW); + + State lastTaskState = lastTask == null ? State.COMPLETED : lastTask.state(); + + switch (lastTaskState) { + case NEW: + if (lastTask.tryCancel() && lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask)) { + scheduleUpdateLowWatermarkTaskBusy(newTask); + + return; + } + + break; + case IN_PROGRESS: + case CANCELLED: + // The new task will be rescheduled successfully. + return; + case COMPLETED: + if (lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask)) { + scheduleUpdateLowWatermarkTaskBusy(newTask); + + return; + } + + break; + default: + throw new AssertionError("Unknown state: " + lastTaskState); + } + } + } - assert previousScheduledFuture == null || previousScheduledFuture.isDone() : "previous scheduled task has not finished"; + private void scheduleUpdateLowWatermarkTaskBusy(ScheduledUpdateLowWatermarkTask task) { + ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get(); ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule( - () -> updateLowWatermark(createNewLowWatermarkCandidate()), + task, lowWatermarkConfig.updateIntervalMillis().value(), TimeUnit.MILLISECONDS ); @@ -270,7 +305,11 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L @Override public void updateLowWatermark(HybridTimestamp newLowWatermark) { - inBusyLock(busyLock, () -> { + updateLowWatermarkAsync(newLowWatermark); + } + + CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp newLowWatermark) { + return inBusyLockAsync(busyLock, () -> { LowWatermarkCandidate newLowWatermarkCandidate = new LowWatermarkCandidate(newLowWatermark, new CompletableFuture<>()); LowWatermarkCandidate oldLowWatermarkCandidate; @@ -279,12 +318,12 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L // If another candidate contains a higher low watermark, then there is no need to update. if (oldLowWatermarkCandidate.lowWatermark().compareTo(newLowWatermark) >= 0) { - return; + return nullCompletedFuture(); } } while (!lowWatermarkCandidate.compareAndSet(oldLowWatermarkCandidate, newLowWatermarkCandidate)); // We will start the update as soon as the previous one finishes. - oldLowWatermarkCandidate.updateFuture() + return oldLowWatermarkCandidate.updateFuture() .thenComposeAsync(unused -> updateAndNotify(newLowWatermark), scheduledThreadPool) .whenComplete((unused, throwable) -> { if (throwable != null) { @@ -339,16 +378,12 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L .whenCompleteAsync((unused, throwable) -> { if (throwable != null) { if (!(hasCause(throwable, NodeStoppingException.class))) { - LOG.error("Failed to update low watermark, will schedule again: {}", throwable, newLowWatermark); + LOG.error("Failed to update low watermark: {}", throwable, newLowWatermark); failureManager.process(new FailureContext(CRITICAL_ERROR, throwable)); - - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); } } else { LOG.info("Successful low watermark update: {}", newLowWatermark); - - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); } }, scheduledThreadPool); } diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/ScheduledUpdateLowWatermarkTask.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/ScheduledUpdateLowWatermarkTask.java new file mode 100644 index 00000000000..3772e192052 --- /dev/null +++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/ScheduledUpdateLowWatermarkTask.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.lowwatermark; + +import static org.apache.ignite.internal.util.ExceptionUtils.hasCause; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfigurationSchema; + +/** + * Scheduled task to {@link LowWatermarkImpl#updateLowWatermark} at {@link LowWatermarkConfigurationSchema#updateIntervalMillis}. + */ +class ScheduledUpdateLowWatermarkTask implements Runnable { + private final LowWatermarkImpl lowWatermarkImpl; + + private final AtomicReference<State> state; + + ScheduledUpdateLowWatermarkTask(LowWatermarkImpl lowWatermarkImpl, State state) { + this.lowWatermarkImpl = lowWatermarkImpl; + + this.state = new AtomicReference<>(state); + } + + @Override + public void run() { + if (state.compareAndSet(State.NEW, State.IN_PROGRESS)) { + lowWatermarkImpl.updateLowWatermarkAsync(lowWatermarkImpl.createNewLowWatermarkCandidate()) + .whenCompleteAsync((unused, throwable) -> { + if (state.compareAndSet(State.IN_PROGRESS, State.COMPLETED) && !hasCause(throwable, NodeStoppingException.class)) { + lowWatermarkImpl.scheduleUpdates(); + } + }); + } + } + + boolean tryCancel() { + return state.compareAndSet(State.NEW, State.CANCELLED); + } + + State state() { + return state.get(); + } + + enum State { + NEW, IN_PROGRESS, COMPLETED, CANCELLED + } +} diff --git a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java index 3311f5c012d..b0df267ea45 100644 --- a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java +++ b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.lowwatermark.LowWatermarkImpl.LOW_WATERMARK_VAULT_KEY; import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; @@ -34,7 +35,6 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -210,12 +210,16 @@ public class LowWatermarkImplTest extends BaseIgniteAbstractTest { var onLwmChangedFinishFuture = new CompletableFuture<>(); + var firstOnLwmChangedFuture = new CompletableFuture<>(); + try { assertThat(lowWatermarkConfig.updateIntervalMillis().update(100L), willSucceedFast()); when(lwmChangedListener.notify(any())).then(invocation -> { onLwmChangedLatch.countDown(); + firstOnLwmChangedFuture.complete(null); + return onLwmChangedFinishFuture; }); @@ -223,7 +227,7 @@ public class LowWatermarkImplTest extends BaseIgniteAbstractTest { lowWatermark.scheduleUpdates(); // Let's check that it hasn't been called more than once. - assertFalse(onLwmChangedLatch.await(1, TimeUnit.SECONDS)); + assertThat(firstOnLwmChangedFuture, willCompleteSuccessfully()); // Let's check that it was called only once. assertEquals(2, onLwmChangedLatch.getCount()); @@ -336,6 +340,15 @@ public class LowWatermarkImplTest extends BaseIgniteAbstractTest { verify(failureManager, never()).process(any()); } + @Test + void testParallelScheduleUpdates() { + // TODO: IGNITE-26043 продолжить + runRace( + () -> lowWatermark.scheduleUpdates(), + () -> lowWatermark.scheduleUpdates() + ); + } + private CompletableFuture<HybridTimestamp> listenUpdateLowWatermark() { var future = new CompletableFuture<HybridTimestamp>(); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index f383fb7c249..3625e8daa5b 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -1601,6 +1601,7 @@ public class IgniteImpl implements Ignite { .thenCompose(ignored -> systemViewManager.completeRegistration()) .thenRunAsync(() -> { try { + // TODO: IGNITE-26043 вот тут надо будет подумать вниматьлно! // Enable watermark events. lowWatermark.scheduleUpdates();
