JAkutenshi commented on code in PR #6004:
URL: https://github.com/apache/ignite-3/pull/6004#discussion_r2137573643


##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java:
##########
@@ -367,6 +405,151 @@ void processorsAreNotInitializedWithoutSnapshot(@Mock 
RaftTableProcessor tablePr
         verify(tableProcessor, never()).initialize(any(), any(), anyLong(), 
anyLong());
     }
 
+    @Test
+    void testSkipWriteCommandByAppliedIndex() {
+        mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID));
+
+        PartitionListener tableProcessor = partitionListener(TABLE_ID);
+
+        listener.addTableProcessor(TABLE_ID, tableProcessor);
+        // Update(All)Command handling requires both information about raft 
group topology and the primary replica,
+        // thus onConfigurationCommited and primaryReplicaChangeCommand are 
called.
+        AtomicInteger raftIndex = new AtomicInteger();
+
+        long index = raftIndex.incrementAndGet();
+
+        listener.onConfigurationCommitted(
+                new RaftGroupConfiguration(
+                        index,
+                        1,
+                        List.of("foo"),
+                        List.of("bar"),
+                        null,
+                        null
+                ),
+                index,
+                1
+        );
+
+        PrimaryReplicaChangeCommand command = 
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
+                .primaryReplicaNodeName("primary")
+                .primaryReplicaNodeId(randomUUID())
+                
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
+                .build();
+
+        
listener.onWrite(List.of(writeCommandCommandClosure(raftIndex.incrementAndGet(),
 1, command, null, null)).iterator());
+
+        mvPartitionStorage.lastApplied(10L, 1L);
+
+        UpdateCommand updateCommand = mock(UpdateCommand.class);
+        when(updateCommand.tableId()).thenReturn(TABLE_ID);
+
+        WriteIntentSwitchCommand writeIntentSwitchCommand = 
mock(WriteIntentSwitchCommand.class);
+
+        SafeTimeSyncCommand safeTimeSyncCommand = 
mock(SafeTimeSyncCommand.class);
+
+        FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
+        
when(finishTxCommand.groupType()).thenReturn(PartitionReplicationMessageGroup.GROUP_TYPE);
+        when(finishTxCommand.messageType()).thenReturn(Commands.FINISH_TX);
+
+        PrimaryReplicaChangeCommand primaryReplicaChangeCommand = 
mock(PrimaryReplicaChangeCommand.class);
+
+        // Checks for MvPartitionStorage.
+        listener.onWrite(List.of(
+                writeCommandCommandClosure(3, 1, updateCommand, 
updateCommandClosureResultCaptor, clock.now()),
+                writeCommandCommandClosure(10, 1, updateCommand, 
updateCommandClosureResultCaptor, clock.now()),
+                writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, 
commandClosureResultCaptor, clock.now()),
+                writeCommandCommandClosure(5, 1, safeTimeSyncCommand, 
commandClosureResultCaptor, clock.now()),
+                writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand, 
commandClosureResultCaptor, null)
+        ).iterator());
+
+        // Two storage runConsistently runs are expected: one for 
configuration application and another for primaryReplicaChangeCommand
+        // handling. Both comes from initial configuration preparation in 
@BeforeEach
+        verify(mvPartitionStorage, 
times(2)).runConsistently(any(WriteClosure.class));
+        verify(mvPartitionStorage, times(3)).lastApplied(anyLong(), 
anyLong()); // !! 3 vs 1
+
+        List<UpdateCommandResult> allValues = 
updateCommandClosureResultCaptor.getAllValues();
+        assertThat(allValues, containsInAnyOrder(new Throwable[]{null, null}));
+        assertThat(commandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new Throwable[]{null, null, null}));
+
+        // Checks for TxStateStorage.
+        mvPartitionStorage.lastApplied(1L, 1L);
+        txStatePartitionStorage.lastApplied(10L, 2L);
+
+        commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+        listener.onWrite(List.of(
+                writeCommandCommandClosure(2, 1, finishTxCommand, 
commandClosureResultCaptor, clock.now()),
+                writeCommandCommandClosure(10, 1, finishTxCommand, 
commandClosureResultCaptor, clock.now())
+        ).iterator());
+
+        verify(txStatePartitionStorage, never())
+                .compareAndSet(any(UUID.class), any(TxState.class), 
any(TxMeta.class), anyLong(), anyLong());
+        verify(txStatePartitionStorage, times(1)).lastApplied(anyLong(), 
anyLong());
+
+        assertThat(commandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new Throwable[]{null, null}));
+
+        listener.removeTableProcessor(TABLE_ID);
+    }
+
+
+    @Test
+    void updatesLastAppliedForFinishTxCommands() {
+        safeTimeClock.update(clock.now(), null);
+
+        FinishTxCommand command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand()
+                .txId(TestTransactionIds.newTransactionId())
+                .initiatorTime(clock.now())
+                .partitions(List.of())
+                .build();
+
+        listener.onWrite(List.of(
+                writeCommandCommandClosure(3, 2, command)
+        ).iterator());
+
+        assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+        assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+    }
+
+    private CommandClosure<WriteCommand> writeCommandCommandClosure(

Review Comment:
   Applied.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java:
##########
@@ -237,6 +277,10 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private final LockManager lockManager = lockManager();
 
+    @Captor
+    private ArgumentCaptor<Command> commandCaptor;
+
+

Review Comment:
   Yes, removed.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java:
##########
@@ -322,7 +366,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private TopologyService topologySrv;
 
     @Mock
-    private PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeClock;
+    private SafeTimeValuesTracker safeTimeClock;

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to