This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-21585 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 6e2ea2372770e52e8c10801140f89809eb7fbd99 Author: amashenkov <[email protected]> AuthorDate: Tue Mar 5 15:10:38 2024 +0300 LowWatermark refactored, interface extracted, added dummy implementation. --- .../runner/app/ItIgniteNodeRestartTest.java | 4 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 12 +- .../rebalance/ItRebalanceDistributedTest.java | 6 +- .../internal/table/distributed/LowWatermark.java | 261 +-------------------- .../distributed/LowWatermarkChangedListener.java | 2 +- .../{LowWatermark.java => LowWatermarkImpl.java} | 12 +- .../table/distributed/LowWatermarkTest.java | 8 +- .../table/distributed/TableManagerTest.java | 9 +- .../ignite/internal/table/TestLowWatermark.java | 69 ++++++ 9 files changed, 107 insertions(+), 276 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index f759902e11..c22f720f00 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -161,7 +161,7 @@ import org.apache.ignite.internal.storage.DataStorageModules; import org.apache.ignite.internal.systemview.SystemViewManagerImpl; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.table.distributed.LowWatermark; +import org.apache.ignite.internal.table.distributed.LowWatermarkImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; @@ -525,7 +525,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { var 
sqlRef = new AtomicReference<IgniteSqlImpl>(); - LowWatermark lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor); + LowWatermarkImpl lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor); TableManager tableManager = new TableManager( name, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 5d8cf99a8e..210c08fcdd 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -177,7 +177,7 @@ import org.apache.ignite.internal.storage.engine.StorageEngine; import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine; import org.apache.ignite.internal.systemview.SystemViewManagerImpl; import org.apache.ignite.internal.systemview.api.SystemViewManager; -import org.apache.ignite.internal.table.distributed.LowWatermark; +import org.apache.ignite.internal.table.distributed.LowWatermarkImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; @@ -345,7 +345,7 @@ public class IgniteImpl implements Ignite { private final ClockWaiter clockWaiter; - private final LowWatermark lowWatermark; + private final LowWatermarkImpl lowWatermark; private final OutgoingSnapshotsManager outgoingSnapshotsManager; @@ -686,7 +686,7 @@ public class IgniteImpl implements Ignite { StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY); - lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), clock, txManager, vaultMgr, failureProcessor); + lowWatermark = new 
LowWatermarkImpl(name, gcConfig.lowWatermark(), clock, txManager, vaultMgr, failureProcessor); distributedTblMgr = new TableManager( name, @@ -1449,4 +1449,10 @@ public class IgniteImpl implements Ignite { public RemotelyTriggeredResourceRegistry resourcesRegistry() { return resourcesRegistry; } + + /** Returns the low watermark. */ + @TestOnly + public LowWatermarkImpl lowWatermark() { + return lowWatermark; + } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index a2a8b81c38..aed0d5ce4c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -168,7 +168,7 @@ import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableRaftService; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.table.distributed.LowWatermark; +import org.apache.ignite.internal.table.distributed.LowWatermarkImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; @@ -955,7 +955,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { private final NetworkAddress networkAddress; - private final LowWatermark lowWatermark; + private final LowWatermarkImpl lowWatermark; /** The future have to be complete after the node start and all Meta storage watches are deployd. 
*/ private CompletableFuture<Void> deployWatchesFut; @@ -1172,7 +1172,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY); HybridClockImpl clock = new HybridClockImpl(); - lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), clock, txManager, vaultManager, failureProcessor); + lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), clock, txManager, vaultManager, failureProcessor); tableManager = new TableManager( name, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java index 8e5cc3cf20..6b77076d8f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java @@ -17,264 +17,19 @@ package org.apache.ignite.internal.table.distributed; -import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.ignite.internal.failure.FailureContext; -import org.apache.ignite.internal.failure.FailureProcessor; -import org.apache.ignite.internal.hlc.HybridClock; import 
org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.lang.ByteArray; -import org.apache.ignite.internal.lang.NodeStoppingException; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.manager.IgniteComponent; -import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; -import org.apache.ignite.internal.thread.NamedThreadFactory; -import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.util.ByteUtils; -import org.apache.ignite.internal.util.IgniteSpinBusyLock; -import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.vault.VaultEntry; -import org.apache.ignite.internal.vault.VaultManager; import org.jetbrains.annotations.Nullable; /** - * Class to manage the low watermark. - * - * <p>Low watermark is the node's local time, which ensures that read-only transactions have completed by this time, and new read-only - * transactions will only be created after this time, and we can safely delete obsolete/garbage data such as: obsolete versions of table - * rows, remote indexes, remote tables, etc. - * - * @see <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a> + * An interface for tracking Low watermark. 
*/ -public class LowWatermark implements IgniteComponent { - private static final IgniteLogger LOG = Loggers.forClass(LowWatermark.class); - - static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark"); - - private final LowWatermarkConfiguration lowWatermarkConfig; - - private final HybridClock clock; - - private final TxManager txManager; - - private final VaultManager vaultManager; - - private final List<LowWatermarkChangedListener> updateListeners = new CopyOnWriteArrayList<>(); - - private final ScheduledExecutorService scheduledThreadPool; - - private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); - - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - private volatile @Nullable HybridTimestamp lowWatermark; - - private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>(); - - private final FailureProcessor failureProcessor; - - /** - * Constructor. - * - * @param nodeName Node name. - * @param lowWatermarkConfig Low watermark configuration. - * @param clock A hybrid logical clock. - * @param txManager Transaction manager. - * @param vaultManager Vault manager. - * @param failureProcessor Failure processor tha is used to handle critical errors. - */ - public LowWatermark( - String nodeName, - LowWatermarkConfiguration lowWatermarkConfig, - HybridClock clock, - TxManager txManager, - VaultManager vaultManager, - FailureProcessor failureProcessor - ) { - this.lowWatermarkConfig = lowWatermarkConfig; - this.clock = clock; - this.txManager = txManager; - this.vaultManager = vaultManager; - this.failureProcessor = failureProcessor; - - scheduledThreadPool = Executors.newSingleThreadScheduledExecutor( - NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG) - ); - } - - /** - * Starts the watermark manager. 
- */ - @Override - public CompletableFuture<Void> start() { - inBusyLock(busyLock, () -> { - lowWatermark = readLowWatermarkFromVault(); - }); - - return nullCompletedFuture(); - } - - /** - * Schedule watermark updates. - */ - public void scheduleUpdates() { - inBusyLock(busyLock, () -> { - HybridTimestamp lowWatermarkCandidate = lowWatermark; - - if (lowWatermarkCandidate == null) { - LOG.info("Previous value of the low watermark was not found, will schedule to update it"); - - scheduleUpdateLowWatermarkBusy(); - - return; - } - - LOG.info("Low watermark has been scheduled to be updated: {}", lowWatermarkCandidate); - - txManager.updateLowWatermark(lowWatermarkCandidate) - .thenComposeAsync(unused -> inBusyLock(busyLock, () -> notifyListeners(lowWatermarkCandidate)), scheduledThreadPool) - .whenComplete((unused, throwable) -> { - if (throwable == null) { - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); - } else if (!(throwable instanceof NodeStoppingException)) { - LOG.error("Error during the Watermark manager start", throwable); - - failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable)); - - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); - } - }); - }); - } - - private @Nullable HybridTimestamp readLowWatermarkFromVault() { - VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY); - - return vaultEntry == null ? null : ByteUtils.fromBytes(vaultEntry.value()); - } - - @Override - public void stop() { - if (!closeGuard.compareAndSet(false, true)) { - return; - } - - busyLock.block(); - - ScheduledFuture<?> lastScheduledTaskFuture = this.lastScheduledTaskFuture.get(); - - if (lastScheduledTaskFuture != null) { - lastScheduledTaskFuture.cancel(true); - } - - IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS); - } - - /** - * Returns the current low watermark, {@code null} means no low watermark has been assigned yet. 
- */ - public @Nullable HybridTimestamp getLowWatermark() { - return lowWatermark; - } - - void updateLowWatermark() { - inBusyLock(busyLock, () -> { - HybridTimestamp lowWatermarkCandidate = createNewLowWatermarkCandidate(); - - // Wait until all the read-only transactions are finished before the new candidate, since no new RO transactions could be - // created, then we can safely promote the candidate as a new low watermark, store it in vault, and we can safely start cleaning - // up the stale/junk data in the tables. - txManager.updateLowWatermark(lowWatermarkCandidate) - .thenComposeAsync(unused -> inBusyLock(busyLock, () -> { - vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermarkCandidate)); - - lowWatermark = lowWatermarkCandidate; - - return notifyListeners(lowWatermarkCandidate); - }), scheduledThreadPool) - .whenComplete((unused, throwable) -> { - if (throwable != null) { - if (!(throwable instanceof NodeStoppingException)) { - LOG.error("Failed to update low watermark, will schedule again: {}", throwable, lowWatermarkCandidate); - - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); - } - } else { - LOG.info("Successful low watermark update: {}", lowWatermarkCandidate); - - scheduleUpdateLowWatermarkBusy(); - } - }); - }); - } - - public void addUpdateListener(LowWatermarkChangedListener listener) { - updateListeners.add(listener); - } - - public void removeUpdateListener(LowWatermarkChangedListener listener) { - updateListeners.remove(listener); - } - - private CompletableFuture<Void> notifyListeners(HybridTimestamp lowWatermark) { - if (updateListeners.isEmpty()) { - return nullCompletedFuture(); - } - - ArrayList<CompletableFuture<?>> res = new ArrayList<>(); - for (LowWatermarkChangedListener updateListener : updateListeners) { - res.add(updateListener.onLwmChanged(lowWatermark)); - } - - return CompletableFuture.allOf(res.toArray(CompletableFuture[]::new)); - } - - private void scheduleUpdateLowWatermarkBusy() { - 
ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get(); - - assert previousScheduledFuture == null || previousScheduledFuture.isDone() : "previous scheduled task has not finished"; - - ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule( - this::updateLowWatermark, - lowWatermarkConfig.updateFrequency().value(), - TimeUnit.MILLISECONDS - ); - - boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture); - - assert casResult : "only one scheduled task is expected"; - } - - HybridTimestamp createNewLowWatermarkCandidate() { - HybridTimestamp now = clock.now(); - - HybridTimestamp lowWatermarkCandidate = now.addPhysicalTime( - -lowWatermarkConfig.dataAvailabilityTime().value() - getMaxClockSkew() - ); - - HybridTimestamp lowWatermark = this.lowWatermark; - - assert lowWatermark == null || lowWatermarkCandidate.compareTo(lowWatermark) > 0 : - "lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" + lowWatermarkCandidate; +public interface LowWatermark { + /** Returns the current low watermark, {@code null} means no low watermark has been assigned yet. */ + @Nullable HybridTimestamp getLowWatermark(); - return lowWatermarkCandidate; - } + /** Subscribes to low watermark changes. */ + void addUpdateListener(LowWatermarkChangedListener listener); - private long getMaxClockSkew() { - // TODO: IGNITE-19287 Add Implementation - return HybridTimestamp.CLOCK_SKEW; - } + /** Unsubscribes from low watermark changes. 
*/ + void removeUpdateListener(LowWatermarkChangedListener listener); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java index 2a738a7c46..72e395bbaf 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java @@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestamp; /** - * LWM event listener interface. + * Low watermark event listener interface. * * @see LowWatermark */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java similarity index 98% copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java index 8e5cc3cf20..177ee9d652 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java @@ -59,8 +59,8 @@ import org.jetbrains.annotations.Nullable; * * @see <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a> */ -public class LowWatermark implements IgniteComponent { - private static final IgniteLogger LOG = Loggers.forClass(LowWatermark.class); +public class LowWatermarkImpl implements IgniteComponent, LowWatermark { + private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkImpl.class); static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark"); @@ 
-96,7 +96,7 @@ public class LowWatermark implements IgniteComponent { * @param vaultManager Vault manager. * @param failureProcessor Failure processor tha is used to handle critical errors. */ - public LowWatermark( + public LowWatermarkImpl( String nodeName, LowWatermarkConfiguration lowWatermarkConfig, HybridClock clock, @@ -183,9 +183,7 @@ public class LowWatermark implements IgniteComponent { IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS); } - /** - * Returns the current low watermark, {@code null} means no low watermark has been assigned yet. - */ + @Override public @Nullable HybridTimestamp getLowWatermark() { return lowWatermark; } @@ -221,10 +219,12 @@ public class LowWatermark implements IgniteComponent { }); } + @Override public void addUpdateListener(LowWatermarkChangedListener listener) { updateListeners.add(listener); } + @Override public void removeUpdateListener(LowWatermarkChangedListener listener) { updateListeners.remove(listener); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java index c35e203fdb..a21ab858b7 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.table.distributed; -import static org.apache.ignite.internal.table.distributed.LowWatermark.LOW_WATERMARK_VAULT_KEY; +import static org.apache.ignite.internal.table.distributed.LowWatermarkImpl.LOW_WATERMARK_VAULT_KEY; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; @@ -58,7 +58,7 @@ import 
org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InOrder; /** - * For {@link LowWatermark} testing. + * For {@link LowWatermarkImpl} testing. */ @ExtendWith(ConfigurationExtension.class) public class LowWatermarkTest extends BaseIgniteAbstractTest { @@ -73,14 +73,14 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest { private LowWatermarkChangedListener listener; - private LowWatermark lowWatermark; + private LowWatermarkImpl lowWatermark; @BeforeEach void setUp() { listener = mock(LowWatermarkChangedListener.class); when(listener.onLwmChanged(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture()); - lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, txManager, vaultManager, mock(FailureProcessor.class)); + lowWatermark = new LowWatermarkImpl("test", lowWatermarkConfig, clock, txManager, vaultManager, mock(FailureProcessor.class)); lowWatermark.addUpdateListener(listener); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 7b041ea1a6..90eb159423 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -118,6 +118,7 @@ import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TestLowWatermark; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService; import 
org.apache.ignite.internal.testframework.IgniteAbstractTest; @@ -129,7 +130,6 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; import org.apache.ignite.internal.util.CursorUtils; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.TopologyService; @@ -243,8 +243,11 @@ public class TableManagerTest extends IgniteAbstractTest { private ExecutorService partitionOperationsExecutor; + private TestLowWatermark lowWatermark; + @BeforeEach void before() throws NodeStoppingException { + lowWatermark = new TestLowWatermark(); catalogMetastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(NODE_NAME)); catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, catalogMetastore); @@ -744,8 +747,6 @@ public class TableManagerTest extends IgniteAbstractTest { */ private TableManager createTableManager(CompletableFuture<TableManager> tblManagerFut, Consumer<MvTableStorage> tableStorageDecorator, Consumer<TxStateTableStorage> txStateTableStorageDecorator) { - VaultManager vaultManager = mock(VaultManager.class); - TableManager tableManager = new TableManager( NODE_NAME, revisionUpdater, @@ -777,7 +778,7 @@ public class TableManagerTest extends IgniteAbstractTest { () -> mock(IgniteSql.class), new RemotelyTriggeredResourceRegistry(), mock(ScheduledExecutorService.class), - new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock, tm, vaultManager, mock(FailureProcessor.class)) + lowWatermark ) { @Override diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java new file mode 100644 index 0000000000..320c776ffe --- /dev/null +++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.table.distributed.LowWatermark; +import org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener; +import org.jetbrains.annotations.Nullable; + +/** + * Low watermark dummy implementation, which requires an explicit {@link #updateAndNotify(HybridTimestamp)} method call to notify listeners. + * This implementation has no persistent state and notifies listeners immediately in the same thread. 
+ */ +public class TestLowWatermark implements LowWatermark { + private final List<LowWatermarkChangedListener> listeners = new CopyOnWriteArrayList<>(); + private volatile HybridTimestamp ts; + + @Override + public @Nullable HybridTimestamp getLowWatermark() { + return ts; + } + + @Override + public void addUpdateListener(LowWatermarkChangedListener listener) { + this.listeners.add(listener); + } + + @Override + public void removeUpdateListener(LowWatermarkChangedListener listener) { + this.listeners.remove(listener); + } + + /** + * Update low watermark and notify listeners. + * + * @param newTs New timestamp. + * @return Listener notification future. + */ + public CompletableFuture<Void> updateAndNotify(HybridTimestamp newTs) { + assert ts == null || ts.longValue() < newTs.longValue(); + + this.ts = newTs; + + return CompletableFuture.allOf(listeners.stream().map(l -> l.onLwmChanged(newTs)).toArray(CompletableFuture[]::new)); + } + + /** Set low watermark without listeners notification. */ + public void update(HybridTimestamp newTs) { + this.ts = newTs; + } +}
