This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 554f4ac6bb2 IGNITE-28216 Advance last applied index for each partition 
command (#7770)
554f4ac6bb2 is described below

commit 554f4ac6bb28da2b7493b7002673681c1d4eda77
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Mar 16 16:37:31 2026 +0400

    IGNITE-28216 Advance last applied index for each partition command (#7770)
---
 .../replicator/raft/ZonePartitionRaftListener.java |  55 ++++-
 .../handlers/WriteIntentSwitchCommandHandler.java  |  18 +-
 .../raft/ZonePartitionRaftListenerTest.java        | 237 +++++++++++++++++++--
 .../ItAbstractInternalTableScanTest.java           |   7 +-
 .../ItInternalTableReadOnlyOperationsTest.java     |   6 +-
 .../distributed/raft/TablePartitionProcessor.java  |  20 +-
 .../MinimumActiveTxTimeCommandHandler.java         |   8 +
 .../raft/PartitionCommandListenerTest.java         |  77 +++++--
 8 files changed, 386 insertions(+), 42 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index cb8c2bf087b..9eabd1db070 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -138,7 +138,7 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
                 .addHandler(
                         PartitionReplicationMessageGroup.GROUP_TYPE,
                         Commands.WRITE_INTENT_SWITCH_V2,
-                        new 
WriteIntentSwitchCommandHandler(tableProcessors::get, txManager))
+                        new 
WriteIntentSwitchCommandHandler(tableProcessors::get, txManager, 
txStatePartitionStorage))
                 .addHandler(
                         TxMessageGroup.GROUP_TYPE,
                         VACUUM_TX_STATE_COMMAND,
@@ -206,7 +206,19 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
                             safeTimestamp
                     );
                 } else if (command instanceof 
UpdateMinimumActiveTxBeginTimeCommand) {
-                    result = processCrossTableProcessorsCommand(command, 
commandIndex, commandTerm, safeTimestamp);
+                    CrossTableCommandResult crossTableResult = 
processCrossTableProcessorsCommand(
+                            command,
+                            commandIndex,
+                            commandTerm,
+                            safeTimestamp
+                    );
+                    result = crossTableResult.result;
+
+                    if (!crossTableResult.hadAnyTableProcessor) {
+                        // We MUST bump information about last updated 
index+term at least in one storage.
+                        // See a comment in #onWrite() for explanation.
+                        
updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);
+                    }
                 } else if (command instanceof SafeTimeSyncCommand) {
                     result = handleSafeTimeSyncCommand(commandIndex, 
commandTerm);
                 } else if (command instanceof PrimaryReplicaChangeCommand) {
@@ -216,7 +228,13 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
                             partitionKey.toReplicationGroupId(), commandIndex, 
commandTerm,
                             cmd.leaseStartTime(), cmd.primaryReplicaNodeId(), 
cmd.primaryReplicaNodeName());
 
