This is an automated email from the ASF dual-hosted git repository. apolovtsev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new e26fc8016d IGNITE-20290 Command reordering wrt safe time in MetaStorage (#2506) e26fc8016d is described below commit e26fc8016ddd4ebb99596909157f2f0cdb925ebf Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Tue Aug 29 15:59:32 2023 +0400 IGNITE-20290 Command reordering wrt safe time in MetaStorage (#2506) * No mechanism exists to forbid reordering of idle safe time propagation commands wrt other MetaStorage write commands, it's added in this commit * Idle safe time commands are triggered too early, this is fixed --- .../impl/ItMetaStorageManagerImplTest.java | 52 +++++++++++++++++++++- .../ItMetaStorageMultipleNodesAbstractTest.java | 2 +- ...MetaStorageSafeTimePropagationAbstractTest.java | 13 ++++-- .../metastorage/command/SyncTimeCommand.java | 14 +----- .../metastorage/impl/MetaStorageManagerImpl.java | 35 ++++++++++++--- .../metastorage/impl/MetaStorageServiceImpl.java | 2 +- .../metastorage/server/KeyValueStorage.java | 7 +++ .../server/OnRevisionAppliedCallback.java | 15 +++++-- .../metastorage/server/WatchProcessor.java | 21 ++++++++- .../server/persistence/RocksDbKeyValueStorage.java | 13 ++++++ .../server/raft/MetaStorageWriteHandler.java | 30 ++++++++----- .../server/BasicOperationsKeyValueStorageTest.java | 28 ++++++++++-- .../metastorage/server/WatchProcessorTest.java | 10 ++--- .../server/SimpleInMemoryKeyValueStorage.java | 11 +++++ 14 files changed, 202 insertions(+), 51 deletions(-) diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index d2dbc13721..091ed4c1fe 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -26,6 +26,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -40,6 +41,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -47,6 +50,7 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.RevisionUpdateListener; @@ -57,6 +61,7 @@ import org.apache.ignite.internal.metastorage.dsl.Conditions; import org.apache.ignite.internal.metastorage.dsl.Operations; import 
org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; @@ -95,7 +100,7 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { void setUp( TestInfo testInfo, @InjectConfiguration RaftConfiguration raftConfiguration, - @InjectConfiguration MetaStorageConfiguration metaStorageConfiguration + @InjectConfiguration("mock.idleSyncTimeInterval = 100") MetaStorageConfiguration metaStorageConfiguration ) { var addr = new NetworkAddress("localhost", 10_000); @@ -262,4 +267,49 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { assertThat(revisionCapture.getAllValues(), is(List.of(revision + 1))); } + + /** + * Tests that idle safe time propagation does not advance safe time while watches of a normal command are being executed. + */ + @Test + void testIdleSafeTimePropagationAndNormalSafeTimePropagationInteraction(TestInfo testInfo) throws Exception { + var key = new ByteArray("foo"); + byte[] value = "bar".getBytes(UTF_8); + + AtomicBoolean watchCompleted = new AtomicBoolean(false); + CompletableFuture<HybridTimestamp> watchEventTsFuture = new CompletableFuture<>(); + + metaStorageManager.registerExactWatch(key, new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + watchEventTsFuture.complete(event.timestamp()); + + // The future will set the flag and complete after 300ms to allow idle safe time mechanism (which ticks each 100ms) + // to advance SafeTime (if there is still a bug for which this test is written). 
+ return waitFor(300, TimeUnit.MILLISECONDS) + .whenComplete((res, ex) -> watchCompleted.set(true)); + } + + @Override + public void onError(Throwable e) { + } + }); + + metaStorageManager.put(key, value); + + ClusterTime clusterTime = metaStorageManager.clusterTime(); + + assertThat(watchEventTsFuture, willSucceedIn(5, TimeUnit.SECONDS)); + + HybridTimestamp watchEventTs = watchEventTsFuture.join(); + assertThat(clusterTime.waitFor(watchEventTs), willCompleteSuccessfully()); + + assertThat("Safe time is advanced too early", watchCompleted.get(), is(true)); + } + + private static CompletableFuture<Void> waitFor(int timeout, TimeUnit unit) { + return new CompletableFuture<Void>() + .orTimeout(timeout, unit) + .exceptionally(ex -> null); + } } diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java index 1e039b2bf8..082c3b27f1 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java @@ -234,7 +234,7 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr private final List<Node> nodes = new ArrayList<>(); - private Node startNode(TestInfo testInfo) throws NodeStoppingException { + private Node startNode(TestInfo testInfo) { var nodeFinder = new StaticNodeFinder(List.of(new NetworkAddress("localhost", 10_000))); ClusterService clusterService = ClusterServiceTestUtils.clusterService(testInfo, 10_000 + nodes.size(), nodeFinder); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java index 47b8ad923a..1baafe9b59 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest; +import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback; import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.junit.jupiter.api.AfterEach; @@ -45,10 +46,16 @@ public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends Abstr @BeforeEach public void startWatches() { - storage.startWatches(1, (e, t) -> { - time.updateSafeTime(t); + storage.startWatches(1, new OnRevisionAppliedCallback() { + @Override + public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) { + time.updateSafeTime(newSafeTime); + } - return CompletableFuture.completedFuture(null); + @Override + public CompletableFuture<Void> onRevisionApplied(WatchEvent e) { + return CompletableFuture.completedFuture(null); + } }); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java index 4032f0f8f2..fadd2deee6 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java +++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java @@ -17,25 +17,13 @@ package org.apache.ignite.internal.metastorage.command; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; /** * Command that initiates idle safe time synchronization. */ @Transferable(MetastorageCommandsMessageGroup.SYNC_TIME) -public interface SyncTimeCommand extends WriteCommand { - /** New safe time. */ - long safeTimeLong(); - +public interface SyncTimeCommand extends MetaStorageWriteCommand { /** Term of the initiator. */ long initiatorTerm(); - - /** New safe time. */ - default HybridTimestamp safeTime() { - return hybridTimestamp(safeTimeLong()); - } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 8d810e6044..b3c41997cf 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.metastorage.dsl.Operation; import org.apache.ignite.internal.metastorage.dsl.StatementResult; import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; import org.apache.ignite.internal.metastorage.server.time.ClusterTime; @@ -449,7 +450,17 
@@ public class MetaStorageManagerImpl implements MetaStorageManager { return recoveryFinishedFuture .thenAccept(revision -> inBusyLock(busyLock, () -> { // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault. - storage.startWatches(revision + 1, this::onRevisionApplied); + storage.startWatches(revision + 1, new OnRevisionAppliedCallback() { + @Override + public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) { + MetaStorageManagerImpl.this.onSafeTimeAdvanced(newSafeTime); + } + + @Override + public CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent) { + return MetaStorageManagerImpl.this.onRevisionApplied(watchEvent); + } + }); })) .whenComplete((v, e) -> { if (e == null) { @@ -817,20 +828,32 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } } + private void onSafeTimeAdvanced(HybridTimestamp time) { + assert time != null; + + if (!busyLock.enterBusy()) { + LOG.info("Skipping advancing Safe Time because the node is stopping"); + + return; + } + + try { + clusterTime.updateSafeTime(time); + } finally { + busyLock.leaveBusy(); + } + } + /** * Saves processed Meta Storage revision and corresponding entries to the Vault. 
*/ - private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp time) { - assert time != null; - + private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent) { if (!busyLock.enterBusy()) { LOG.info("Skipping applying MetaStorage revision because the node is stopping"); return completedFuture(null); } - clusterTime.updateSafeTime(time); - try { CompletableFuture<Void> saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision())); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java index 4f32decdff..5637cb274a 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java @@ -279,7 +279,7 @@ public class MetaStorageServiceImpl implements MetaStorageService { */ public CompletableFuture<Void> syncTime(HybridTimestamp safeTime, long term) { SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand() - .safeTimeLong(safeTime.longValue()) + .initiatorTimeLong(safeTime.longValue()) .initiatorTerm(term) .build(); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index 5b29bcab25..de625f477e 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -312,4 +312,11 @@ public interface KeyValueStorage extends ManuallyCloseable { /** Explicitly notifies revision update listeners. 
*/ CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision); + + /** + * Advances MetaStorage Safe Time to a new value without creating a new revision. + * + * @param newSafeTime New Safe Time value. + */ + void advanceSafeTime(HybridTimestamp newSafeTime); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java index 93a2fc1c37..b514048950 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java @@ -22,16 +22,23 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metastorage.WatchEvent; /** - * Interface for declaring callbacks that get called after all Meta Storage watches have been notified of a particular revision. + * Interface for declaring callbacks that get called after all Meta Storage watches have been notified of a particular revision + * and/or when SafeTime gets advanced. */ -@FunctionalInterface public interface OnRevisionAppliedCallback { + /** + * Invoked whenever MetaStorage Safe Time gets advanced (either because a write command is applied, + * together with all watches that process it, or because idle safe time mechanism advanced Safe Time). + * + * @param newSafeTime New safe time value. + */ + void onSafeTimeAdvanced(HybridTimestamp newSafeTime); + /** * Notifies of completion of processing of Meta Storage watches for a particular revision. * * @param watchEvent Event with modified Meta Storage entries processed by at least one Watch. - * @param newSafeTime Safe time of the applied revision. * @return Future that represents the state of the execution of the callback. 
*/ - CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp newSafeTime); + CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java index f334dff57b..5a58278f48 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java @@ -122,7 +122,7 @@ public class WatchProcessor implements ManuallyCloseable { } /** - * Notifies registered watch about an update event. + * Notifies registered watches about an update event. */ public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp time) { assert time != null; @@ -244,7 +244,9 @@ public class WatchProcessor implements ManuallyCloseable { var event = new WatchEvent(acceptedEntries, revision, time); - return revisionCallback.onRevisionApplied(event, time) + revisionCallback.onSafeTimeAdvanced(time); + + return revisionCallback.onRevisionApplied(event) .whenComplete((ignored, e) -> { if (e != null) { LOG.error("Error occurred when notifying watches", e); @@ -257,6 +259,21 @@ public class WatchProcessor implements ManuallyCloseable { } } + /** + * Advances safe time without notifying watches (as there is no new revision). 
+ */ + public void advanceSafeTime(HybridTimestamp time) { + assert time != null; + + notificationFuture = notificationFuture + .thenRunAsync(() -> revisionCallback.onSafeTimeAdvanced(time), watchExecutor) + .whenComplete((ignored, e) -> { + if (e != null) { + LOG.error("Error occurred when notifying safe time advanced callback", e); + } + }); + } + @Override public void close() { notificationFuture.cancel(true); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java index abb2d85848..d691d73c14 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java @@ -1664,4 +1664,17 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) { return watchProcessor.notifyUpdateRevisionListeners(newRevision); } + + @Override + public void advanceSafeTime(HybridTimestamp newSafeTime) { + rwLock.writeLock().lock(); + + try { + if (recoveryStatus.get() == RecoveryStatus.DONE) { + watchProcessor.advanceSafeTime(newSafeTime); + } + } finally { + rwLock.writeLock().unlock(); + } + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java index 8f836a1963..a8f98b58ba 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java @@ 
-83,18 +83,18 @@ public class MetaStorageWriteHandler { if (command instanceof MetaStorageWriteCommand) { var cmdWithTime = (MetaStorageWriteCommand) command; - HybridTimestamp safeTime = cmdWithTime.safeTime(); + if (command instanceof SyncTimeCommand) { + var syncTimeCommand = (SyncTimeCommand) command; - handleWriteWithTime(clo, cmdWithTime, safeTime); - } else if (command instanceof SyncTimeCommand) { - var syncTimeCommand = (SyncTimeCommand) command; + // Ignore the command if it has been sent by a stale leader. + if (clo.term() != syncTimeCommand.initiatorTerm()) { + clo.result(null); - // Ignore the command if it has been sent by a stale leader. - if (clo.term() == syncTimeCommand.initiatorTerm()) { - clusterTime.updateSafeTime(syncTimeCommand.safeTime()); + return; + } } - clo.result(null); + handleWriteWithTime(clo, cmdWithTime); } else { assert false : "Command was not found [cmd=" + command + ']'; } @@ -118,9 +118,10 @@ public class MetaStorageWriteHandler { * * @param clo Command closure. * @param command Command. - * @param opTime Command's time. */ - private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command, HybridTimestamp opTime) { + private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command) { + HybridTimestamp opTime = command.safeTime(); + if (command instanceof PutCommand) { PutCommand putCmd = (PutCommand) command; @@ -177,6 +178,10 @@ public class MetaStorageWriteHandler { MultiInvokeCommand cmd = (MultiInvokeCommand) command; clo.result(storage.invoke(toIf(cmd.iif()), opTime)); + } else if (command instanceof SyncTimeCommand) { + storage.advanceSafeTime(command.safeTime()); + + clo.result(null); } } @@ -286,7 +291,10 @@ public class MetaStorageWriteHandler { } } - void beforeApply(Command command) { + // TODO: IGNITE-20290 - This is insufficient, we must do this in single thread before saving the command to the RAFT log. 
+ // Synchronized to make sure no reordering happens as RaftGroupListener#beforeApply() might be invoked in different threads + // for different commands. + synchronized void beforeApply(Command command) { if (command instanceof MetaStorageWriteCommand) { // Initiator sends us a timestamp to adjust to. // Alter command by setting safe time based on the adjusted clock. diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java index 198ade0395..5486b2e81a 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java @@ -1969,7 +1969,17 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu long appliedRevision = storage.revision(); - storage.startWatches(1, (event, ts) -> completedFuture(null)); + storage.startWatches(1, new OnRevisionAppliedCallback() { + @Override + public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) { + // No-op. 
+ } + + @Override + public CompletableFuture<Void> onRevisionApplied(WatchEvent event) { + return completedFuture(null); + } + }); CompletableFuture<byte[]> fut = new CompletableFuture<>(); @@ -2308,7 +2318,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu OnRevisionAppliedCallback mockCallback = mock(OnRevisionAppliedCallback.class); - when(mockCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null)); + when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null)); storage.startWatches(1, mockCallback); @@ -2320,7 +2330,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu verify(mockListener3, timeout(10_000)).onUpdate(any()); - verify(mockCallback, never()).onRevisionApplied(any(), any()); + verify(mockCallback, never()).onRevisionApplied(any()); } @Test @@ -2505,7 +2515,17 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu } }); - storage.startWatches(1, (event, ts) -> completedFuture(null)); + storage.startWatches(1, new OnRevisionAppliedCallback() { + @Override + public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) { + // No-op. 
+ } + + @Override + public CompletableFuture<Void> onRevisionApplied(WatchEvent event) { + return completedFuture(null); + } + }); return resultFuture; } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java index fcbf626497..745e0f2143 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java @@ -56,7 +56,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest { @BeforeEach void setUp() { - when(revisionCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null)); + when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null)); watchProcessor.setRevisionCallback(revisionCallback); } @@ -90,7 +90,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest { var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class); - verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture(), any()); + verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture()); WatchEvent event = watchEventCaptor.getValue(); @@ -120,7 +120,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest { verify(listener1, timeout(1_000)).onUpdate(event); - verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts); + verify(revisionCallback, timeout(1_000)).onRevisionApplied(event); ts = new HybridTimestamp(2, 3); @@ -130,7 +130,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest { verify(listener2, timeout(1_000)).onUpdate(event); - verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts); + verify(revisionCallback, timeout(1_000)).onRevisionApplied(event); } /** @@ -156,7 +156,7 @@ public class 
WatchProcessorTest extends BaseIgniteAbstractTest { verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2))); verify(listener2, timeout(1_000)).onError(any(IllegalStateException.class)); - verify(revisionCallback, never()).onRevisionApplied(any(), any()); + verify(revisionCallback, never()).onRevisionApplied(any()); } /** diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index 6760682702..36980dc87e 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -888,4 +888,15 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) { return watchProcessor.notifyUpdateRevisionListeners(newRevision); } + + @Override + public void advanceSafeTime(HybridTimestamp newSafeTime) { + synchronized (mux) { + if (!areWatchesEnabled) { + return; + } + + watchProcessor.advanceSafeTime(newSafeTime); + } + } }