This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5db7920c6c IGNITE-21658 Make LowWatermark a top level component (#3340)
5db7920c6c is described below

commit 5db7920c6cbfabc0f482fdd0942549b33b2747e2
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Mar 4 14:59:14 2024 +0300

    IGNITE-21658 Make LowWatermark a top level component (#3340)
---
 .../runner/app/ItIgniteNodeRestartTest.java        | 11 ++-
 .../org/apache/ignite/internal/app/IgniteImpl.java | 16 +++--
 .../rebalance/ItRebalanceDistributedTest.java      | 16 +++--
 .../internal/table/distributed/LowWatermark.java   | 81 ++++++++++++++--------
 .../distributed/LowWatermarkChangedListener.java   | 37 ++++++++++
 .../internal/table/distributed/TableManager.java   | 19 ++---
 .../ignite/internal/table/distributed/gc/MvGc.java |  8 ++-
 .../table/distributed/LowWatermarkTest.java        | 25 ++++---
 .../table/distributed/TableManagerTest.java        |  5 +-
 .../internal/table/distributed/gc/MvGcTest.java    | 28 ++++----
 10 files changed, 168 insertions(+), 78 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 92fd0662f2..4dde7bce06 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -161,6 +161,7 @@ import 
org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -524,6 +525,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         var sqlRef = new AtomicReference<IgniteSqlImpl>();
 
+        LowWatermark lowWatermark = new LowWatermark(name, 
gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor);
+
         TableManager tableManager = new TableManager(
                 name,
                 registry,
@@ -547,16 +550,15 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 hybridClock,
                 new OutgoingSnapshotsManager(clusterSvc.messagingService()),
                 topologyAwareRaftGroupServiceFactory,
-                vault,
                 distributionZoneManager,
                 schemaSyncService,
                 catalogManager,
                 new HybridTimestampTracker(),
                 placementDriverManager.placementDriver(),
                 sqlRef::get,
-                failureProcessor,
                 resourcesRegistry,
-                rebalanceScheduler
+                rebalanceScheduler,
+                lowWatermark
         );
 
         var indexManager = new IndexManager(
@@ -616,6 +618,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 cmgManager,
                 replicaMgr,
                 txManager,
+                lowWatermark,
                 metaStorageMgr,
                 clusterCfgMgr,
                 dataStorageManager,
@@ -635,6 +638,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
             components.add(component);
         }
 
+        lowWatermark.scheduleUpdates();
+
         PartialNode partialNode = partialNode(
                 name,
                 nodeCfgMgr,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f550ecdb5e..d1103775b4 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -177,6 +177,7 @@ import 
org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -344,6 +345,8 @@ public class IgniteImpl implements Ignite {
 
     private final ClockWaiter clockWaiter;
 
+    private final LowWatermark lowWatermark;
+
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
     private final RestAddressReporter restAddressReporter;
@@ -683,6 +686,8 @@ public class IgniteImpl implements Ignite {
 
         StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
 
+        lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), clock, 
txManager, vaultMgr, failureProcessor);
+
         distributedTblMgr = new TableManager(
                 name,
                 registry,
@@ -706,16 +711,15 @@ public class IgniteImpl implements Ignite {
                 clock,
                 outgoingSnapshotsManager,
                 topologyAwareRaftGroupServiceFactory,
-                vaultMgr,
                 distributionZoneManager,
                 schemaSyncService,
                 catalogManager,
                 observableTimestampTracker,
                 placementDriverMgr.placementDriver(),
                 this::sql,
-                failureProcessor,
                 resourcesRegistry,
-                rebalanceScheduler
+                rebalanceScheduler,
+                lowWatermark
         );
 
         indexManager = new IndexManager(
@@ -959,7 +963,8 @@ public class IgniteImpl implements Ignite {
                     restComponent,
                     raftMgr,
                     clusterStateStorage,
-                    cmgMgr
+                    cmgMgr,
+                    lowWatermark
             );
 
             clusterSvc.updateMetadata(new 
NodeMetadata(restComponent.hostName(), restComponent.httpPort(), 
restComponent.httpsPort()));
@@ -1035,6 +1040,9 @@ public class IgniteImpl implements Ignite {
                     .thenComposeAsync(ignored -> 
awaitSelfInLocalLogicalTopology(), startupExecutor)
                     .thenRunAsync(() -> {
                         try {
+                            // Enable watermark events.
+                            lowWatermark.scheduleUpdates();
+
                             // Enable REST component on start complete.
                             restComponent.enable();
                             // Transfer the node to the STARTED state.
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 5ddbc3db69..6be1f738c4 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -168,6 +168,7 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableRaftService;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -954,6 +955,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         private final NetworkAddress networkAddress;
 
+        private final LowWatermark lowWatermark;
+
         /** The future have to be complete after the node start and all Meta 
storage watches are deployd. */
         private CompletableFuture<Void> deployWatchesFut;
 
@@ -1168,6 +1171,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
             StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
 
+            HybridClockImpl clock = new HybridClockImpl();
+            lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), 
clock, txManager, vaultManager, failureProcessor);
+
             tableManager = new TableManager(
                     name,
                     registry,
@@ -1188,19 +1194,18 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     view -> new LocalLogStorageFactory(),
                     threadPoolsManager.tableIoExecutor(),
                     threadPoolsManager.partitionOperationsExecutor(),
-                    new HybridClockImpl(),
+                    clock,
                     new 
OutgoingSnapshotsManager(clusterService.messagingService()),
                     topologyAwareRaftGroupServiceFactory,
-                    vaultManager,
                     distributionZoneManager,
                     schemaSyncService,
                     catalogManager,
                     new HybridTimestampTracker(),
                     placementDriver,
                     () -> mock(IgniteSql.class),
-                    failureProcessor,
                     resourcesRegistry,
-                    rebalanceScheduler
+                    rebalanceScheduler,
+                    lowWatermark
             ) {
                 @Override
                 protected TxStateTableStorage createTxStateTableStorage(
@@ -1271,6 +1276,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
             deployWatchesFut = CompletableFuture.supplyAsync(() -> {
                 List<IgniteComponent> secondComponents = List.of(
+                        lowWatermark,
                         metaStorageManager,
                         clusterCfgMgr,
                         clockWaiter,
@@ -1298,6 +1304,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
                 assertThat(configurationNotificationFut, willSucceedIn(1, 
TimeUnit.MINUTES));
 
+                lowWatermark.scheduleUpdates();
+
                 return metaStorageManager.deployWatches();
             }).thenCompose(identity());
         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
index c0cf426414..8e5cc3cf20 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
@@ -18,15 +18,19 @@
 package org.apache.ignite.internal.table.distributed;
 
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -35,8 +39,8 @@ import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
 import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -55,7 +59,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
  */
-public class LowWatermark implements ManuallyCloseable {
+public class LowWatermark implements IgniteComponent {
     private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermark.class);
 
     static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
@@ -68,7 +72,7 @@ public class LowWatermark implements ManuallyCloseable {
 
     private final VaultManager vaultManager;
 
-    private final MvGc mvGc;
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
 
     private final ScheduledExecutorService scheduledThreadPool;
 
@@ -76,7 +80,7 @@ public class LowWatermark implements ManuallyCloseable {
 
     private final AtomicBoolean closeGuard = new AtomicBoolean();
 
-    private volatile HybridTimestamp lowWatermark;
+    private volatile @Nullable HybridTimestamp lowWatermark;
 
     private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
 
@@ -90,7 +94,6 @@ public class LowWatermark implements ManuallyCloseable {
      * @param clock A hybrid logical clock.
      * @param txManager Transaction manager.
      * @param vaultManager Vault manager.
-     * @param mvGc MVCC garbage collector.
      * @param failureProcessor Failure processor tha is used to handle 
critical errors.
      */
     public LowWatermark(
@@ -99,14 +102,12 @@ public class LowWatermark implements ManuallyCloseable {
             HybridClock clock,
             TxManager txManager,
             VaultManager vaultManager,
-            MvGc mvGc,
             FailureProcessor failureProcessor
     ) {
         this.lowWatermarkConfig = lowWatermarkConfig;
         this.clock = clock;
         this.txManager = txManager;
         this.vaultManager = vaultManager;
-        this.mvGc = mvGc;
         this.failureProcessor = failureProcessor;
 
         scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
@@ -117,11 +118,23 @@ public class LowWatermark implements ManuallyCloseable {
     /**
      * Starts the watermark manager.
      */
-    public void start() {
+    @Override
+    public CompletableFuture<Void> start() {
+        inBusyLock(busyLock, () -> {
+            lowWatermark = readLowWatermarkFromVault();
+        });
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Schedule watermark updates.
+     */
+    public void scheduleUpdates() {
         inBusyLock(busyLock, () -> {
-            HybridTimestamp lowWatermark = readLowWatermarkFromVault();
+            HybridTimestamp lowWatermarkCandidate = lowWatermark;
 
-            if (lowWatermark == null) {
+            if (lowWatermarkCandidate == null) {
                 LOG.info("Previous value of the low watermark was not found, 
will schedule to update it");
 
                 scheduleUpdateLowWatermarkBusy();
@@ -129,19 +142,14 @@ public class LowWatermark implements ManuallyCloseable {
                 return;
             }
 
-            LOG.info(
-                    "Low watermark has been successfully retrieved from the 
vault and is scheduled to be updated: {}",
-                    lowWatermark
-            );
+            LOG.info("Low watermark has been scheduled to be updated: {}", 
lowWatermarkCandidate);
 
-            txManager.updateLowWatermark(lowWatermark)
-                    .thenRun(() -> inBusyLock(busyLock, () -> {
-                        this.lowWatermark = lowWatermark;
-
-                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
-                    }))
+            txManager.updateLowWatermark(lowWatermarkCandidate)
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
                     .whenComplete((unused, throwable) -> {
-                        if (throwable != null && !(throwable instanceof 
NodeStoppingException)) {
+                        if (throwable == null) {
+                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
+                        } else if (!(throwable instanceof 
NodeStoppingException)) {
                             LOG.error("Error during the Watermark manager 
start", throwable);
 
                             failureProcessor.process(new 
FailureContext(CRITICAL_ERROR, throwable));
@@ -159,7 +167,7 @@ public class LowWatermark implements ManuallyCloseable {
     }
 
     @Override
-    public void close() {
+    public void stop() {
         if (!closeGuard.compareAndSet(false, true)) {
             return;
         }
@@ -190,12 +198,12 @@ public class LowWatermark implements ManuallyCloseable {
             // created, then we can safely promote the candidate as a new low 
watermark, store it in vault, and we can safely start cleaning
             // up the stale/junk data in the tables.
             txManager.updateLowWatermark(lowWatermarkCandidate)
-                    .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
                         vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(lowWatermarkCandidate));
 
                         lowWatermark = lowWatermarkCandidate;
 
-                        
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermarkCandidate);
+                        return notifyListeners(lowWatermarkCandidate);
                     }), scheduledThreadPool)
                     .whenComplete((unused, throwable) -> {
                         if (throwable != null) {
@@ -206,15 +214,32 @@ public class LowWatermark implements ManuallyCloseable {
                             }
                         } else {
                             LOG.info("Successful low watermark update: {}", 
lowWatermarkCandidate);
+
+                            scheduleUpdateLowWatermarkBusy();
                         }
                     });
         });
     }
 
-    private void runGcAndScheduleUpdateLowWatermarkBusy(HybridTimestamp 
lowWatermark) {
-        mvGc.updateLowWatermark(lowWatermark);
+    public void addUpdateListener(LowWatermarkChangedListener listener) {
+        updateListeners.add(listener);
+    }
+
+    public void removeUpdateListener(LowWatermarkChangedListener listener) {
+        updateListeners.remove(listener);
+    }
+
+    private CompletableFuture<Void> notifyListeners(HybridTimestamp 
lowWatermark) {
+        if (updateListeners.isEmpty()) {
+            return nullCompletedFuture();
+        }
+
+        ArrayList<CompletableFuture<?>> res = new ArrayList<>();
+        for (LowWatermarkChangedListener updateListener : updateListeners) {
+            res.add(updateListener.onLwmChanged(lowWatermark));
+        }
 
-        scheduleUpdateLowWatermarkBusy();
+        return CompletableFuture.allOf(res.toArray(CompletableFuture[]::new));
     }
 
     private void scheduleUpdateLowWatermarkBusy() {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
new file mode 100644
index 0000000000..2a738a7c46
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * LWM event listener interface.
+ *
+ * @see LowWatermark
+ */
+@FunctionalInterface
+public interface LowWatermarkChangedListener {
+    /**
+     * Low watermark changed callback.
+     *
+     * @param ts New low watermark.
+     * @return A future, which completes after the event has been processed.
+     */
+    CompletableFuture<Void> onLwmChanged(HybridTimestamp ts);
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ed3bf6eec2..3df5f576f1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -114,7 +114,6 @@ import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
 import 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceRaftGroupEventsListener;
 import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
-import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
@@ -209,7 +208,6 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.RebalanceUtilEx;
-import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.worker.ThreadAssertions;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.util.IgniteNameUtils;
@@ -412,11 +410,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @param partitionOperationsExecutor Striped executor on which partition 
operations (potentially requiring I/O with storages)
      *     will be executed.
      * @param raftGroupServiceFactory Factory that is used for creation of 
raft group services for replication groups.
-     * @param vaultManager Vault manager.
      * @param placementDriver Placement driver.
      * @param sql A supplier function that returns {@link IgniteSql}.
-     * @param failureProcessor Failure processor that is used to process 
critical errors.
      * @param rebalanceScheduler Executor for scheduling rebalance routine.
+     * @param lowWatermark Low watermark.
      */
     public TableManager(
             String nodeName,
@@ -441,16 +438,15 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             HybridClock clock,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
             TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
-            VaultManager vaultManager,
             DistributionZoneManager distributionZoneManager,
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
             HybridTimestampTracker observableTimestampTracker,
             PlacementDriver placementDriver,
             Supplier<IgniteSql> sql,
-            FailureProcessor failureProcessor,
             RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
-            ScheduledExecutorService rebalanceScheduler
+            ScheduledExecutorService rebalanceScheduler,
+            LowWatermark lowWatermark
     ) {
         this.topologyService = topologyService;
         this.raftMgr = raftMgr;
@@ -474,6 +470,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.storageUpdateConfig = storageUpdateConfig;
         this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
         this.rebalanceScheduler = rebalanceScheduler;
+        this.lowWatermark = lowWatermark;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -525,8 +522,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         mvGc = new MvGc(nodeName, gcConfig);
 
-        lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), 
clock, txManager, vaultManager, mvGc, failureProcessor);
-
         raftCommandsMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(messageSerializationRegistry);
 
         partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
@@ -552,8 +547,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     public CompletableFuture<Void> start() {
         return inBusyLockAsync(busyLock, () -> {
             mvGc.start();
-
-            lowWatermark.start();
+            lowWatermark.addUpdateListener(mvGc);
 
             transactionStateResolver.start();
 
@@ -1078,6 +1072,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
         metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
 
+        lowWatermark.removeUpdateListener(mvGc);
+
         var tablesToStop = new HashMap<Integer, TableImpl>();
 
         tablesToStop.putAll(latestTablesById());
@@ -1095,7 +1091,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
 
         IgniteUtils.closeAllManually(
-                lowWatermark,
                 mvGc,
                 fullStateTransferIndexChooser,
                 sharedTxStateStorage,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
index 4bf46fd6ac..38e26e05d4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
+import 
org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.TrackerClosedException;
@@ -50,7 +51,7 @@ import org.jetbrains.annotations.TestOnly;
  *
  * @see GcUpdateHandler#vacuumBatch(HybridTimestamp, int, boolean)
  */
-public class MvGc implements ManuallyCloseable {
+public class MvGc implements LowWatermarkChangedListener, ManuallyCloseable {
     private static final IgniteLogger LOG = Loggers.forClass(MvGc.class);
 
     /** Node name. */
@@ -153,7 +154,8 @@ public class MvGc implements ManuallyCloseable {
      * @param newLwm New low watermark.
      * @throws IgniteInternalException with {@link 
GarbageCollector#CLOSED_ERR} If the garbage collector is closed.
      */
-    public void updateLowWatermark(HybridTimestamp newLwm) {
+    @Override
+    public CompletableFuture<Void> onLwmChanged(HybridTimestamp newLwm) {
         inBusyLock(() -> {
             HybridTimestamp updatedLwm = 
lowWatermarkReference.updateAndGet(currentLwm -> {
                 if (currentLwm == null) {
@@ -171,6 +173,8 @@ public class MvGc implements ManuallyCloseable {
 
             executor.submit(() -> inBusyLock(this::initNewGcBusy));
         });
+
+        return nullCompletedFuture();
     }
 
     @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
index 42997ac27c..c35e203fdb 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -72,26 +71,31 @@ public class LowWatermarkTest extends 
BaseIgniteAbstractTest {
 
     private final VaultManager vaultManager = mock(VaultManager.class);
 
-    private final MvGc mvGc = mock(MvGc.class);
+    private LowWatermarkChangedListener listener;
 
     private LowWatermark lowWatermark;
 
     @BeforeEach
     void setUp() {
-        lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, 
txManager, vaultManager, mvGc, mock(FailureProcessor.class));
+        listener = mock(LowWatermarkChangedListener.class);
+        
when(listener.onLwmChanged(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture());
+
+        lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, 
txManager, vaultManager, mock(FailureProcessor.class));
+        lowWatermark.addUpdateListener(listener);
     }
 
     @AfterEach
     void tearDown() {
-        lowWatermark.close();
+        lowWatermark.stop();
     }
 
     @Test
     void testStartWithEmptyVault() {
         // Let's check the start with no low watermark in vault.
         lowWatermark.start();
+        lowWatermark.scheduleUpdates();
 
-        verify(mvGc, never()).updateLowWatermark(any(HybridTimestamp.class));
+        verify(listener, never()).onLwmChanged(any(HybridTimestamp.class));
         assertNull(lowWatermark.getLowWatermark());
     }
 
@@ -106,7 +110,11 @@ public class LowWatermarkTest extends 
BaseIgniteAbstractTest {
 
         this.lowWatermark.start();
 
-        verify(mvGc).updateLowWatermark(lowWatermark);
+        assertEquals(lowWatermark, this.lowWatermark.getLowWatermark());
+
+        this.lowWatermark.scheduleUpdates();
+
+        verify(listener, timeout(1_000)).onLwmChanged(lowWatermark);
         assertEquals(lowWatermark, this.lowWatermark.getLowWatermark());
     }
 
@@ -135,13 +143,13 @@ public class LowWatermarkTest extends 
BaseIgniteAbstractTest {
 
         lowWatermark.updateLowWatermark();
 
-        InOrder inOrder = inOrder(txManager, vaultManager, mvGc);
+        InOrder inOrder = inOrder(txManager, vaultManager, listener);
 
         inOrder.verify(txManager).updateLowWatermark(newLowWatermarkCandidate);
 
         inOrder.verify(vaultManager, 
timeout(1000)).put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(newLowWatermarkCandidate));
 
-        inOrder.verify(mvGc).updateLowWatermark(newLowWatermarkCandidate);
+        inOrder.verify(listener, 
timeout(1_000)).onLwmChanged(newLowWatermarkCandidate);
 
         assertEquals(newLowWatermarkCandidate, lowWatermark.getLowWatermark());
     }
@@ -167,6 +175,7 @@ public class LowWatermarkTest extends 
BaseIgniteAbstractTest {
             });
 
             lowWatermark.start();
+            this.lowWatermark.scheduleUpdates();
 
             // Let's check that it hasn't been called more than once.
             assertFalse(startGetAllReadOnlyTransactions.await(1, 
TimeUnit.SECONDS));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 264b0d69c9..f03fa9043c 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -769,16 +769,15 @@ public class TableManagerTest extends IgniteAbstractTest {
                 clock,
                 new 
OutgoingSnapshotsManager(clusterService.messagingService()),
                 mock(TopologyAwareRaftGroupServiceFactory.class),
-                vaultManager,
                 distributionZoneManager,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogManager,
                 new HybridTimestampTracker(),
                 new TestPlacementDriver(node),
                 () -> mock(IgniteSql.class),
-                mock(FailureProcessor.class),
                 new RemotelyTriggeredResourceRegistry(),
-                mock(ScheduledExecutorService.class)
+                mock(ScheduledExecutorService.class),
+                new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock, 
tm, vaultManager, mock(FailureProcessor.class))
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
index abe97336be..04f6035ff5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -96,7 +96,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
     void testAddStorageWithLowWatermark() {
         HybridTimestamp lowWatermark = new HybridTimestamp(1, 1);
 
-        gc.updateLowWatermark(lowWatermark);
+        gc.onLwmChanged(lowWatermark);
 
         CompletableFuture<Void> invokeVacuumMethodFuture = new 
CompletableFuture<>();
 
@@ -119,7 +119,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         gc.addStorage(createTablePartitionId(), gcUpdateHandler0);
         gc.addStorage(createTablePartitionId(), gcUpdateHandler1);
 
-        gc.updateLowWatermark(lowWatermark0);
+        gc.onLwmChanged(lowWatermark0);
 
         // We expect GcUpdateHandler#vacuum to be called with the set 
lowWatermark0.
         assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
@@ -134,7 +134,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         completeFutureOnVacuum(gcUpdateHandler0, invokeVacuumMethodFuture2, 
lowWatermark1);
         completeFutureOnVacuum(gcUpdateHandler1, invokeVacuumMethodFuture3, 
lowWatermark1);
 
-        gc.updateLowWatermark(lowWatermark1);
+        gc.onLwmChanged(lowWatermark1);
 
         // We expect GcUpdateHandler#vacuum to be called with the set 
lowWatermark0.
         assertThat(invokeVacuumMethodFuture2, willCompleteSuccessfully());
@@ -154,7 +154,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         gc.addStorage(createTablePartitionId(), gcUpdateHandler0);
         gc.addStorage(createTablePartitionId(), gcUpdateHandler1);
 
-        gc.updateLowWatermark(firstLowWatermark);
+        gc.onLwmChanged(firstLowWatermark);
 
         // We expect GcUpdateHandler#vacuum to be called with the set 
lowWatermark0.
         assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
@@ -169,7 +169,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         completeFutureOnVacuum(gcUpdateHandler0, 
invokeVacuumMethodFutureForSame0, sameLowWatermark);
         completeFutureOnVacuum(gcUpdateHandler1, 
invokeVacuumMethodFutureForSame1, sameLowWatermark);
 
-        gc.updateLowWatermark(sameLowWatermark);
+        gc.onLwmChanged(sameLowWatermark);
 
         // We expect that GcUpdateHandler#vacuum will not be called.
         assertThat(invokeVacuumMethodFutureForSame0, willTimeoutFast());
@@ -184,7 +184,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         completeFutureOnVacuum(gcUpdateHandler0, 
invokeVacuumMethodFutureForLower0, lowerLowWatermark);
         completeFutureOnVacuum(gcUpdateHandler1, 
invokeVacuumMethodFutureForLower1, lowerLowWatermark);
 
-        gc.updateLowWatermark(lowerLowWatermark);
+        gc.onLwmChanged(lowerLowWatermark);
 
         // We expect that GcUpdateHandler#vacuum will not be called.
         assertThat(invokeVacuumMethodFutureForSame0, willTimeoutFast());
@@ -201,7 +201,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
 
         gc.addStorage(createTablePartitionId(), gcUpdateHandler);
 
-        gc.updateLowWatermark(new HybridTimestamp(2, 2));
+        gc.onLwmChanged(new HybridTimestamp(2, 2));
 
         assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
     }
@@ -219,7 +219,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
 
         gc.addStorage(tablePartitionId, 
createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, null));
 
-        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+        gc.onLwmChanged(new HybridTimestamp(1, 1));
 
         assertThat(invokeVacuumMethodFuture, willCompleteSuccessfully());
         assertThat(gc.removeStorage(tablePartitionId), 
willCompleteSuccessfully());
@@ -237,7 +237,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
 
         gc.addStorage(tablePartitionId, 
createWithWaitFinishVacuum(startInvokeVacuumMethodFuture, 
finishInvokeVacuumMethodFuture));
 
-        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+        gc.onLwmChanged(new HybridTimestamp(1, 1));
 
         assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
 
@@ -259,7 +259,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
 
         gc.addStorage(tablePartitionId, 
createWithWaitFinishVacuum(startInvokeVacuumMethodFuture, 
finishInvokeVacuumMethodFuture));
 
-        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+        gc.onLwmChanged(new HybridTimestamp(1, 1));
 
         assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
 
@@ -282,7 +282,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
 
         gc.addStorage(tablePartitionId, gcUpdateHandler);
 
-        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+        gc.onLwmChanged(new HybridTimestamp(1, 1));
 
         assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
         assertThat(gc.removeStorage(tablePartitionId), 
willCompleteSuccessfully());
@@ -301,7 +301,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
 
         assertThrowsClosed(() -> gc.addStorage(createTablePartitionId(), 
createGcUpdateHandler()));
         assertThrowsClosed(() -> gc.removeStorage(createTablePartitionId()));
-        assertThrowsClosed(() -> gc.updateLowWatermark(new HybridTimestamp(1, 
1)));
+        assertThrowsClosed(() -> gc.onLwmChanged(new HybridTimestamp(1, 1)));
 
         assertDoesNotThrow(gc::close);
     }
@@ -317,7 +317,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
 
         gc.start();
 
-        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+        gc.onLwmChanged(new HybridTimestamp(1, 1));
 
         for (int i = 0; i < 100; i++) {
             CountDownLatch latch = new CountDownLatch(5);
@@ -357,7 +357,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         gc.addStorage(createTablePartitionId(), gcUpdateHandler);
 
         // Let's update the low watermark and see that we didn't start the 
garbage collection because we didn't reach the safe time.
-        gc.updateLowWatermark(lvm);
+        gc.onLwmChanged(lvm);
 
         assertThat(invokeVacuumMethodFuture, willTimeoutFast());
         verify(safeTimeTracker).waitFor(lvm);


Reply via email to