JAkutenshi commented on code in PR #6004: URL: https://github.com/apache/ignite-3/pull/6004#discussion_r2137573643
########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java: ########## @@ -367,6 +405,151 @@ void processorsAreNotInitializedWithoutSnapshot(@Mock RaftTableProcessor tablePr verify(tableProcessor, never()).initialize(any(), any(), anyLong(), anyLong()); } + @Test + void testSkipWriteCommandByAppliedIndex() { + mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID)); + + PartitionListener tableProcessor = partitionListener(TABLE_ID); + + listener.addTableProcessor(TABLE_ID, tableProcessor); + // Update(All)Command handling requires both information about raft group topology and the primary replica, + // thus onConfigurationCommited and primaryReplicaChangeCommand are called. + AtomicInteger raftIndex = new AtomicInteger(); + + long index = raftIndex.incrementAndGet(); + + listener.onConfigurationCommitted( + new RaftGroupConfiguration( + index, + 1, + List.of("foo"), + List.of("bar"), + null, + null + ), + index, + 1 + ); + + PrimaryReplicaChangeCommand command = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand() + .primaryReplicaNodeName("primary") + .primaryReplicaNodeId(randomUUID()) + .leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue()) + .build(); + + listener.onWrite(List.of(writeCommandCommandClosure(raftIndex.incrementAndGet(), 1, command, null, null)).iterator()); + + mvPartitionStorage.lastApplied(10L, 1L); + + UpdateCommand updateCommand = mock(UpdateCommand.class); + when(updateCommand.tableId()).thenReturn(TABLE_ID); + + WriteIntentSwitchCommand writeIntentSwitchCommand = mock(WriteIntentSwitchCommand.class); + + SafeTimeSyncCommand safeTimeSyncCommand = mock(SafeTimeSyncCommand.class); + + FinishTxCommand finishTxCommand = mock(FinishTxCommand.class); + when(finishTxCommand.groupType()).thenReturn(PartitionReplicationMessageGroup.GROUP_TYPE); + when(finishTxCommand.messageType()).thenReturn(Commands.FINISH_TX); + + PrimaryReplicaChangeCommand primaryReplicaChangeCommand = mock(PrimaryReplicaChangeCommand.class); + + // Checks for MvPartitionStorage. + listener.onWrite(List.of( + writeCommandCommandClosure(3, 1, updateCommand, updateCommandClosureResultCaptor, clock.now()), + writeCommandCommandClosure(10, 1, updateCommand, updateCommandClosureResultCaptor, clock.now()), + writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, commandClosureResultCaptor, clock.now()), + writeCommandCommandClosure(5, 1, safeTimeSyncCommand, commandClosureResultCaptor, clock.now()), + writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand, commandClosureResultCaptor, null) + ).iterator()); + + // Two storage runConsistently runs are expected: one for configuration application and another for primaryReplicaChangeCommand + // handling. Both comes from initial configuration preparation in @BeforeEach + verify(mvPartitionStorage, times(2)).runConsistently(any(WriteClosure.class)); + verify(mvPartitionStorage, times(3)).lastApplied(anyLong(), anyLong()); // !! 3 vs 1 + + List<UpdateCommandResult> allValues = updateCommandClosureResultCaptor.getAllValues(); + assertThat(allValues, containsInAnyOrder(new Throwable[]{null, null})); + assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null, null})); + + // Checks for TxStateStorage. + mvPartitionStorage.lastApplied(1L, 1L); + txStatePartitionStorage.lastApplied(10L, 2L); + + commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class); + + listener.onWrite(List.of( + writeCommandCommandClosure(2, 1, finishTxCommand, commandClosureResultCaptor, clock.now()), + writeCommandCommandClosure(10, 1, finishTxCommand, commandClosureResultCaptor, clock.now()) + ).iterator()); + + verify(txStatePartitionStorage, never()) + .compareAndSet(any(UUID.class), any(TxState.class), any(TxMeta.class), anyLong(), anyLong()); + verify(txStatePartitionStorage, times(1)).lastApplied(anyLong(), anyLong()); + + assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null})); + + listener.removeTableProcessor(TABLE_ID); + } + + + @Test + void updatesLastAppliedForFinishTxCommands() { + safeTimeClock.update(clock.now(), null); + + FinishTxCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() + .txId(TestTransactionIds.newTransactionId()) + .initiatorTime(clock.now()) + .partitions(List.of()) + .build(); + + listener.onWrite(List.of( + writeCommandCommandClosure(3, 2, command) + ).iterator()); + + assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L)); + assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L)); + } + + private CommandClosure<WriteCommand> writeCommandCommandClosure( Review Comment: Applied. ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java: ########## @@ -237,6 +277,10 @@ public class ZonePartitionReplicaListenerTest extends IgniteAbstractTest { private final LockManager lockManager = lockManager(); + @Captor + private ArgumentCaptor<Command> commandCaptor; + + Review Comment: Yes, removed. ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java: ########## @@ -322,7 +366,7 @@ public class ZonePartitionReplicaListenerTest extends IgniteAbstractTest { private TopologyService topologySrv; @Mock - private PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeClock; + private SafeTimeValuesTracker safeTimeClock; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org