This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e6a1fffdfaf IGNITE-28202 Advance last applied index for each 
Metastorage command (#7763)
e6a1fffdfaf is described below

commit e6a1fffdfaf77983de18bd03333025f1e0e9ca77
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Mar 12 17:35:44 2026 +0400

    IGNITE-28202 Advance last applied index for each Metastorage command (#7763)
---
 .../impl/ItIdempotentCommandCacheTest.java         | 175 ++++++++----
 .../metastorage/impl/ItMetaStorageServiceTest.java |  28 +-
 .../server/raft/MetaStorageWriteHandler.java       |  22 ++
 .../impl/IdempotentCommandCacheTest.java           |  36 ++-
 .../server/raft/MetaStorageListenerTest.java       | 309 +++++++++++++++++++++
 5 files changed, 507 insertions(+), 63 deletions(-)

diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 345a61ae89c..ce1e185ccda 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.TimeUnit.DAYS;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -25,13 +26,16 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
 import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -39,6 +43,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -52,6 +60,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiPredicate;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -115,6 +124,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /**
@@ -122,7 +132,7 @@ import org.junit.jupiter.params.provider.MethodSource;
  */
 @ExtendWith(ConfigurationExtension.class)
 @ExtendWith(ExecutorServiceExtension.class)
-public class ItIdempotentCommandCacheTest extends IgniteAbstractTest {
+class ItIdempotentCommandCacheTest extends IgniteAbstractTest {
     private static final MetaStorageCommandsFactory CMD_FACTORY = new 
MetaStorageCommandsFactory();
 
     private static final int NODES_COUNT = 2;
@@ -152,7 +162,9 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
     private List<Node> nodes;
 
-    private static class Node implements AutoCloseable {
+    private TestInfo testInfo;
+
+    private static class Node implements ManuallyCloseable {
         private static final IgniteLogger log = Loggers.forClass(Node.class);
 
         ClusterService clusterService;
@@ -270,11 +282,9 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         }
 
         void start(CompletableFuture<Set<String>> metaStorageNodesFut) {
-            if (metaStorageNodesFut != null) {
-                when(cmgManager.metaStorageInfo()).thenReturn(
-                        metaStorageNodesFut.thenApply(nodes -> new 
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(nodes).build())
-                );
-            }
+            when(cmgManager.metaStorageInfo()).thenReturn(
+                    metaStorageNodesFut.thenApply(nodes -> new 
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(nodes).build())
+            );
 
             assertThat(
                     startAsync(new ComponentContext(),
@@ -329,23 +339,64 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
     @BeforeEach
     void setUp(TestInfo testInfo) {
-        startCluster(testInfo);
+        this.testInfo = testInfo;
+
+        if 
(testInfo.getTestMethod().orElseThrow().isAnnotationPresent(DisableIdleSafeTimePropagation.class))
 {
+            // The test asked to disable idle safe time propagation, so set it 
to a super-long period.
+            assertThat(
+                    
systemDistributedConfiguration.idleSafeTimeSyncIntervalMillis().update(DAYS.toMillis(1)),
+                    willCompleteSuccessfully()
+            );
+        }
+
+        startCluster();
     }
 
     @AfterEach
     void tearDown() throws Exception {
-        closeAll(nodes.stream());
+        closeAllManually(nodes.stream());
     }
 
-    @Test
-    public void testIdempotentInvoke() throws InterruptedException {
+    @ParameterizedTest
+    @EnumSource(Invoker.class)
+    void testIdempotentInvoke(Invoker invoker) {
+        Node leader = leader(raftClient());
+
+        boolean result = doRetriedInvoke(invoker, leader);
+
+        assertTrue(result);
+        assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE));
+    }
+
+    private boolean doRetriedInvoke(Invoker invoker, Node leader) {
         AtomicInteger writeActionReqCount = new AtomicInteger();
         CompletableFuture<Void> retryBlockingFuture = new 
CompletableFuture<>();
 
         log.info("Test: blocking messages.");
 
-        Node leader = leader(raftClient());
+        dropResponseToFirstInvoke(leader, writeActionReqCount, 
retryBlockingFuture);
+
+        MetaStorageManager metaStorageManager = leader.metaStorageManager;
+
+        CompletableFuture<Boolean> fut = invoker.invokeOn(metaStorageManager);
+
+        await().until(() -> leader.checkValueInStorage(TEST_KEY.bytes(), 
TEST_VALUE));
+
+        log.info("Test: value appeared in storage.");
+
+        assertTrue(retryBlockingFuture.complete(null));
 
+        await().until(() -> writeActionReqCount.get() == 2);
+
+        leader.stopDroppingMessages();
+
+        assertThat(fut, willCompleteSuccessfully());
+        log.info("Test: invoke complete.");
+
+        return fut.join();
+    }
+
+    private void dropResponseToFirstInvoke(Node leader, AtomicInteger 
writeActionReqCount, CompletableFuture<Void> retryBlockingFuture) {
         leader.dropMessages((n, msg) -> {
             // Dropping the first response, this will cause timeout on first 
response, and then retry.
             if (msg instanceof ActionResponse && ((ActionResponse) 
msg).result() != null && writeActionReqCount.get() == 1) {
@@ -358,11 +409,11 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
                 WriteActionRequest request = (WriteActionRequest) msg;
 
                 if (!(request.deserializedCommand() instanceof 
SyncTimeCommand)) {
-                    writeActionReqCount.incrementAndGet();
-                    log.info("Test: WriteActionRequest intercepted, count=" + 
writeActionReqCount.get());
+                    int incrementedCount = 
writeActionReqCount.incrementAndGet();
+                    log.info("Test: WriteActionRequest intercepted, count=" + 
incrementedCount);
 
                     // Second request: retry.
-                    if (writeActionReqCount.get() == 2) {
+                    if (incrementedCount == 2) {
                         log.info("Test: retry blocked.");
 
                         retryBlockingFuture.orTimeout(10, 
TimeUnit.SECONDS).join();
@@ -374,34 +425,30 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
             return false;
         });
+    }
 
-        MetaStorageManager metaStorageManager = leader.metaStorageManager;
-
-        CompletableFuture<Boolean> fut = metaStorageManager.invoke(
-                notExists(TEST_KEY),
-                put(TEST_KEY, TEST_VALUE),
-                put(TEST_KEY, ANOTHER_VALUE)
-        );
-
-        assertTrue(waitForCondition(() -> 
leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE), 10_000));
-
-        log.info("Test: value appeared in storage.");
-
-        assertTrue(retryBlockingFuture.complete(null));
-
-        assertTrue(waitForCondition(() -> writeActionReqCount.get() == 2, 
10_000));
+    /**
+     * This makes sure that a gap between KV storage last applied index and 
last applied index from the point of view of JRaft does
+     * not happen (due to idempotent command retries not advancing the last 
applied index in the KV storage).
+     * If it does, and a Raft snapshot is taken in exactly that moment, the 
JRaft node will fail on recovery.
+     */
+    @ParameterizedTest
+    @EnumSource(Invoker.class)
+    // Disable safe time propagation to prevent safe time commands in the log, 
so that they don't close the index
+    // gap we are trying to reproduce.
+    @DisableIdleSafeTimePropagation
+    void retriedInvokeDoesNotBreakIndexConsistency(Invoker invoker) throws 
Exception {
+        Node leader = leader(raftClient());
 
-        leader.stopDroppingMessages();
+        doRetriedInvoke(invoker, leader);
 
-        assertThat(fut, willCompleteSuccessfully());
-        log.info("Test: invoke complete.");
+        nodes.forEach(n -> assertThat(raftClient().snapshot(new 
Peer(n.clusterService.nodeName()), true), willCompleteSuccessfully()));
 
-        assertTrue(fut.join());
-        assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE));
+        restartCluster();
     }
 
     @Test
-    public void testIdempotentInvokeAfterLeaderChange() {
+    void testIdempotentInvokeAfterLeaderChange() {
         InvokeCommand invokeCommand = (InvokeCommand) 
buildKeyNotExistsInvokeCommand(TEST_KEY, TEST_VALUE, ANOTHER_VALUE);
 
         RaftGroupService raftClient = raftClient();
@@ -436,7 +483,7 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
     @ParameterizedTest
     @MethodSource("idempotentCommandProvider")
-    public void testIdempotentCacheRestoreFromSnapshot(IdempotentCommand 
idempotentCommand, TestInfo testInfo) throws Exception {
+    void testIdempotentCacheRestoreFromSnapshot(IdempotentCommand 
idempotentCommand) throws Exception {
         RaftGroupService raftClient = raftClient();
         Node leader = leader(raftClient);
 
@@ -450,19 +497,13 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         } else {
             assertEquals(YIELD_RESULT, ((StatementResult) 
commandProcessingResult).getAsInt());
             assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), 
TEST_VALUE_2));
-
         }
 
         // Do the snapshot.
         nodes.forEach(n -> raftClient().snapshot(new 
Peer(n.clusterService.nodeName()), false));
 
         // Restart nodes in order to trigger idempotent volatile cache 
initialization from snapshot.
-        for (Node node : nodes) {
-            node.stop();
-        }
-
-        // Restart cluster.
-        startCluster(testInfo);
+        restartCluster();
 
         ClockService node0ClockService = nodes.get(0).clockService;
         long timestampAfterRestartPhysicalLong = 
node0ClockService.now().getPhysical();
@@ -510,12 +551,22 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         }
     }
 
+    private void restartCluster() throws Exception {
+        for (Node node : nodes) {
+            node.stop();
+        }
+
+        startCluster();
+    }
+
     private Node leader(RaftGroupService raftClient) {
         CompletableFuture<LeaderWithTerm> refreshLeaderFut = 
raftClient.refreshAndGetLeaderWithTerm();
 
         assertThat(refreshLeaderFut, willCompleteSuccessfully());
 
-        String currentLeader = refreshLeaderFut.join().leader().consistentId();
+        Peer leader = refreshLeaderFut.join().leader();
+        assertThat(leader, is(notNullValue()));
+        String currentLeader = leader.consistentId();
 
         return nodes.stream().filter(n -> 
n.clusterService.nodeName().equals(currentLeader)).findAny().orElseThrow();
     }
@@ -582,7 +633,7 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
                 .build();
     }
 
-    private void startCluster(TestInfo testInfo) {
+    private void startCluster() {
         nodes = new ArrayList<>();
 
         for (int i = 0; i < NODES_COUNT; i++) {
@@ -607,4 +658,34 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
         nodes.forEach(Node::deployWatches);
     }
+
+    @Target(ElementType.METHOD)
+    @Retention(RetentionPolicy.RUNTIME)
+    private @interface DisableIdleSafeTimePropagation {
+    }
+
+    private enum Invoker {
+        INVOKE {
+            @Override
+            CompletableFuture<Boolean> invokeOn(MetaStorageManager 
metaStorageManager) {
+                return metaStorageManager.invoke(
+                        notExists(TEST_KEY),
+                        put(TEST_KEY, TEST_VALUE),
+                        put(TEST_KEY, ANOTHER_VALUE)
+                );
+            }
+        },
+        MULTI_INVOKE {
+            @Override
+            CompletableFuture<Boolean> invokeOn(MetaStorageManager 
metaStorageManager) {
+                return metaStorageManager.invoke(iif(
+                        notExists(TEST_KEY),
+                        ops(put(TEST_KEY, TEST_VALUE)).yield(true),
+                        ops(put(TEST_KEY, ANOTHER_VALUE)).yield(false)
+                )).thenApply(StatementResult::getAsBoolean);
+            }
+        };
+
+        abstract CompletableFuture<Boolean> invokeOn(MetaStorageManager 
metaStorageManager);
+    }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index e3c0fa30afd..e8528128e56 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -47,7 +47,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -94,6 +97,7 @@ import org.apache.ignite.internal.metastorage.server.If;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.OrCondition;
 import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.ValueCondition;
 import org.apache.ignite.internal.metastorage.server.ValueCondition.Type;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
@@ -129,6 +133,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * Meta storage client tests.
@@ -226,7 +232,7 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
             );
             this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), 
new IgniteSpinBusyLock(), clock);
 
-            this.mockStorage = mock(KeyValueStorage.class);
+            this.mockStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
         }
 
         void start(PeersAndLearners configuration) {
@@ -395,7 +401,7 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
     public void testGetAll() {
         Node node = prepareNodes(1).get(0);
 
-        
when(node.mockStorage.getAll(anyList())).thenReturn(EXPECTED_SRV_RESULT_COLL);
+        
doReturn(EXPECTED_SRV_RESULT_COLL).when(node.mockStorage).getAll(anyList());
 
         startNodes();
 
@@ -410,7 +416,7 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
     public void testGetAllWithUpperBoundRevision() {
         Node node = prepareNodes(1).get(0);
 
-        when(node.mockStorage.getAll(anyList(), 
eq(10L))).thenReturn(EXPECTED_SRV_RESULT_COLL);
+        
doReturn(EXPECTED_SRV_RESULT_COLL).when(node.mockStorage).getAll(anyList(), 
eq(10L));
 
         startNodes();
 
@@ -685,7 +691,19 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
         var ifCaptor = ArgumentCaptor.forClass(If.class);
 
-        when(node.mockStorage.invoke(any(), any(), 
any())).thenReturn(ops().yield(true).result(), null, null);
+        doAnswer(new Answer() {
+            private boolean first = true;
+
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable 
{
+                // To update last applied index in the storage.
+                invocation.callRealMethod();
+
+                Object result = first ? ops().yield(true).result() : null;
+                first = false;
+                return result;
+            }
+        }).when(node.mockStorage).invoke(any(), any(), any());
 
         startNodes();
 
@@ -736,8 +754,6 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
         byte[] expVal = {2};
 
-        when(node.mockStorage.invoke(any(), any(), any(), any(), 
any())).thenReturn(true);
-
         startNodes();
 
         Condition condition = Conditions.notExists(expKey);
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 735f8520bb4..74a9dbb1a74 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -73,6 +73,7 @@ import 
org.apache.ignite.internal.metastorage.server.TombstoneCondition;
 import org.apache.ignite.internal.metastorage.server.ValueCondition;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.util.Cursor;
@@ -114,6 +115,18 @@ public class MetaStorageWriteHandler {
      * Processes a given {@link WriteCommand}.
      */
     void handleWriteCommand(CommandClosure<WriteCommand> clo) {
+        handleWriteCommandInternal(clo);
+
+        assert lastAppliedIndex() == clo.index() : String.format(
+                "Last applied index after command application is not equal to 
the command index "
+                        + "[lastAppliedIndex=%d, commandIndex=%d, command=%s]",
+                lastAppliedIndex(),
+                clo.index(),
+                clo.command().toStringForLightLogging()
+        );
+    }
+
+    private void handleWriteCommandInternal(CommandClosure<WriteCommand> clo) {
         WriteCommand command = clo.command();
 
         CommandClosure<WriteCommand> resultClosure;
@@ -144,6 +157,8 @@ public class MetaStorageWriteHandler {
                     clo.result(commandResult);
                 }
 
+                storage.setIndexAndTerm(clo.index(), clo.term());
+
                 return;
             } else {
                 resultClosure = new ResultCachingClosure(clo);
@@ -380,6 +395,11 @@ public class MetaStorageWriteHandler {
         }
     }
 
+    private long lastAppliedIndex() {
+        IndexWithTerm indexWithTerm = storage.getIndexWithTerm();
+        return indexWithTerm == null ? -1 : indexWithTerm.index();
+    }
+
     Command beforeApply(Command command) {
         if (command instanceof MetaStorageWriteCommand) {
             // Initiator sends us a timestamp to adjust to.
@@ -446,6 +466,8 @@ public class MetaStorageWriteHandler {
         List<CommandId> evictedCommandIds = 
evictCommandsFromCache(evictionTimestamp);
 
         if (evictedCommandIds.isEmpty()) {
+            storage.setIndexAndTerm(context.index, context.term);
+
             return;
         }
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
index 9b9d7eb1e46..644bff4dfe4 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
@@ -81,6 +81,8 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
     public void setUp() {
         storage = new SimpleInMemoryKeyValueStorage(NODE_NAME);
         metaStorageListener = new MetaStorageListener(storage, clock, new 
ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock));
+
+        storage.setIndexAndTerm(1, 1);
     }
 
     @Test
@@ -98,14 +100,14 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
                 .initiatorTime(clock.now())
                 .build();
 
-        metaStorageListener.onWrite(commandIterator(command));
+        metaStorageListener.onWrite(commandIterator(command, 2, 1));
 
         assertNotNull(lastCommandResult);
         assertTrue((Boolean) lastCommandResult);
         checkValueInStorage(testKey.bytes(), testValue.bytes());
 
         // Another call of same command.
-        metaStorageListener.onWrite(commandIterator(command));
+        metaStorageListener.onWrite(commandIterator(command, 3, 1));
         assertNotNull(lastCommandResult);
         assertTrue((Boolean) lastCommandResult);
         checkValueInStorage(testKey.bytes(), testValue.bytes());
@@ -130,7 +132,7 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
                 .initiatorTime(clock.now())
                 .build();
 
-        metaStorageListener.onWrite(commandIterator(command));
+        metaStorageListener.onWrite(commandIterator(command, 2, 1));
 
         StatementResult result = (StatementResult) lastCommandResult;
         assertNotNull(result);
@@ -138,7 +140,7 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
         checkValueInStorage(testKey.bytes(), testValue.bytes());
 
         // Another call of same command.
-        metaStorageListener.onWrite(commandIterator(command));
+        metaStorageListener.onWrite(commandIterator(command, 2, 1));
         result = (StatementResult) lastCommandResult;
         assertNotNull(result);
         assertTrue(result.getAsBoolean());
@@ -158,13 +160,13 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
                 .initiatorTime(clock.now())
                 .build();
 
-        metaStorageListener.onWrite(commandIterator(command0));
+        metaStorageListener.onWrite(commandIterator(command0, 2, 1));
 
         assertNull(lastCommandResult);
         checkValueInStorage(testKey.bytes(), testValue0.bytes());
 
         // Another call of same command.
-        metaStorageListener.onWrite(commandIterator(command0));
+        metaStorageListener.onWrite(commandIterator(command0, 3, 1));
         assertNull(lastCommandResult);
         checkValueInStorage(testKey.bytes(), testValue0.bytes());
 
@@ -175,7 +177,7 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
                 .initiatorTime(clock.now())
                 .build();
 
-        metaStorageListener.onWrite(commandIterator(command1));
+        metaStorageListener.onWrite(commandIterator(command1, 4, 1));
 
         assertNull(lastCommandResult);
         checkValueInStorage(testKey.bytes(), testValue1.bytes());
@@ -188,17 +190,21 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
         assertArrayEquals(testValueExpected, e.value());
     }
 
-    private Iterator<CommandClosure<WriteCommand>> 
commandIterator(WriteCommand command) {
-        List<CommandClosure<WriteCommand>> closureList = List.of(new 
TestCommandClosure(command));
+    private Iterator<CommandClosure<WriteCommand>> 
commandIterator(WriteCommand command, long index, long term) {
+        List<CommandClosure<WriteCommand>> closureList = List.of(new 
TestCommandClosure(command, index, term));
 
         return closureList.iterator();
     }
 
     private class TestCommandClosure implements CommandClosure<WriteCommand> {
         private final WriteCommand command;
+        private final long index;
+        private final long term;
 
-        private TestCommandClosure(WriteCommand command) {
+        private TestCommandClosure(WriteCommand command, long index, long 
term) {
             this.command = command;
+            this.index = index;
+            this.term = term;
         }
 
         @Override
@@ -206,6 +212,16 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
             return command;
         }
 
+        @Override
+        public long index() {
+            return index;
+        }
+
+        @Override
+        public long term() {
+            return term;
+        }
+
         @Override
         public void result(@Nullable Serializable res) {
             lastCommandResult = res;
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListenerTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListenerTest.java
new file mode 100644
index 00000000000..7ce6b191e2b
--- /dev/null
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListenerTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.PutCommand;
+import org.apache.ignite.internal.metastorage.dsl.MetaStorageMessagesFactory;
+import org.apache.ignite.internal.metastorage.dsl.Statements;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class MetaStorageListenerTest {
+    private static final MetaStorageCommandsFactory commandsFactory = new 
MetaStorageCommandsFactory();
+    private static final MetaStorageMessagesFactory messagesFactory = new 
MetaStorageMessagesFactory();
+
+    private static final long COMMAND_TERM = 3;
+
+    private static final long PRE_TARGET_COMMAND_INDEX = 1;
+    private static final long TARGET_COMMAND_INDEX = 2;
+
+    private static final HybridClock clock = new HybridClockImpl();
+
+    private KeyValueStorage storage;
+    private ClusterTimeImpl clusterTime;
+
+    private MetaStorageListener listener;
+
+    @BeforeEach
+    void setUp() {
+        storage = new SimpleInMemoryKeyValueStorage("test");
+        clusterTime = new ClusterTimeImpl("test", new IgniteSpinBusyLock(), 
clock);
+
+        listener = new MetaStorageListener(storage, clock, clusterTime);
+
+        storage.start();
+    }
+
+    @AfterEach
+    void cleanup() throws Exception {
+        if (listener != null) {
+            listener.onShutdown();
+        }
+
+        closeAllManually(clusterTime, storage);
+    }
+
+    @ParameterizedTest
+    @MethodSource("commandsVariationsForIndexAdvanceTesting")
+    void commandsAdvanceLastAppliedIndex(MetaStorageWriteCommand command) {
+        testCommandAdvancedLastAppliedIndex(command);
+    }
+
+    private static List<MetaStorageWriteCommand> 
commandsVariationsForIndexAdvanceTesting() {
+        HybridTimestamp initiatorTime = clock.now();
+        HybridTimestamp safeTime = clock.now();
+
+        return List.of(
+                somePutCommand(),
+                commandsFactory.putAllCommand()
+                        .keys(List.of(ByteBuffer.allocate(3)))
+                        .values(List.of(ByteBuffer.allocate(3)))
+                        .initiatorTime(initiatorTime)
+                        .safeTime(safeTime)
+                        .build(),
+                commandsFactory.removeCommand()
+                        .key(ByteBuffer.allocate(3))
+                        .initiatorTime(initiatorTime)
+                        .safeTime(safeTime)
+                        .build(),
+                commandsFactory.removeAllCommand()
+                        .keys(List.of(ByteBuffer.allocate(3)))
+                        .initiatorTime(initiatorTime)
+                        .safeTime(safeTime)
+                        .build(),
+                commandsFactory.removeByPrefixCommand()
+                        .prefix(ByteBuffer.allocate(3))
+                        .initiatorTime(initiatorTime)
+                        .safeTime(safeTime)
+                        .build(),
+                someInvokeCommand(),
+                someMultiInvokeCommand(),
+
+                // Normal command that gets applied.
+                commandsFactory.syncTimeCommand()
+                        .initiatorTime(initiatorTime)
+                        .safeTime(safeTime)
+                        .initiatorTerm(COMMAND_TERM)
+                        .build(),
+
+                // Command that gets discarded.
+                commandsFactory.syncTimeCommand()
+                        .initiatorTime(initiatorTime)
+                        .safeTime(safeTime)
+                        .initiatorTerm(COMMAND_TERM - 1)
+                        .build()
+        );
+    }
+
+    private void testCommandAdvancedLastAppliedIndex(MetaStorageWriteCommand 
command) {
+        applySingleCommand(command, TARGET_COMMAND_INDEX, COMMAND_TERM);
+
+        assertTargetIndexAndTermInStorage();
+    }
+
+    private void assertTargetIndexAndTermInStorage() {
+        IndexWithTerm indexWithTerm = storage.getIndexWithTerm();
+
+        assertThat(indexWithTerm, is(notNullValue()));
+        assertThat(indexWithTerm.index(), is(TARGET_COMMAND_INDEX));
+        assertThat(indexWithTerm.term(), is(COMMAND_TERM));
+    }
+
+    private void applySingleCommand(MetaStorageWriteCommand command, long 
index, long term) {
+        Command updatedCommand = listener.onBeforeApply(command);
+        listener.onWrite(List.of(commandClosure((MetaStorageWriteCommand) 
updatedCommand, index, term)).iterator());
+    }
+
+    private static CommandClosure<WriteCommand> 
commandClosure(MetaStorageWriteCommand command, long index, long term) {
+        return new SimpleCommandClosure(command, index, term);
+    }
+
+    private static PutCommand somePutCommand() {
+        return commandsFactory.putCommand()
+                .key(ByteBuffer.allocate(3))
+                .value(ByteBuffer.allocate(3))
+                .initiatorTime(clock.now())
+                .safeTime(clock.now())
+                .build();
+    }
+
+    private static InvokeCommand someInvokeCommand() {
+        return commandsFactory.invokeCommand()
+                
.id(messagesFactory.commandId().nodeId(UUID.randomUUID()).build())
+                .condition(notExists(new ByteArray(new byte[3])))
+                .success(List.of(noop()))
+                .failure(List.of(noop()))
+                .initiatorTime(clock.now())
+                .safeTime(clock.now())
+                .build();
+    }
+
+    private static MultiInvokeCommand someMultiInvokeCommand() {
+        return commandsFactory.multiInvokeCommand()
+                
.id(messagesFactory.commandId().nodeId(UUID.randomUUID()).build())
+                .iif(Statements.iif(
+                        notExists(new ByteArray(new byte[3])),
+                        ops(noop()).yield(true),
+                        ops(noop()).yield(false)
+                ))
+                .initiatorTime(clock.now())
+                .safeTime(clock.now())
+                .build();
+    }
+
+    @Test
+    void compactionCommandAdvancesLastAppliedIndex() {
+        // We need to have at least one revision, otherwise the following 
compaction command will fail.
+        createSomeRevision();
+
+        MetaStorageWriteCommand command = commandsFactory.compactionCommand()
+                .compactionRevision(0)
+                .initiatorTime(clock.now())
+                .safeTime(clock.now())
+                .build();
+
+        testCommandAdvancedLastAppliedIndex(command);
+    }
+
+    private void createSomeRevision() {
+        applySingleCommand(somePutCommand(), PRE_TARGET_COMMAND_INDEX, 
COMMAND_TERM);
+    }
+
+    @Test
+    void noOpEvictionCommandAdvancesLastAppliedIndex() {
+        // This command will not evict anything.
+        MetaStorageWriteCommand command = 
commandsFactory.evictIdempotentCommandsCacheCommand()
+                .evictionTimestamp(HybridTimestamp.MIN_VALUE)
+                .initiatorTime(clock.now())
+                .safeTime(clock.now())
+                .build();
+
+        testCommandAdvancedLastAppliedIndex(command);
+    }
+
+    @Test
+    void effectiveEvictionCommandAdvancesLastAppliedIndex() {
+        applySingleCommand(someInvokeCommand(), PRE_TARGET_COMMAND_INDEX, 
COMMAND_TERM);
+
+        // This command will evict something.
+        MetaStorageWriteCommand command = 
commandsFactory.evictIdempotentCommandsCacheCommand()
+                .evictionTimestamp(HybridTimestamp.MAX_VALUE)
+                .initiatorTime(clock.now())
+                .safeTime(clock.now())
+                .build();
+
+        testCommandAdvancedLastAppliedIndex(command);
+    }
+
+    @Test
+    void noOpInvokeCommandAdvancesLastAppliedIndex() {
+        MetaStorageWriteCommand command = someInvokeCommand();
+        applySingleCommand(command, PRE_TARGET_COMMAND_INDEX, COMMAND_TERM);
+
+        testCommandAdvancedLastAppliedIndex(command);
+    }
+
+    @Test
+    void noOpMultiInvokeCommandAdvancesLastAppliedIndex() {
+        MetaStorageWriteCommand command = someMultiInvokeCommand();
+        applySingleCommand(command, PRE_TARGET_COMMAND_INDEX, COMMAND_TERM);
+
+        testCommandAdvancedLastAppliedIndex(command);
+    }
+
+    @Test
+    void applicationOfConfigurationAdvancesLastAppliedIndex() {
+        listener.onConfigurationCommitted(
+                new RaftGroupConfiguration(TARGET_COMMAND_INDEX, COMMAND_TERM, 
1, 0, List.of("peer"), List.of(), null, null),
+                TARGET_COMMAND_INDEX,
+                COMMAND_TERM
+        );
+
+        assertTargetIndexAndTermInStorage();
+    }
+
+    private static class SimpleCommandClosure implements 
CommandClosure<WriteCommand> {
+        private final MetaStorageWriteCommand command;
+        private final long index;
+        private final long term;
+
+        private SimpleCommandClosure(MetaStorageWriteCommand command, long 
index, long term) {
+            this.command = command;
+            this.index = index;
+            this.term = term;
+        }
+
+        @Override
+        public WriteCommand command() {
+            return command;
+        }
+
+        @Override
+        public long index() {
+            return index;
+        }
+
+        @Override
+        public long term() {
+            return term;
+        }
+
+        @Override
+        public @Nullable HybridTimestamp safeTimestamp() {
+            return command.safeTime();
+        }
+
+        @Override
+        public void result(@Nullable Serializable res) {
+            // No-op.
+        }
+    }
+}


Reply via email to