This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e6a1fffdfaf IGNITE-28202 Advance last applied index for each
Metastorage command (#7763)
e6a1fffdfaf is described below
commit e6a1fffdfaf77983de18bd03333025f1e0e9ca77
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Mar 12 17:35:44 2026 +0400
IGNITE-28202 Advance last applied index for each Metastorage command (#7763)
---
.../impl/ItIdempotentCommandCacheTest.java | 175 ++++++++----
.../metastorage/impl/ItMetaStorageServiceTest.java | 28 +-
.../server/raft/MetaStorageWriteHandler.java | 22 ++
.../impl/IdempotentCommandCacheTest.java | 36 ++-
.../server/raft/MetaStorageListenerTest.java | 309 +++++++++++++++++++++
5 files changed, 507 insertions(+), 63 deletions(-)
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 345a61ae89c..ce1e185ccda 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage.impl;
+import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -25,13 +26,16 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -39,6 +43,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -52,6 +60,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
+import org.apache.ignite.internal.close.ManuallyCloseable;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -115,6 +124,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
/**
@@ -122,7 +132,7 @@ import org.junit.jupiter.params.provider.MethodSource;
*/
@ExtendWith(ConfigurationExtension.class)
@ExtendWith(ExecutorServiceExtension.class)
-public class ItIdempotentCommandCacheTest extends IgniteAbstractTest {
+class ItIdempotentCommandCacheTest extends IgniteAbstractTest {
private static final MetaStorageCommandsFactory CMD_FACTORY = new
MetaStorageCommandsFactory();
private static final int NODES_COUNT = 2;
@@ -152,7 +162,9 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
private List<Node> nodes;
- private static class Node implements AutoCloseable {
+ private TestInfo testInfo;
+
+ private static class Node implements ManuallyCloseable {
private static final IgniteLogger log = Loggers.forClass(Node.class);
ClusterService clusterService;
@@ -270,11 +282,9 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
}
void start(CompletableFuture<Set<String>> metaStorageNodesFut) {
- if (metaStorageNodesFut != null) {
- when(cmgManager.metaStorageInfo()).thenReturn(
- metaStorageNodesFut.thenApply(nodes -> new
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(nodes).build())
- );
- }
+ when(cmgManager.metaStorageInfo()).thenReturn(
+ metaStorageNodesFut.thenApply(nodes -> new
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(nodes).build())
+ );
assertThat(
startAsync(new ComponentContext(),
@@ -329,23 +339,64 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
@BeforeEach
void setUp(TestInfo testInfo) {
- startCluster(testInfo);
+ this.testInfo = testInfo;
+
+ if
(testInfo.getTestMethod().orElseThrow().isAnnotationPresent(DisableIdleSafeTimePropagation.class))
{
+ // The test asked to disable idle safe time propagation, so set it
to a super-long period.
+ assertThat(
+
systemDistributedConfiguration.idleSafeTimeSyncIntervalMillis().update(DAYS.toMillis(1)),
+ willCompleteSuccessfully()
+ );
+ }
+
+ startCluster();
}
@AfterEach
void tearDown() throws Exception {
- closeAll(nodes.stream());
+ closeAllManually(nodes.stream());
}
- @Test
- public void testIdempotentInvoke() throws InterruptedException {
+ @ParameterizedTest
+ @EnumSource(Invoker.class)
+ void testIdempotentInvoke(Invoker invoker) {
+ Node leader = leader(raftClient());
+
+ boolean result = doRetriedInvoke(invoker, leader);
+
+ assertTrue(result);
+ assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE));
+ }
+
+ private boolean doRetriedInvoke(Invoker invoker, Node leader) {
AtomicInteger writeActionReqCount = new AtomicInteger();
CompletableFuture<Void> retryBlockingFuture = new
CompletableFuture<>();
log.info("Test: blocking messages.");
- Node leader = leader(raftClient());
+ dropResponseToFirstInvoke(leader, writeActionReqCount,
retryBlockingFuture);
+
+ MetaStorageManager metaStorageManager = leader.metaStorageManager;
+
+ CompletableFuture<Boolean> fut = invoker.invokeOn(metaStorageManager);
+
+ await().until(() -> leader.checkValueInStorage(TEST_KEY.bytes(),
TEST_VALUE));
+
+ log.info("Test: value appeared in storage.");
+
+ assertTrue(retryBlockingFuture.complete(null));
+ await().until(() -> writeActionReqCount.get() == 2);
+
+ leader.stopDroppingMessages();
+
+ assertThat(fut, willCompleteSuccessfully());
+ log.info("Test: invoke complete.");
+
+ return fut.join();
+ }
+
+ private void dropResponseToFirstInvoke(Node leader, AtomicInteger
writeActionReqCount, CompletableFuture<Void> retryBlockingFuture) {
leader.dropMessages((n, msg) -> {
// Dropping the first response, this will cause timeout on first
response, and then retry.
if (msg instanceof ActionResponse && ((ActionResponse)
msg).result() != null && writeActionReqCount.get() == 1) {
@@ -358,11 +409,11 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
WriteActionRequest request = (WriteActionRequest) msg;
if (!(request.deserializedCommand() instanceof
SyncTimeCommand)) {
- writeActionReqCount.incrementAndGet();
- log.info("Test: WriteActionRequest intercepted, count=" +
writeActionReqCount.get());
+ int incrementedCount =
writeActionReqCount.incrementAndGet();
+ log.info("Test: WriteActionRequest intercepted, count=" +
incrementedCount);
// Second request: retry.
- if (writeActionReqCount.get() == 2) {
+ if (incrementedCount == 2) {
log.info("Test: retry blocked.");
retryBlockingFuture.orTimeout(10,
TimeUnit.SECONDS).join();
@@ -374,34 +425,30 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
return false;
});
+ }
- MetaStorageManager metaStorageManager = leader.metaStorageManager;
-
- CompletableFuture<Boolean> fut = metaStorageManager.invoke(
- notExists(TEST_KEY),
- put(TEST_KEY, TEST_VALUE),
- put(TEST_KEY, ANOTHER_VALUE)
- );
-
- assertTrue(waitForCondition(() ->
leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE), 10_000));
-
- log.info("Test: value appeared in storage.");
-
- assertTrue(retryBlockingFuture.complete(null));
-
- assertTrue(waitForCondition(() -> writeActionReqCount.get() == 2,
10_000));
+ /**
+ * This makes sure that a gap between the KV storage last applied index and
the last applied index from the point of view of JRaft does
+ * not happen (due to idempotent command retries not advancing the last
applied index in the KV storage).
+ * If it does, and a Raft snapshot is taken at exactly that moment, the
JRaft node will fail on recovery.
+ */
+ @ParameterizedTest
+ @EnumSource(Invoker.class)
+ // Disable safe time propagation to prevent safe time commands in the log,
so that they don't close the index
+ // gap we are trying to reproduce.
+ @DisableIdleSafeTimePropagation
+ void retriedInvokeDoesNotBreakIndexConsistency(Invoker invoker) throws
Exception {
+ Node leader = leader(raftClient());
- leader.stopDroppingMessages();
+ doRetriedInvoke(invoker, leader);
- assertThat(fut, willCompleteSuccessfully());
- log.info("Test: invoke complete.");
+ nodes.forEach(n -> assertThat(raftClient().snapshot(new
Peer(n.clusterService.nodeName()), true), willCompleteSuccessfully()));
- assertTrue(fut.join());
- assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE));
+ restartCluster();
}
@Test
- public void testIdempotentInvokeAfterLeaderChange() {
+ void testIdempotentInvokeAfterLeaderChange() {
InvokeCommand invokeCommand = (InvokeCommand)
buildKeyNotExistsInvokeCommand(TEST_KEY, TEST_VALUE, ANOTHER_VALUE);
RaftGroupService raftClient = raftClient();
@@ -436,7 +483,7 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
@ParameterizedTest
@MethodSource("idempotentCommandProvider")
- public void testIdempotentCacheRestoreFromSnapshot(IdempotentCommand
idempotentCommand, TestInfo testInfo) throws Exception {
+ void testIdempotentCacheRestoreFromSnapshot(IdempotentCommand
idempotentCommand) throws Exception {
RaftGroupService raftClient = raftClient();
Node leader = leader(raftClient);
@@ -450,19 +497,13 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
} else {
assertEquals(YIELD_RESULT, ((StatementResult)
commandProcessingResult).getAsInt());
assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(),
TEST_VALUE_2));
-
}
// Do the snapshot.
nodes.forEach(n -> raftClient().snapshot(new
Peer(n.clusterService.nodeName()), false));
// Restart nodes in order to trigger idempotent volatile cache
initialization from snapshot.
- for (Node node : nodes) {
- node.stop();
- }
-
- // Restart cluster.
- startCluster(testInfo);
+ restartCluster();
ClockService node0ClockService = nodes.get(0).clockService;
long timestampAfterRestartPhysicalLong =
node0ClockService.now().getPhysical();
@@ -510,12 +551,22 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
}
}
+ private void restartCluster() throws Exception {
+ for (Node node : nodes) {
+ node.stop();
+ }
+
+ startCluster();
+ }
+
private Node leader(RaftGroupService raftClient) {
CompletableFuture<LeaderWithTerm> refreshLeaderFut =
raftClient.refreshAndGetLeaderWithTerm();
assertThat(refreshLeaderFut, willCompleteSuccessfully());
- String currentLeader = refreshLeaderFut.join().leader().consistentId();
+ Peer leader = refreshLeaderFut.join().leader();
+ assertThat(leader, is(notNullValue()));
+ String currentLeader = leader.consistentId();
return nodes.stream().filter(n ->
n.clusterService.nodeName().equals(currentLeader)).findAny().orElseThrow();
}
@@ -582,7 +633,7 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
.build();
}
- private void startCluster(TestInfo testInfo) {
+ private void startCluster() {
nodes = new ArrayList<>();
for (int i = 0; i < NODES_COUNT; i++) {
@@ -607,4 +658,34 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
nodes.forEach(Node::deployWatches);
}
+
+ @Target(ElementType.METHOD)
+ @Retention(RetentionPolicy.RUNTIME)
+ private @interface DisableIdleSafeTimePropagation {
+ }
+
+ private enum Invoker {
+ INVOKE {
+ @Override
+ CompletableFuture<Boolean> invokeOn(MetaStorageManager
metaStorageManager) {
+ return metaStorageManager.invoke(
+ notExists(TEST_KEY),
+ put(TEST_KEY, TEST_VALUE),
+ put(TEST_KEY, ANOTHER_VALUE)
+ );
+ }
+ },
+ MULTI_INVOKE {
+ @Override
+ CompletableFuture<Boolean> invokeOn(MetaStorageManager
metaStorageManager) {
+ return metaStorageManager.invoke(iif(
+ notExists(TEST_KEY),
+ ops(put(TEST_KEY, TEST_VALUE)).yield(true),
+ ops(put(TEST_KEY, ANOTHER_VALUE)).yield(false)
+ )).thenApply(StatementResult::getAsBoolean);
+ }
+ };
+
+ abstract CompletableFuture<Boolean> invokeOn(MetaStorageManager
metaStorageManager);
+ }
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index e3c0fa30afd..e8528128e56 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -47,7 +47,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -94,6 +97,7 @@ import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.OrCondition;
import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.ValueCondition;
import org.apache.ignite.internal.metastorage.server.ValueCondition.Type;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
@@ -129,6 +133,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
* Meta storage client tests.
@@ -226,7 +232,7 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
);
this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(),
new IgniteSpinBusyLock(), clock);
- this.mockStorage = mock(KeyValueStorage.class);
+ this.mockStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
}
void start(PeersAndLearners configuration) {
@@ -395,7 +401,7 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
public void testGetAll() {
Node node = prepareNodes(1).get(0);
-
when(node.mockStorage.getAll(anyList())).thenReturn(EXPECTED_SRV_RESULT_COLL);
+
doReturn(EXPECTED_SRV_RESULT_COLL).when(node.mockStorage).getAll(anyList());
startNodes();
@@ -410,7 +416,7 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
public void testGetAllWithUpperBoundRevision() {
Node node = prepareNodes(1).get(0);
- when(node.mockStorage.getAll(anyList(),
eq(10L))).thenReturn(EXPECTED_SRV_RESULT_COLL);
+
doReturn(EXPECTED_SRV_RESULT_COLL).when(node.mockStorage).getAll(anyList(),
eq(10L));
startNodes();
@@ -685,7 +691,19 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
var ifCaptor = ArgumentCaptor.forClass(If.class);
- when(node.mockStorage.invoke(any(), any(),
any())).thenReturn(ops().yield(true).result(), null, null);
+ doAnswer(new Answer<Object>() {
+ private boolean first = true;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
{
+ // To update last applied index in the storage.
+ invocation.callRealMethod();
+
+ Object result = first ? ops().yield(true).result() : null;
+ first = false;
+ return result;
+ }
+ }).when(node.mockStorage).invoke(any(), any(), any());
startNodes();
@@ -736,8 +754,6 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
byte[] expVal = {2};
- when(node.mockStorage.invoke(any(), any(), any(), any(),
any())).thenReturn(true);
-
startNodes();
Condition condition = Conditions.notExists(expKey);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 735f8520bb4..74a9dbb1a74 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -73,6 +73,7 @@ import
org.apache.ignite.internal.metastorage.server.TombstoneCondition;
import org.apache.ignite.internal.metastorage.server.ValueCondition;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.util.Cursor;
@@ -114,6 +115,18 @@ public class MetaStorageWriteHandler {
* Processes a given {@link WriteCommand}.
*/
void handleWriteCommand(CommandClosure<WriteCommand> clo) {
+ handleWriteCommandInternal(clo);
+
+ assert lastAppliedIndex() == clo.index() : String.format(
+ "Last applied index after command application is not equal to
the command index "
+ + "[lastAppliedIndex=%d, commandIndex=%d, command=%s]",
+ lastAppliedIndex(),
+ clo.index(),
+ clo.command().toStringForLightLogging()
+ );
+ }
+
+ private void handleWriteCommandInternal(CommandClosure<WriteCommand> clo) {
WriteCommand command = clo.command();
CommandClosure<WriteCommand> resultClosure;
@@ -144,6 +157,8 @@ public class MetaStorageWriteHandler {
clo.result(commandResult);
}
+ storage.setIndexAndTerm(clo.index(), clo.term());
+
return;
} else {
resultClosure = new ResultCachingClosure(clo);
@@ -380,6 +395,11 @@ public class MetaStorageWriteHandler {
}
}
+ private long lastAppliedIndex() {
+ IndexWithTerm indexWithTerm = storage.getIndexWithTerm();
+ return indexWithTerm == null ? -1 : indexWithTerm.index();
+ }
+
Command beforeApply(Command command) {
if (command instanceof MetaStorageWriteCommand) {
// Initiator sends us a timestamp to adjust to.
@@ -446,6 +466,8 @@ public class MetaStorageWriteHandler {
List<CommandId> evictedCommandIds =
evictCommandsFromCache(evictionTimestamp);
if (evictedCommandIds.isEmpty()) {
+ storage.setIndexAndTerm(context.index, context.term);
+
return;
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
index 9b9d7eb1e46..644bff4dfe4 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
@@ -81,6 +81,8 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
public void setUp() {
storage = new SimpleInMemoryKeyValueStorage(NODE_NAME);
metaStorageListener = new MetaStorageListener(storage, clock, new
ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock));
+
+ storage.setIndexAndTerm(1, 1);
}
@Test
@@ -98,14 +100,14 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
.initiatorTime(clock.now())
.build();
- metaStorageListener.onWrite(commandIterator(command));
+ metaStorageListener.onWrite(commandIterator(command, 2, 1));
assertNotNull(lastCommandResult);
assertTrue((Boolean) lastCommandResult);
checkValueInStorage(testKey.bytes(), testValue.bytes());
// Another call of same command.
- metaStorageListener.onWrite(commandIterator(command));
+ metaStorageListener.onWrite(commandIterator(command, 3, 1));
assertNotNull(lastCommandResult);
assertTrue((Boolean) lastCommandResult);
checkValueInStorage(testKey.bytes(), testValue.bytes());
@@ -130,7 +132,7 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
.initiatorTime(clock.now())
.build();
- metaStorageListener.onWrite(commandIterator(command));
+ metaStorageListener.onWrite(commandIterator(command, 2, 1));
StatementResult result = (StatementResult) lastCommandResult;
assertNotNull(result);
@@ -138,7 +140,7 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
checkValueInStorage(testKey.bytes(), testValue.bytes());
// Another call of same command.
- metaStorageListener.onWrite(commandIterator(command));
+ metaStorageListener.onWrite(commandIterator(command, 3, 1));
result = (StatementResult) lastCommandResult;
assertNotNull(result);
assertTrue(result.getAsBoolean());
@@ -158,13 +160,13 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
.initiatorTime(clock.now())
.build();
- metaStorageListener.onWrite(commandIterator(command0));
+ metaStorageListener.onWrite(commandIterator(command0, 2, 1));
assertNull(lastCommandResult);
checkValueInStorage(testKey.bytes(), testValue0.bytes());
// Another call of same command.
- metaStorageListener.onWrite(commandIterator(command0));
+ metaStorageListener.onWrite(commandIterator(command0, 3, 1));
assertNull(lastCommandResult);
checkValueInStorage(testKey.bytes(), testValue0.bytes());
@@ -175,7 +177,7 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
.initiatorTime(clock.now())
.build();
- metaStorageListener.onWrite(commandIterator(command1));
+ metaStorageListener.onWrite(commandIterator(command1, 4, 1));
assertNull(lastCommandResult);
checkValueInStorage(testKey.bytes(), testValue1.bytes());
@@ -188,17 +190,21 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
assertArrayEquals(testValueExpected, e.value());
}
- private Iterator<CommandClosure<WriteCommand>>
commandIterator(WriteCommand command) {
- List<CommandClosure<WriteCommand>> closureList = List.of(new
TestCommandClosure(command));
+ private Iterator<CommandClosure<WriteCommand>>
commandIterator(WriteCommand command, long index, long term) {
+ List<CommandClosure<WriteCommand>> closureList = List.of(new
TestCommandClosure(command, index, term));
return closureList.iterator();
}
private class TestCommandClosure implements CommandClosure<WriteCommand> {
private final WriteCommand command;
+ private final long index;
+ private final long term;
- private TestCommandClosure(WriteCommand command) {
+ private TestCommandClosure(WriteCommand command, long index, long
term) {
this.command = command;
+ this.index = index;
+ this.term = term;
}
@Override
@@ -206,6 +212,16 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
return command;
}
+ @Override
+ public long index() {
+ return index;
+ }
+
+ @Override
+ public long term() {
+ return term;
+ }
+
@Override
public void result(@Nullable Serializable res) {
lastCommandResult = res;
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListenerTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListenerTest.java
new file mode 100644
index 00000000000..7ce6b191e2b
--- /dev/null
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListenerTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.PutCommand;
+import org.apache.ignite.internal.metastorage.dsl.MetaStorageMessagesFactory;
+import org.apache.ignite.internal.metastorage.dsl.Statements;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class MetaStorageListenerTest {
+ private static final MetaStorageCommandsFactory commandsFactory = new
MetaStorageCommandsFactory();
+ private static final MetaStorageMessagesFactory messagesFactory = new
MetaStorageMessagesFactory();
+
+ private static final long COMMAND_TERM = 3;
+
+ private static final long PRE_TARGET_COMMAND_INDEX = 1;
+ private static final long TARGET_COMMAND_INDEX = 2;
+
+ private static final HybridClock clock = new HybridClockImpl();
+
+ private KeyValueStorage storage;
+ private ClusterTimeImpl clusterTime;
+
+ private MetaStorageListener listener;
+
+ @BeforeEach
+ void setUp() {
+ storage = new SimpleInMemoryKeyValueStorage("test");
+ clusterTime = new ClusterTimeImpl("test", new IgniteSpinBusyLock(),
clock);
+
+ listener = new MetaStorageListener(storage, clock, clusterTime);
+
+ storage.start();
+ }
+
+ @AfterEach
+ void cleanup() throws Exception {
+ if (listener != null) {
+ listener.onShutdown();
+ }
+
+ closeAllManually(clusterTime, storage);
+ }
+
+ @ParameterizedTest
+ @MethodSource("commandsVariationsForIndexAdvanceTesting")
+ void commandsAdvanceLastAppliedIndex(MetaStorageWriteCommand command) {
+ testCommandAdvancedLastAppliedIndex(command);
+ }
+
+ private static List<MetaStorageWriteCommand>
commandsVariationsForIndexAdvanceTesting() {
+ HybridTimestamp initiatorTime = clock.now();
+ HybridTimestamp safeTime = clock.now();
+
+ return List.of(
+ somePutCommand(),
+ commandsFactory.putAllCommand()
+ .keys(List.of(ByteBuffer.allocate(3)))
+ .values(List.of(ByteBuffer.allocate(3)))
+ .initiatorTime(initiatorTime)
+ .safeTime(safeTime)
+ .build(),
+ commandsFactory.removeCommand()
+ .key(ByteBuffer.allocate(3))
+ .initiatorTime(initiatorTime)
+ .safeTime(safeTime)
+ .build(),
+ commandsFactory.removeAllCommand()
+ .keys(List.of(ByteBuffer.allocate(3)))
+ .initiatorTime(initiatorTime)
+ .safeTime(safeTime)
+ .build(),
+ commandsFactory.removeByPrefixCommand()
+ .prefix(ByteBuffer.allocate(3))
+ .initiatorTime(initiatorTime)
+ .safeTime(safeTime)
+ .build(),
+ someInvokeCommand(),
+ someMultiInvokeCommand(),
+
+ // Normal command that gets applied.
+ commandsFactory.syncTimeCommand()
+ .initiatorTime(initiatorTime)
+ .safeTime(safeTime)
+ .initiatorTerm(COMMAND_TERM)
+ .build(),
+
+ // Command that gets discarded.
+ commandsFactory.syncTimeCommand()
+ .initiatorTime(initiatorTime)
+ .safeTime(safeTime)
+ .initiatorTerm(COMMAND_TERM - 1)
+ .build()
+ );
+ }
+
+ private void testCommandAdvancedLastAppliedIndex(MetaStorageWriteCommand
command) {
+ applySingleCommand(command, TARGET_COMMAND_INDEX, COMMAND_TERM);
+
+ assertTargetIndexAndTermInStorage();
+ }
+
+ private void assertTargetIndexAndTermInStorage() {
+ IndexWithTerm indexWithTerm = storage.getIndexWithTerm();
+
+ assertThat(indexWithTerm, is(notNullValue()));
+ assertThat(indexWithTerm.index(), is(TARGET_COMMAND_INDEX));
+ assertThat(indexWithTerm.term(), is(COMMAND_TERM));
+ }
+
+ private void applySingleCommand(MetaStorageWriteCommand command, long
index, long term) {
+ Command updatedCommand = listener.onBeforeApply(command);
+ listener.onWrite(List.of(commandClosure((MetaStorageWriteCommand)
updatedCommand, index, term)).iterator());
+ }
+
+ private static CommandClosure<WriteCommand>
commandClosure(MetaStorageWriteCommand command, long index, long term) {
+ return new SimpleCommandClosure(command, index, term);
+ }
+
+ private static PutCommand somePutCommand() {
+ return commandsFactory.putCommand()
+ .key(ByteBuffer.allocate(3))
+ .value(ByteBuffer.allocate(3))
+ .initiatorTime(clock.now())
+ .safeTime(clock.now())
+ .build();
+ }
+
+ private static InvokeCommand someInvokeCommand() {
+ return commandsFactory.invokeCommand()
+
.id(messagesFactory.commandId().nodeId(UUID.randomUUID()).build())
+ .condition(notExists(new ByteArray(new byte[3])))
+ .success(List.of(noop()))
+ .failure(List.of(noop()))
+ .initiatorTime(clock.now())
+ .safeTime(clock.now())
+ .build();
+ }
+
+ private static MultiInvokeCommand someMultiInvokeCommand() {
+ return commandsFactory.multiInvokeCommand()
+
.id(messagesFactory.commandId().nodeId(UUID.randomUUID()).build())
+ .iif(Statements.iif(
+ notExists(new ByteArray(new byte[3])),
+ ops(noop()).yield(true),
+ ops(noop()).yield(false)
+ ))
+ .initiatorTime(clock.now())
+ .safeTime(clock.now())
+ .build();
+ }
+
+ @Test
+ void compactionCommandAdvancesLastAppliedIndex() {
+ // We need to have at least one revision, otherwise the following
compaction command will fail.
+ createSomeRevision();
+
+ MetaStorageWriteCommand command = commandsFactory.compactionCommand()
+ .compactionRevision(0)
+ .initiatorTime(clock.now())
+ .safeTime(clock.now())
+ .build();
+
+ testCommandAdvancedLastAppliedIndex(command);
+ }
+
+ private void createSomeRevision() {
+ applySingleCommand(somePutCommand(), PRE_TARGET_COMMAND_INDEX,
COMMAND_TERM);
+ }
+
+ @Test
+ void noOpEvictionCommandAdvancesLastAppliedIndex() {
+ // This command will not evict anything.
+ MetaStorageWriteCommand command =
commandsFactory.evictIdempotentCommandsCacheCommand()
+ .evictionTimestamp(HybridTimestamp.MIN_VALUE)
+ .initiatorTime(clock.now())
+ .safeTime(clock.now())
+ .build();
+
+ testCommandAdvancedLastAppliedIndex(command);
+ }
+
+ @Test
+ void effectiveEvictionCommandAdvancesLastAppliedIndex() {
+ applySingleCommand(someInvokeCommand(), PRE_TARGET_COMMAND_INDEX,
COMMAND_TERM);
+
+ // This command will evict something.
+ MetaStorageWriteCommand command =
commandsFactory.evictIdempotentCommandsCacheCommand()
+ .evictionTimestamp(HybridTimestamp.MAX_VALUE)
+ .initiatorTime(clock.now())
+ .safeTime(clock.now())
+ .build();
+
+ testCommandAdvancedLastAppliedIndex(command);
+ }
+
+ @Test
+ void noOpInvokeCommandAdvancesLastAppliedIndex() {
+ MetaStorageWriteCommand command = someInvokeCommand();
+ applySingleCommand(command, PRE_TARGET_COMMAND_INDEX, COMMAND_TERM);
+
+ testCommandAdvancedLastAppliedIndex(command);
+ }
+
+ @Test
+ void noOpMultiInvokeCommandAdvancesLastAppliedIndex() {
+ MetaStorageWriteCommand command = someMultiInvokeCommand();
+ applySingleCommand(command, PRE_TARGET_COMMAND_INDEX, COMMAND_TERM);
+
+ testCommandAdvancedLastAppliedIndex(command);
+ }
+
+ @Test
+ void applicationOfConfigurationAdvancesLastAppliedIndex() {
+ listener.onConfigurationCommitted(
+ new RaftGroupConfiguration(TARGET_COMMAND_INDEX, COMMAND_TERM,
1, 0, List.of("peer"), List.of(), null, null),
+ TARGET_COMMAND_INDEX,
+ COMMAND_TERM
+ );
+
+ assertTargetIndexAndTermInStorage();
+ }
+
+ private static class SimpleCommandClosure implements
CommandClosure<WriteCommand> {
+ private final MetaStorageWriteCommand command;
+ private final long index;
+ private final long term;
+
+ private SimpleCommandClosure(MetaStorageWriteCommand command, long
index, long term) {
+ this.command = command;
+ this.index = index;
+ this.term = term;
+ }
+
+ @Override
+ public WriteCommand command() {
+ return command;
+ }
+
+ @Override
+ public long index() {
+ return index;
+ }
+
+ @Override
+ public long term() {
+ return term;
+ }
+
+ @Override
+ public @Nullable HybridTimestamp safeTimestamp() {
+ return command.safeTime();
+ }
+
+ @Override
+ public void result(@Nullable Serializable res) {
+ // No-op.
+ }
+ }
+}