-                    result = processCrossTableProcessorsCommand(command, 
commandIndex, commandTerm, safeTimestamp);
+                    CrossTableCommandResult crossTableResult = 
processCrossTableProcessorsCommand(
+                            command,
+                            commandIndex,
+                            commandTerm,
+                            safeTimestamp
+                    );
+                    result = crossTableResult.result;
 
                     if (updateLeaseInfoInTxStorage(cmd, commandIndex, 
commandTerm)) {
                         LOG.debug("Updated lease info in tx storage 
[groupId={}, commandIndex={}, leaseStartTime={}]",
@@ -231,6 +249,8 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
                     if (commandHandler == null) {
                         LOG.info("Message type {} is not supported by the zone 
partition RAFT listener yet", command.getClass());
 
+                        
updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);
+
                         result = EMPTY_APPLIED_RESULT;
                     } else {
                         result = commandHandler.handle(command, commandIndex, 
commandTerm, safeTimestamp);
@@ -275,14 +295,14 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
      * @param safeTimestamp Safe timestamp.
      * @return Tuple with the result of the command processing and a flag 
indicating whether the command was applied.
      */
-    private CommandResult processCrossTableProcessorsCommand(
+    private CrossTableCommandResult processCrossTableProcessorsCommand(
             WriteCommand command,
             long commandIndex,
             long commandTerm,
             @Nullable HybridTimestamp safeTimestamp
     ) {
         if (tableProcessors.isEmpty()) {
-            return new CommandResult(null, commandIndex > lastAppliedIndex);
+            return new CrossTableCommandResult(false, new CommandResult(null, 
commandIndex > lastAppliedIndex));
         }
 
         boolean wasApplied = false;
@@ -293,7 +313,7 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
             wasApplied = wasApplied || r.wasApplied();
         }
 
-        return new CommandResult(null, wasApplied);
+        return new CrossTableCommandResult(true, new CommandResult(null, 
wasApplied));
     }
 
     /**
@@ -322,12 +342,22 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
             LOG.warn("Table processor for table ID {} not found. Command will 
be ignored: {}",
                     tableId, command.toStringForLightLogging());
 
+            // We MUST bump information about last updated index+term.
+            // See a comment in #onWrite() for explanation.
+            updateTxStateStorageLastAppliedIfNotStale(commandIndex, 
commandTerm);
+
             return EMPTY_APPLIED_RESULT;
         }
 
         return tableProcessor.processCommand(command, commandIndex, 
commandTerm, safeTimestamp);
     }
 
+    private void updateTxStateStorageLastAppliedIfNotStale(long commandIndex, 
long commandTerm) {
+        if (commandIndex > txStateStorage.lastAppliedIndex()) {
+            txStateStorage.lastApplied(commandIndex, commandTerm);
+        }
+    }
+
     private boolean updateLeaseInfoInTxStorage(PrimaryReplicaChangeCommand 
command, long commandIndex, long commandTerm) {
         if (commandIndex <= txStateStorage.lastAppliedIndex()) {
             return false;
@@ -518,7 +548,6 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
     /**
      * Handler for the {@link SafeTimeSyncCommand}.
      *
-     * @param cmd Command.
      * @param commandIndex RAFT index of the command.
      * @param commandTerm RAFT term of the command.
      */
@@ -530,7 +559,7 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
 
         // We MUST bump information about last updated index+term.
         // See a comment in #onWrite() for explanation.
-        txStateStorage.lastApplied(commandIndex, commandTerm);
+        updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);
 
         return EMPTY_APPLIED_RESULT;
     }
@@ -539,4 +568,14 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
     public HybridTimestamp currentSafeTime() {
         return safeTimeTracker.current();
     }
+
+    private static class CrossTableCommandResult {
+        private final boolean hadAnyTableProcessor;
+        private final CommandResult result;
+
+        private CrossTableCommandResult(boolean hadAnyTableProcessor, 
CommandResult result) {
+            this.hadAnyTableProcessor = hadAnyTableProcessor;
+            this.result = result;
+        }
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
index b9407770de0..0d40a9bc560 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
@@ -27,6 +27,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.CommandResult;
 import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
 import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -39,9 +40,16 @@ public class WriteIntentSwitchCommandHandler extends 
AbstractCommandHandler<Writ
 
     private final RaftTxFinishMarker txFinishMarker;
 
+    private final TxStatePartitionStorage txStatePartitionStorage;
+
     /** Constructor. */
-    public WriteIntentSwitchCommandHandler(IntFunction<RaftTableProcessor> 
tableProcessorByTableId, TxManager txManager) {
+    public WriteIntentSwitchCommandHandler(
+            IntFunction<RaftTableProcessor> tableProcessorByTableId,
+            TxManager txManager,
+            TxStatePartitionStorage txStatePartitionStorage
+    ) {
         this.tableProcessorByTableId = tableProcessorByTableId;
+        this.txStatePartitionStorage = txStatePartitionStorage;
 
         txFinishMarker = new RaftTxFinishMarker(txManager);
     }
@@ -58,6 +66,7 @@ public class WriteIntentSwitchCommandHandler extends 
AbstractCommandHandler<Writ
         txFinishMarker.markFinished(switchCommand.txId(), 
switchCommand.commit(), switchCommand.commitTimestamp(), null);
 
         boolean applied = false;
+        boolean handledByAnyTable = false;
         for (int tableId : ((WriteIntentSwitchCommandV2) 
switchCommand).tableIds()) {
             RaftTableProcessor tableProcessor = raftTableProcessor(tableId);
 
@@ -76,6 +85,13 @@ public class WriteIntentSwitchCommandHandler extends 
AbstractCommandHandler<Writ
                     .processCommand(switchCommand, commandIndex, commandTerm, 
safeTimestamp);
 
             applied = applied || singleResult.wasApplied();
+            handledByAnyTable = true;
+        }
+
+        // We MUST bump information about last updated index+term at least in 
one storage.
+        // See a comment in ZonePartitionRaftListener#onWrite() for 
explanation.
+        if (!handledByAnyTable && commandIndex > 
txStatePartitionStorage.lastAppliedIndex()) {
+            txStatePartitionStorage.lastApplied(commandIndex, commandTerm);
         }
 
         return new CommandResult(null, applied);
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index e6eab46d8b4..650036f82f8 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -31,7 +31,11 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
@@ -51,7 +55,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -63,6 +69,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.BuildInde
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
@@ -99,6 +106,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.UpdateCommandResult;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
@@ -130,11 +138,15 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
 
     private static final int TABLE_ID = 1;
 
+    private static final int NON_EXISTENT_TABLE_ID = 999;
+
     private static final PartitionKey ZONE_PARTITION_KEY = new 
PartitionKey(ZONE_ID, PARTITION_ID);
 
     private static final PartitionReplicationMessagesFactory 
PARTITION_REPLICATION_MESSAGES_FACTORY =
             new PartitionReplicationMessagesFactory();
 
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
     @Captor
@@ -154,8 +166,8 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
     @Spy
     private TxStatePartitionStorage txStatePartitionStorage = new 
TestTxStatePartitionStorage();
 
-    @Mock
-    private MvPartitionStorage mvPartitionStorage;
+    @Spy
+    private MvPartitionStorage mvPartitionStorage = new 
TestMvPartitionStorage(PARTITION_ID);
 
     @InjectExecutorService
     private ExecutorService executor;
@@ -170,6 +182,9 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
     @Mock
     private PartitionSnapshots partitionSnapshots;
 
+    @Mock
+    private CatalogService catalogService;
+
     @BeforeEach
     void setUp() {
         listener = createListener();
@@ -277,11 +292,11 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
         
when(command.primaryReplicaNodeId()).thenReturn(leaseInfo.primaryReplicaNodeId());
         
when(command.primaryReplicaNodeName()).thenReturn(leaseInfo.primaryReplicaNodeName());
 
-        when(mvPartitionStorage.runConsistently(any())).thenAnswer(invocation 
-> {
+        doAnswer(invocation -> {
             WriteClosure<?> closure = invocation.getArgument(0);
 
             return closure.execute(locker);
-        });
+        }).when(mvPartitionStorage).runConsistently(any());
 
         listener.onWrite(List.of(writeCommandClosure).iterator());
 
@@ -461,22 +476,22 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
                 
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
                 .build();
 
-        
listener.onWrite(List.of(writeCommandClosure(raftIndex.incrementAndGet(), 1, 
command, null, null)).iterator());
+        
listener.onWrite(List.of(writeCommandClosureWithoutSafeTime(raftIndex.incrementAndGet(),
 1, command)).iterator());
 
         mvPartitionStorage.lastApplied(10L, 1L);
 
         UpdateCommandV2 updateCommand = mock(UpdateCommandV2.class);
         when(updateCommand.tableId()).thenReturn(TABLE_ID);
 
-        WriteIntentSwitchCommand writeIntentSwitchCommand = 
mock(WriteIntentSwitchCommand.class);
+        WriteIntentSwitchCommand writeIntentSwitchCommand = 
writeIntentSwitchCommand();
 
-        SafeTimeSyncCommand safeTimeSyncCommand = 
mock(SafeTimeSyncCommand.class);
+        SafeTimeSyncCommand safeTimeSyncCommand = safeTimeSyncCommand();
 
         FinishTxCommandV2 finishTxCommand = mock(FinishTxCommandV2.class);
         
when(finishTxCommand.groupType()).thenReturn(PartitionReplicationMessageGroup.GROUP_TYPE);
         when(finishTxCommand.messageType()).thenReturn(Commands.FINISH_TX_V2);
 
-        PrimaryReplicaChangeCommand primaryReplicaChangeCommand = 
mock(PrimaryReplicaChangeCommand.class);
+        PrimaryReplicaChangeCommand primaryReplicaChangeCommand = 
primaryReplicaChangeCommand();
 
         // Checks for MvPartitionStorage.
         listener.onWrite(List.of(
@@ -510,7 +525,7 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
 
         verify(txStatePartitionStorage, never())
                 .compareAndSet(any(UUID.class), any(TxState.class), 
any(TxMeta.class), anyLong(), anyLong());
-        // First time for the command, second time for updating safe time.
+        // First time for safe time command above, second time for explicit 
call of lastApplied() in this test.
         verify(txStatePartitionStorage, times(2)).lastApplied(anyLong(), 
anyLong());
 
         assertThat(commandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new Throwable[]{null, null}));
@@ -536,6 +551,175 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
         assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
     }
 
+    @Test
+    void 
onlyUpdatesMvStorageLastAppliedForWriteIntentSwitchCommandsThatTouchSomeTableStorages()
 {
+        mockCatalogForUpdateExecution();
+
+        listener.addTableProcessor(TABLE_ID, partitionListener(TABLE_ID));
+
+        WriteIntentSwitchCommand command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
+                .txId(TestTransactionIds.newTransactionId())
+                .initiatorTime(clock.now())
+                .commit(true)
+                .tableIds(Set.of(TABLE_ID))
+                .build();
+
+        listener.onWrite(List.of(
+                writeCommandClosure(3, 2, command)
+        ).iterator());
+
+        verify(mvPartitionStorage).lastApplied(3, 2);
+        verify(txStatePartitionStorage, never()).lastApplied(anyLong(), 
anyLong());
+    }
+
+    private void mockCatalogForUpdateExecution() {
+        Catalog catalog = mock(Catalog.class);
+        when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
+        
when(catalog.indexes(anyInt())).thenReturn(List.of(mock(CatalogIndexDescriptor.class)));
+    }
+
+    @Test
+    void 
updatesTxStateStorageLastAppliedForWriteIntentSwitchCommandsThatTouchNoTableStorages()
 {
+        WriteIntentSwitchCommand command = 
writeIntentSwitchCommandForMissingTable();
+
+        listener.onWrite(List.of(
+                writeCommandClosure(3, 2, command)
+        ).iterator());
+
+        assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+        assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+    }
+
+    private WriteIntentSwitchCommand writeIntentSwitchCommandForMissingTable() 
{
+        return 
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
+                .txId(TestTransactionIds.newTransactionId())
+                .initiatorTime(clock.now())
+                .commit(true)
+                .tableIds(Set.of(NON_EXISTENT_TABLE_ID))
+                .build();
+    }
+
+    @Test
+    void 
skipsTxStateStorageLastAppliedUpdateForWriteIntentSwitchCommandsWhenIndexIsAhead()
 {
+        txStatePartitionStorage.lastApplied(10L, 10L);
+
+        WriteIntentSwitchCommand command = 
writeIntentSwitchCommandForMissingTable();
+
+        listener.onWrite(List.of(
+                writeCommandClosure(3, 2, command)
+        ).iterator());
+
+        assertThat(txStatePartitionStorage.lastAppliedIndex(), is(10L));
+        assertThat(txStatePartitionStorage.lastAppliedTerm(), is(10L));
+    }
+
+    @Test
+    void 
onlyUpdatesMvStorageLastAppliedForTableAwareCommandsThatTouchTableStorages() {
+        mockCatalogForUpdateExecution();
+        when(mvPartitionStorage.leaseInfo()).thenReturn(new LeaseInfo(0, 
randomUUID(), "test"));
+
+        listener.addTableProcessor(TABLE_ID, partitionListener(TABLE_ID));
+
+        UpdateCommand command = updateCommand(TABLE_ID);
+
+        listener.onWrite(List.of(
+                writeCommandClosure(3, 2, command)
+        ).iterator());
+
+        verify(mvPartitionStorage).lastApplied(3, 2);
+        verify(txStatePartitionStorage, never()).lastApplied(anyLong(), 
anyLong());
+    }
+
+    @Test
+    void 
updatesTxStateStorageLastAppliedForTableAwareCommandsThatTouchNoTableStorages() 
{
+        UpdateCommand command = updateCommand(NON_EXISTENT_TABLE_ID);
+
+        listener.onWrite(List.of(
+                writeCommandClosure(3, 2, command)
+        ).iterator());
+
+        assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+        assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+    }
+
+    @Test
+    void 
skipsUpdateToTxStateStorageLastAppliedForTableAwareCommandsThatTouchNoTableStoragesButIndexIsAlreadyApplied()
 {
+        txStatePartitionStorage.lastApplied(10L, 10L);
+
+        UpdateCommand command = updateCommand(NON_EXISTENT_TABLE_ID);
+
+        listener.onWrite(List.of(
+                writeCommandClosure(3, 2, command)
+        ).iterator());
+
+        assertThat(txStatePartitionStorage.lastAppliedIndex(), is(10L));
+        assertThat(txStatePartitionStorage.lastAppliedTerm(), is(10L));
+    }
+
+    @Test
+    void 
updatesTxStateStorageLastAppliedForUpdateMinimumActiveTxBeginTimeCommandsThatTouchNoTableStorages()
 {
+        WriteCommand command = updateMinimumActiveTxBeginTimeCommand();
+
+        listener.onWrite(List.of(
+                writeCommandClosure(3, 2, command)
+        ).iterator());
+
+        assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+        assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+    }
+
+    @Test
+    void 
skipsUpdateToTxStateStorageLastAppliedForUpdateMinimumActiveTxBeginTimeCommandsThatTouchNoTableStoragesButIndexIsAlreadyApplied()
 {
+        txStatePartitionStorage.lastApplied(10L, 10L);
+
+        WriteCommand command = updateMinimumActiveTxBeginTimeCommand();
+
+        listener.onWrite(List.of(
+                writeCommandClosure(3, 2, command)
+        ).iterator());
+
+        assertThat(txStatePartitionStorage.lastAppliedIndex(), is(10L));
+        assertThat(txStatePartitionStorage.lastAppliedTerm(), is(10L));
+    }
+
+    @Test
+    void 
updatesLeaseInfoAndTxStateStorageLastAppliedForPrimaryReplicaChangeCommandsThatTouchNoTableStorages()
 {
+        WriteCommand command = primaryReplicaChangeCommand();
+
+        listener.onWrite(List.of(
+                writeCommandClosureWithoutSafeTime(3, 2, command)
+        ).iterator());
+
+        verify(txStatePartitionStorage).leaseInfo(any(), eq(3L), eq(2L));
+    }
+
+    @Test
+    void 
updatesLeaseInfoAndTxStateStorageLastAppliedForPrimaryReplicaChangeCommandsThatTouchSomeTableStorages()
 {
+        listener.addTableProcessor(TABLE_ID, partitionListener(TABLE_ID));
+
+        WriteCommand command = primaryReplicaChangeCommand();
+
+        listener.onWrite(List.of(
+                writeCommandClosureWithoutSafeTime(3, 2, command)
+        ).iterator());
+
+        verify(txStatePartitionStorage).leaseInfo(any(), eq(3L), eq(2L));
+    }
+
+    @Test
+    void updatesTxStateStorageLastAppliedForVacuumTxStateCommands() {
+        WriteCommand command = TX_MESSAGES_FACTORY.vacuumTxStatesCommand()
+                .txIds(Set.of(randomUUID()))
+                .build();
+
+        listener.onWrite(List.of(
+                writeCommandClosureWithoutSafeTime(3, 2, command)
+        ).iterator());
+
+        assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+        assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+    }
+
     @Test
     public void testSafeTime() {
         HybridTimestamp timestamp = clock.now();
@@ -599,9 +783,13 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
     }
 
     private static UpdateCommandV2 updateCommand() {
+        return updateCommand(TABLE_ID);
+    }
+
+    private static UpdateCommandV2 updateCommand(int tableId) {
         return PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
                 .rowUuid(randomUUID())
-                .tableId(TABLE_ID)
+                .tableId(tableId)
                 .commitPartitionId(defaultPartitionIdMessage())
                 .txCoordinatorId(randomUUID())
                 .txId(TestTransactionIds.newTransactionId())
@@ -748,6 +936,14 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
         return commandClosure;
     }
 
+    private static CommandClosure<WriteCommand> 
writeCommandClosureWithoutSafeTime(
+            long index,
+            long term,
+            WriteCommand writeCommand
+    ) {
+        return writeCommandClosure(index, term, writeCommand, null, null);
+    }
+
     private TablePartitionProcessor partitionListener(int tableId) {
         LeasePlacementDriver placementDriver = 
mock(LeasePlacementDriver.class);
         lenient().when(placementDriver.getCurrentPrimaryReplica(any(), 
any())).thenReturn(null);
@@ -755,6 +951,23 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
         ClockService clockService = mock(ClockService.class);
         lenient().when(clockService.current()).thenReturn(clock.current());
 
+        StorageUpdateHandler storageUpdateHandler = 
mock(StorageUpdateHandler.class);
+
+        lenient().doAnswer(invocation -> {
+            Runnable onApplication = invocation.getArgument(3);
+            if (onApplication != null) {
+                onApplication.run();
+            }
+            return null;
+        }).when(storageUpdateHandler).switchWriteIntents(any(), anyBoolean(), 
any(), any(Runnable.class), any());
+        lenient().doAnswer(invocation -> {
+            Runnable onApplication = invocation.getArgument(5);
+            if (onApplication != null) {
+                onApplication.run();
+            }
+            return null;
+        }).when(storageUpdateHandler).handleUpdate(any(), any(), any(), any(), 
anyBoolean(), any(Runnable.class), any(), any(), any());
+
         return new TablePartitionProcessor(
                 txManager,
                 new SnapshotAwarePartitionDataStorage(
@@ -763,8 +976,8 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
                         outgoingSnapshotsManager,
                         ZONE_PARTITION_KEY
                 ),
-                mock(StorageUpdateHandler.class),
-                mock(CatalogService.class),
+                storageUpdateHandler,
+                catalogService,
                 mock(SchemaRegistry.class),
                 mock(IndexMetaStorage.class),
                 randomUUID(),
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index af51cae7bde..85076be1067 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -67,6 +67,7 @@ import 
org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -78,7 +79,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
+import org.mockito.Spy;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
@@ -102,8 +103,8 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
     private SystemDistributedConfiguration systemDistributedConfiguration;
 
     /** Mock partition storage. */
-    @Mock
-    private MvPartitionStorage mockStorage;
+    @Spy
+    private MvPartitionStorage mockStorage = new TestMvPartitionStorage(0);
 
     /** Internal table to test. */
     DummyInternalTableImpl internalTbl;
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index e76edffba24..9bde91790ea 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -74,6 +75,7 @@ import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.function.Executable;
 import org.mockito.Mock;
+import org.mockito.Spy;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
@@ -105,8 +107,8 @@ public class ItInternalTableReadOnlyOperationsTest extends 
IgniteAbstractTest {
     private static final ColumnsExtractor KEY_EXTRACTOR = 
BinaryRowConverter.keyExtractor(SCHEMA);
 
     /** Mock partition storage. */
-    @Mock
-    private MvPartitionStorage mockStorage;
+    @Spy
+    private MvPartitionStorage mockStorage = new TestMvPartitionStorage(0);
 
     /** Transaction mock. */
     @Mock
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index 9268c32de85..4f8a0e7985c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -200,6 +200,14 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
             throw new AssertionError("Unknown command type [command=" + 
command.toStringForLightLogging() + ']');
         }
 
+        assert storage.lastAppliedIndex() >= commandIndex : String.format(
+                "Last applied index after command application is less than the 
command index "
+                        + "[lastAppliedIndex=%d, commandIndex=%d, command=%s]",
+                storage.lastAppliedIndex(),
+                commandIndex,
+                command.toStringForLightLogging()
+        );
+
         return result;
     }
 
@@ -271,6 +279,10 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
             long leaseStartTime = cmd.leaseStartTime();
 
             if (storageLeaseInfo == null || leaseStartTime != 
storageLeaseInfo.leaseStartTime()) {
+                // We MUST bump information about last updated index+term.
+                // See a comment in TablePartitionProcessor#processCommand() 
for explanation.
+                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
+
                 var updateCommandResult = new UpdateCommandResult(
                         false,
                         storageLeaseInfo == null ? 0 : 
storageLeaseInfo.leaseStartTime(),
@@ -301,7 +313,7 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
             );
         } else {
             // We MUST bump information about last updated index+term.
-            // See a comment in #onWrite() for explanation.
+            // See a comment in TablePartitionProcessor#processCommand() for 
explanation.
             // If we get here, that means that we are collocated with primary 
and data was already inserted there, thus it's only required
             // to update information about index and term.
             advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
@@ -343,6 +355,10 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
             long leaseStartTime = cmd.leaseStartTime();
 
             if (storageLeaseInfo == null || leaseStartTime != 
storageLeaseInfo.leaseStartTime()) {
+                // We MUST bump information about last updated index+term.
+                // See a comment in TablePartitionProcessor#processCommand() 
for explanation.
+                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
+
                 var updateCommandResult = new UpdateCommandResult(
                         false,
                         storageLeaseInfo == null ? 0 : 
storageLeaseInfo.leaseStartTime(),
@@ -368,7 +384,7 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
             );
         } else {
             // We MUST bump information about last updated index+term.
-            // See a comment in #onWrite() for explanation.
+            // See a comment in TablePartitionProcessor#processCommand() for 
explanation.
             // If we get here, that means that we are collocated with primary 
and data was already inserted there, thus it's only required
             // to update information about index and term.
             advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
index 0dcb39e8975..27de97d5401 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
@@ -77,6 +77,14 @@ public class MinimumActiveTxTimeCommandHandler extends 
AbstractCommandHandler<Up
 
         long timestamp = command.timestamp();
 
+        // We MUST bump information about last updated index+term.
+        // See a comment in TablePartitionProcessor#processCommand() for 
explanation.
+        storage.runConsistently(locker -> {
+            storage.lastApplied(commandIndex, commandTerm);
+
+            return null;
+        });
+
         storage.flush(false)
                 .whenComplete((r, t) -> {
                     if (t == null) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 11940453982..54eb7e8ee5f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -129,7 +129,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
@@ -383,25 +382,42 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     }
 
     @ParameterizedTest
-    @MethodSource("allCommandsUpdatingLastAppliedIndex")
-    void updatesLastAppliedForCommandsUpdatingLastAppliedIndex(WriteCommand 
command) {
-        commandListener.processCommand(command, 3, 2, hybridClock.now());
+    @MethodSource("commandsVariationsForIndexAdvanceTesting")
+    void updatesLastAppliedForCommandsUpdatingLastAppliedIndex(LabeledCommand 
command) {
+        commandListener.processCommand(command.command, 3, 2, 
hybridClock.now());
 
         verify(mvPartitionStorage).lastApplied(3, 2);
     }
 
-    private static UpdateCommandV2 updateCommand() {
+    private static UpdateCommandV2 updateCommandWithLeaseStartTime() {
+        return updateCommand(999L);
+    }
+
+    private static UpdateCommandV2 updateCommandWithoutLeaseStartTime() {
+        return updateCommand(null);
+    }
+
+    private static UpdateCommandV2 updateCommand(@Nullable Long 
leaseStartTime) {
         return PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
                 .rowUuid(randomUUID())
                 .tableId(TABLE_ID)
                 .commitPartitionId(defaultPartitionIdMessage())
                 .txCoordinatorId(randomUUID())
                 .txId(TestTransactionIds.newTransactionId())
+                .leaseStartTime(leaseStartTime)
                 .initiatorTime(anyTime())
                 .build();
     }
 
-    private static UpdateAllCommandV2 updateAllCommand() {
+    private static UpdateAllCommandV2 updateAllCommandWithLeaseStartTime() {
+        return updateAllCommand(999L);
+    }
+
+    private static UpdateAllCommandV2 updateAllCommandWithoutLeaseStartTime() {
+        return updateAllCommand(null);
+    }
+
+    private static UpdateAllCommandV2 updateAllCommand(@Nullable Long 
leaseStartTime) {
         return PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
                 .messageRowsToUpdate(singletonMap(
                         randomUUID(),
@@ -411,18 +427,22 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .commitPartitionId(defaultPartitionIdMessage())
                 .txCoordinatorId(randomUUID())
                 .txId(TestTransactionIds.newTransactionId())
+                .leaseStartTime(leaseStartTime)
                 .initiatorTime(anyTime())
                 .build();
     }
 
-    private static Stream<Arguments> allCommandsUpdatingLastAppliedIndex() {
-        return Stream.of(
-                updateCommand(),
-                updateAllCommand(),
-                writeIntentSwitchCommand(),
-                primaryReplicaChangeCommand(),
-                buildIndexCommand()
-        ).map(Arguments::of);
+    private static List<LabeledCommand> 
commandsVariationsForIndexAdvanceTesting() {
+        return List.of(
+                new LabeledCommand("UpdateCommand without lease start time", 
updateCommandWithoutLeaseStartTime()),
+                new LabeledCommand("UpdateCommand with lease start time", 
updateCommandWithLeaseStartTime()),
+                new LabeledCommand("UpdateAllCommand without lease start 
time", updateAllCommandWithoutLeaseStartTime()),
+                new LabeledCommand("UpdateAllCommand with lease start time", 
updateAllCommandWithLeaseStartTime()),
+                new LabeledCommand(writeIntentSwitchCommand()),
+                new LabeledCommand(primaryReplicaChangeCommand()),
+                new LabeledCommand(updateMinimumActiveTxBeginTimeCommand()),
+                new LabeledCommand(buildIndexCommand())
+        );
     }
 
     private static PrimaryReplicaChangeCommand primaryReplicaChangeCommand() {
@@ -432,6 +452,16 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .build();
     }
 
+    private static WriteCommand updateMinimumActiveTxBeginTimeCommand() {
+        HybridTimestamp baseTime = HybridTimestamp.MIN_VALUE;
+
+        return 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+                .initiatorTime(baseTime)
+                .safeTime(baseTime.addPhysicalTime(1))
+                .timestamp(baseTime.addPhysicalTime(1000).longValue())
+                .build();
+    }
+
     private static BuildIndexCommandV3 buildIndexCommand() {
         return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV3()
                 .tableId(TABLE_ID)
@@ -955,4 +985,23 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
         return indexMeta;
     }
+
+    private static class LabeledCommand {
+        private final String label;
+        private final WriteCommand command;
+
+        private LabeledCommand(WriteCommand command) {
+            this(command.getClass().getSimpleName(), command);
+        }
+
+        private LabeledCommand(String label, WriteCommand command) {
+            this.label = label;
+            this.command = command;
+        }
+
+        @Override
+        public String toString() {
+            return label;
+        }
+    }
 }


Reply via email to