This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-21585
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 6e2ea2372770e52e8c10801140f89809eb7fbd99
Author: amashenkov <[email protected]>
AuthorDate: Tue Mar 5 15:10:38 2024 +0300

    LowWatermark refactored, interface extracted, added dummy implementation.
---
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  12 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   6 +-
 .../internal/table/distributed/LowWatermark.java   | 261 +--------------------
 .../distributed/LowWatermarkChangedListener.java   |   2 +-
 .../{LowWatermark.java => LowWatermarkImpl.java}   |  12 +-
 .../table/distributed/LowWatermarkTest.java        |   8 +-
 .../table/distributed/TableManagerTest.java        |   9 +-
 .../ignite/internal/table/TestLowWatermark.java    |  69 ++++++
 9 files changed, 107 insertions(+), 276 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index f759902e11..c22f720f00 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -161,7 +161,7 @@ import 
org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.table.distributed.LowWatermark;
+import org.apache.ignite.internal.table.distributed.LowWatermarkImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -525,7 +525,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         var sqlRef = new AtomicReference<IgniteSqlImpl>();
 
-        LowWatermark lowWatermark = new LowWatermark(name, 
gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor);
+        LowWatermarkImpl lowWatermark = new LowWatermarkImpl(name, 
gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor);
 
         TableManager tableManager = new TableManager(
                 name,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 5d8cf99a8e..210c08fcdd 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -177,7 +177,7 @@ import 
org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
-import org.apache.ignite.internal.table.distributed.LowWatermark;
+import org.apache.ignite.internal.table.distributed.LowWatermarkImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -345,7 +345,7 @@ public class IgniteImpl implements Ignite {
 
     private final ClockWaiter clockWaiter;
 
-    private final LowWatermark lowWatermark;
+    private final LowWatermarkImpl lowWatermark;
 
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
@@ -686,7 +686,7 @@ public class IgniteImpl implements Ignite {
 
         StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
 
-        lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), clock, 
txManager, vaultMgr, failureProcessor);
+        lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), 
clock, txManager, vaultMgr, failureProcessor);
 
         distributedTblMgr = new TableManager(
                 name,
@@ -1449,4 +1449,10 @@ public class IgniteImpl implements Ignite {
     public RemotelyTriggeredResourceRegistry resourcesRegistry() {
         return resourcesRegistry;
     }
+
+    /** Returns the low watermark. */
+    @TestOnly
+    public LowWatermarkImpl lowWatermark() {
+        return lowWatermark;
+    }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a2a8b81c38..aed0d5ce4c 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -168,7 +168,7 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableRaftService;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.table.distributed.LowWatermark;
+import org.apache.ignite.internal.table.distributed.LowWatermarkImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -955,7 +955,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         private final NetworkAddress networkAddress;
 
-        private final LowWatermark lowWatermark;
+        private final LowWatermarkImpl lowWatermark;
 
         /** The future have to be complete after the node start and all Meta 
storage watches are deployd. */
         private CompletableFuture<Void> deployWatchesFut;
@@ -1172,7 +1172,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
             StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
 
             HybridClockImpl clock = new HybridClockImpl();
-            lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), 
clock, txManager, vaultManager, failureProcessor);
+            lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), 
clock, txManager, vaultManager, failureProcessor);
 
             tableManager = new TableManager(
                     name,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
index 8e5cc3cf20..6b77076d8f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
@@ -17,264 +17,19 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.failure.FailureContext;
-import org.apache.ignite.internal.failure.FailureProcessor;
-import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.manager.IgniteComponent;
-import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Class to manage the low watermark.
- *
- * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
- * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
- * rows, remote indexes, remote tables, etc.
- *
- * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
+ * An interface for tracking Low watermark.
  */
-public class LowWatermark implements IgniteComponent {
-    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermark.class);
-
-    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
-
-    private final LowWatermarkConfiguration lowWatermarkConfig;
-
-    private final HybridClock clock;
-
-    private final TxManager txManager;
-
-    private final VaultManager vaultManager;
-
-    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
-
-    private final ScheduledExecutorService scheduledThreadPool;
-
-    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
-
-    private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-    private volatile @Nullable HybridTimestamp lowWatermark;
-
-    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
-
-    private final FailureProcessor failureProcessor;
-
-    /**
-     * Constructor.
-     *
-     * @param nodeName Node name.
-     * @param lowWatermarkConfig Low watermark configuration.
-     * @param clock A hybrid logical clock.
-     * @param txManager Transaction manager.
-     * @param vaultManager Vault manager.
-     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
-     */
-    public LowWatermark(
-            String nodeName,
-            LowWatermarkConfiguration lowWatermarkConfig,
-            HybridClock clock,
-            TxManager txManager,
-            VaultManager vaultManager,
-            FailureProcessor failureProcessor
-    ) {
-        this.lowWatermarkConfig = lowWatermarkConfig;
-        this.clock = clock;
-        this.txManager = txManager;
-        this.vaultManager = vaultManager;
-        this.failureProcessor = failureProcessor;
-
-        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
-                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
-        );
-    }
-
-    /**
-     * Starts the watermark manager.
-     */
-    @Override
-    public CompletableFuture<Void> start() {
-        inBusyLock(busyLock, () -> {
-            lowWatermark = readLowWatermarkFromVault();
-        });
-
-        return nullCompletedFuture();
-    }
-
-    /**
-     * Schedule watermark updates.
-     */
-    public void scheduleUpdates() {
-        inBusyLock(busyLock, () -> {
-            HybridTimestamp lowWatermarkCandidate = lowWatermark;
-
-            if (lowWatermarkCandidate == null) {
-                LOG.info("Previous value of the low watermark was not found, 
will schedule to update it");
-
-                scheduleUpdateLowWatermarkBusy();
-
-                return;
-            }
-
-            LOG.info("Low watermark has been scheduled to be updated: {}", 
lowWatermarkCandidate);
-
-            txManager.updateLowWatermark(lowWatermarkCandidate)
-                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
-                    .whenComplete((unused, throwable) -> {
-                        if (throwable == null) {
-                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                        } else if (!(throwable instanceof 
NodeStoppingException)) {
-                            LOG.error("Error during the Watermark manager 
start", throwable);
-
-                            failureProcessor.process(new 
FailureContext(CRITICAL_ERROR, throwable));
-
-                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                        }
-                    });
-        });
-    }
-
-    private @Nullable HybridTimestamp readLowWatermarkFromVault() {
-        VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
-
-        return vaultEntry == null ? null : 
ByteUtils.fromBytes(vaultEntry.value());
-    }
-
-    @Override
-    public void stop() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
-        }
-
-        busyLock.block();
-
-        ScheduledFuture<?> lastScheduledTaskFuture = 
this.lastScheduledTaskFuture.get();
-
-        if (lastScheduledTaskFuture != null) {
-            lastScheduledTaskFuture.cancel(true);
-        }
-
-        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, 
TimeUnit.SECONDS);
-    }
-
-    /**
-     * Returns the current low watermark, {@code null} means no low watermark 
has been assigned yet.
-     */
-    public @Nullable HybridTimestamp getLowWatermark() {
-        return lowWatermark;
-    }
-
-    void updateLowWatermark() {
-        inBusyLock(busyLock, () -> {
-            HybridTimestamp lowWatermarkCandidate = 
createNewLowWatermarkCandidate();
-
-            // Wait until all the read-only transactions are finished before 
the new candidate, since no new RO transactions could be
-            // created, then we can safely promote the candidate as a new low 
watermark, store it in vault, and we can safely start cleaning
-            // up the stale/junk data in the tables.
-            txManager.updateLowWatermark(lowWatermarkCandidate)
-                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
-                        vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(lowWatermarkCandidate));
-
-                        lowWatermark = lowWatermarkCandidate;
-
-                        return notifyListeners(lowWatermarkCandidate);
-                    }), scheduledThreadPool)
-                    .whenComplete((unused, throwable) -> {
-                        if (throwable != null) {
-                            if (!(throwable instanceof NodeStoppingException)) 
{
-                                LOG.error("Failed to update low watermark, 
will schedule again: {}", throwable, lowWatermarkCandidate);
-
-                                inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                            }
-                        } else {
-                            LOG.info("Successful low watermark update: {}", 
lowWatermarkCandidate);
-
-                            scheduleUpdateLowWatermarkBusy();
-                        }
-                    });
-        });
-    }
-
-    public void addUpdateListener(LowWatermarkChangedListener listener) {
-        updateListeners.add(listener);
-    }
-
-    public void removeUpdateListener(LowWatermarkChangedListener listener) {
-        updateListeners.remove(listener);
-    }
-
-    private CompletableFuture<Void> notifyListeners(HybridTimestamp 
lowWatermark) {
-        if (updateListeners.isEmpty()) {
-            return nullCompletedFuture();
-        }
-
-        ArrayList<CompletableFuture<?>> res = new ArrayList<>();
-        for (LowWatermarkChangedListener updateListener : updateListeners) {
-            res.add(updateListener.onLwmChanged(lowWatermark));
-        }
-
-        return CompletableFuture.allOf(res.toArray(CompletableFuture[]::new));
-    }
-
-    private void scheduleUpdateLowWatermarkBusy() {
-        ScheduledFuture<?> previousScheduledFuture = 
this.lastScheduledTaskFuture.get();
-
-        assert previousScheduledFuture == null || 
previousScheduledFuture.isDone() : "previous scheduled task has not finished";
-
-        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
-                this::updateLowWatermark,
-                lowWatermarkConfig.updateFrequency().value(),
-                TimeUnit.MILLISECONDS
-        );
-
-        boolean casResult = 
lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, 
newScheduledFuture);
-
-        assert casResult : "only one scheduled task is expected";
-    }
-
-    HybridTimestamp createNewLowWatermarkCandidate() {
-        HybridTimestamp now = clock.now();
-
-        HybridTimestamp lowWatermarkCandidate = now.addPhysicalTime(
-                -lowWatermarkConfig.dataAvailabilityTime().value() - 
getMaxClockSkew()
-        );
-
-        HybridTimestamp lowWatermark = this.lowWatermark;
-
-        assert lowWatermark == null || 
lowWatermarkCandidate.compareTo(lowWatermark) > 0 :
-                "lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" + 
lowWatermarkCandidate;
+public interface LowWatermark {
+    /** Returns the current low watermark, {@code null} means no low watermark 
has been assigned yet. */
+    @Nullable HybridTimestamp getLowWatermark();
 
-        return lowWatermarkCandidate;
-    }
+    /** Subscribes to low watermark changes. */
+    void addUpdateListener(LowWatermarkChangedListener listener);
 
-    private long getMaxClockSkew() {
-        // TODO: IGNITE-19287 Add Implementation
-        return HybridTimestamp.CLOCK_SKEW;
-    }
+    /** Unsubscribes from low watermark changes. */
+    void removeUpdateListener(LowWatermarkChangedListener listener);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
index 2a738a7c46..72e395bbaf 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 
 /**
- * LWM event listener interface.
+ * Low watermark event listener interface.
  *
  * @see LowWatermark
  */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java
similarity index 98%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java
index 8e5cc3cf20..177ee9d652 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java
@@ -59,8 +59,8 @@ import org.jetbrains.annotations.Nullable;
  *
  * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
  */
-public class LowWatermark implements IgniteComponent {
-    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermark.class);
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
 
     static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
 
@@ -96,7 +96,7 @@ public class LowWatermark implements IgniteComponent {
      * @param vaultManager Vault manager.
      * @param failureProcessor Failure processor tha is used to handle 
critical errors.
      */
-    public LowWatermark(
+    public LowWatermarkImpl(
             String nodeName,
             LowWatermarkConfiguration lowWatermarkConfig,
             HybridClock clock,
@@ -183,9 +183,7 @@ public class LowWatermark implements IgniteComponent {
         IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, 
TimeUnit.SECONDS);
     }
 
-    /**
-     * Returns the current low watermark, {@code null} means no low watermark 
has been assigned yet.
-     */
+    @Override
     public @Nullable HybridTimestamp getLowWatermark() {
         return lowWatermark;
     }
@@ -221,10 +219,12 @@ public class LowWatermark implements IgniteComponent {
         });
     }
 
+    @Override
     public void addUpdateListener(LowWatermarkChangedListener listener) {
         updateListeners.add(listener);
     }
 
+    @Override
     public void removeUpdateListener(LowWatermarkChangedListener listener) {
         updateListeners.remove(listener);
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
index c35e203fdb..a21ab858b7 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static 
org.apache.ignite.internal.table.distributed.LowWatermark.LOW_WATERMARK_VAULT_KEY;
+import static 
org.apache.ignite.internal.table.distributed.LowWatermarkImpl.LOW_WATERMARK_VAULT_KEY;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -58,7 +58,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InOrder;
 
 /**
- * For {@link LowWatermark} testing.
+ * For {@link LowWatermarkImpl} testing.
  */
 @ExtendWith(ConfigurationExtension.class)
 public class LowWatermarkTest extends BaseIgniteAbstractTest {
@@ -73,14 +73,14 @@ public class LowWatermarkTest extends 
BaseIgniteAbstractTest {
 
     private LowWatermarkChangedListener listener;
 
-    private LowWatermark lowWatermark;
+    private LowWatermarkImpl lowWatermark;
 
     @BeforeEach
     void setUp() {
         listener = mock(LowWatermarkChangedListener.class);
         
when(listener.onLwmChanged(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture());
 
-        lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, 
txManager, vaultManager, mock(FailureProcessor.class));
+        lowWatermark = new LowWatermarkImpl("test", lowWatermarkConfig, clock, 
txManager, vaultManager, mock(FailureProcessor.class));
         lowWatermark.addUpdateListener(listener);
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7b041ea1a6..90eb159423 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -118,6 +118,7 @@ import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TestLowWatermark;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -129,7 +130,6 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
@@ -243,8 +243,11 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     private ExecutorService partitionOperationsExecutor;
 
+    private TestLowWatermark lowWatermark;
+
     @BeforeEach
     void before() throws NodeStoppingException {
+        lowWatermark = new TestLowWatermark();
         catalogMetastore = StandaloneMetaStorageManager.create(new 
SimpleInMemoryKeyValueStorage(NODE_NAME));
         catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, 
clock, catalogMetastore);
 
@@ -744,8 +747,6 @@ public class TableManagerTest extends IgniteAbstractTest {
      */
     private TableManager createTableManager(CompletableFuture<TableManager> 
tblManagerFut, Consumer<MvTableStorage> tableStorageDecorator,
             Consumer<TxStateTableStorage> txStateTableStorageDecorator) {
-        VaultManager vaultManager = mock(VaultManager.class);
-
         TableManager tableManager = new TableManager(
                 NODE_NAME,
                 revisionUpdater,
@@ -777,7 +778,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                 () -> mock(IgniteSql.class),
                 new RemotelyTriggeredResourceRegistry(),
                 mock(ScheduledExecutorService.class),
-                new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock, 
tm, vaultManager, mock(FailureProcessor.class))
+                lowWatermark
         ) {
 
             @Override
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java
new file mode 100644
index 0000000000..320c776ffe
--- /dev/null
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
+import 
org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Low watermark dummy implementation, which requires explicit {@link 
#updateAndNotify(HybridTimestamp)} method call to notify listeners.
+ * This implementation has no persistent state and notifies listeners 
instantly in the same thread.
+ */
+public class TestLowWatermark implements LowWatermark {
+    private final List<LowWatermarkChangedListener> listeners = new 
CopyOnWriteArrayList<>();
+    private volatile HybridTimestamp ts;
+
+    @Override
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return ts;
+    }
+
+    @Override
+    public void addUpdateListener(LowWatermarkChangedListener listener) {
+        this.listeners.add(listener);
+    }
+
+    @Override
+    public void removeUpdateListener(LowWatermarkChangedListener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * Updates the low watermark and notifies listeners.
+     *
+     * @param newTs New timestamp.
+     * @return Listener notification future.
+     */
+    public CompletableFuture<Void> updateAndNotify(HybridTimestamp newTs) {
+        assert ts == null || ts.longValue() < newTs.longValue();
+
+        this.ts = newTs;
+
+        return CompletableFuture.allOf(listeners.stream().map(l -> 
l.onLwmChanged(newTs)).toArray(CompletableFuture[]::new));
+    }
+
+    /** Sets the low watermark without notifying listeners. */
+    public void update(HybridTimestamp newTs) {
+        this.ts = newTs;
+    }
+}

Reply via email to