This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a643a9ff9c IGNITE-21795 Unconditionally update storage with proper raft index within PartitionListener (#3454)
a643a9ff9c is described below

commit a643a9ff9cb5b750ff0719d950aa8b19bcae8770
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Thu Mar 21 10:44:17 2024 +0400

    IGNITE-21795 Unconditionally update storage with proper raft index within PartitionListener (#3454)
---
 .../table/distributed/raft/PartitionListener.java  |  18 +-
 .../raft/PartitionCommandListenerTest.java         | 244 +++++++++++++++------
 2 files changed, 186 insertions(+), 76 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 21a9565a85..4e058951ac 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -276,9 +276,13 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
                         cmd.lastCommitTimestamp(),
                         indexIdsAtRwTxBeginTs(catalogService, txId, 
storage.tableId())
                 );
-            }
 
-            updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
+                updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
+            } else {
+                // We MUST bump information about last updated index+term.
+                // See a comment in #onWrite() for explanation.
+                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
+            }
         }
 
         replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() 
: null, cmd.full());
@@ -313,6 +317,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
                 );
 
                 updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
+            } else {
+                // We MUST bump information about last updated index+term.
+                // See a comment in #onWrite() for explanation.
+                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
             }
         }
 
@@ -328,7 +336,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
      * @return The actually stored transaction state {@link TransactionResult}.
      * @throws IgniteInternalException if an exception occurred during a 
transaction state change.
      */
-    private TransactionResult handleFinishTxCommand(FinishTxCommand cmd, long commandIndex, long commandTerm)
+    private @Nullable TransactionResult handleFinishTxCommand(FinishTxCommand cmd, long commandIndex, long commandTerm)
             throws IgniteInternalException {
         // Skips the write command because the storage has already executed it.
         if (commandIndex <= txStateStorage.lastAppliedIndex()) {
@@ -407,6 +415,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
 
         // We MUST bump information about last updated index+term.
         // See a comment in #onWrite() for explanation.
+        advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
+    }
+
+    private void advanceLastAppliedIndexConsistently(long commandIndex, long commandTerm) {
         storage.runConsistently(locker -> {
             storage.lastApplied(commandIndex, commandTerm);
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 656b8d4326..5187d61096 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
+import static java.util.Collections.singletonMap;
 import static org.apache.ignite.internal.util.ArrayUtils.asList;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -97,7 +99,9 @@ import 
org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
+import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import 
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
@@ -121,6 +125,8 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.InOrder;
@@ -317,9 +323,89 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         readAndCheck(false);
     }
 
+    @Test
+    void testSkipWriteCommandByAppliedIndex() {
+        mvPartitionStorage.lastApplied(10L, 1L);
+
+        UpdateCommand updateCommand = mock(UpdateCommand.class);
+        when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now());
+
+        WriteIntentSwitchCommand writeIntentSwitchCommand = 
mock(WriteIntentSwitchCommand.class);
+        when(writeIntentSwitchCommand.safeTime()).thenAnswer(v -> 
hybridClock.now());
+
+        SafeTimeSyncCommand safeTimeSyncCommand = 
mock(SafeTimeSyncCommand.class);
+        when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> 
hybridClock.now());
+
+        FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
+        when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now());
+
+        // Checks for MvPartitionStorage.
+        commandListener.onWrite(List.of(
+                writeCommandCommandClosure(3, 1, updateCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(10, 1, updateCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(5, 1, safeTimeSyncCommand, 
commandClosureResultCaptor)
+        ).iterator());
+
+        verify(mvPartitionStorage, 
never()).runConsistently(any(WriteClosure.class));
+        verify(mvPartitionStorage, times(1)).lastApplied(anyLong(), anyLong());
+
+        assertThat(commandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new Throwable[]{null, null, null, null}));
+
+        // Checks for TxStateStorage.
+        mvPartitionStorage.lastApplied(1L, 1L);
+        txStateStorage.lastApplied(10L, 2L);
+
+        commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+        commandListener.onWrite(List.of(
+                writeCommandCommandClosure(2, 1, finishTxCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(10, 1, finishTxCommand, 
commandClosureResultCaptor)
+        ).iterator());
+
+        verify(txStateStorage, never()).compareAndSet(any(UUID.class), 
any(TxState.class), any(TxMeta.class), anyLong(), anyLong());
+        verify(txStateStorage, times(1)).lastApplied(anyLong(), anyLong());
+
+        assertThat(commandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new Throwable[]{null, null}));
+    }
+
+    private static CommandClosure<WriteCommand> writeCommandCommandClosure(
+            long index,
+            long term,
+            WriteCommand writeCommand
+    ) {
+        return writeCommandCommandClosure(index, term, writeCommand, null);
+    }
+
+    /**
+     * Create a command closure.
+     *
+     * @param index Index of the RAFT command.
+     * @param writeCommand Write command.
+     * @param resultClosureCaptor Captor for {@link 
CommandClosure#result(Serializable)}
+     */
+    private static CommandClosure<WriteCommand> writeCommandCommandClosure(
+            long index,
+            long term,
+            WriteCommand writeCommand,
+            @Nullable ArgumentCaptor<Throwable> resultClosureCaptor
+    ) {
+        CommandClosure<WriteCommand> commandClosure = 
mock(CommandClosure.class);
+
+        when(commandClosure.index()).thenReturn(index);
+        when(commandClosure.term()).thenReturn(term);
+        when(commandClosure.command()).thenReturn(writeCommand);
+
+        if (resultClosureCaptor != null) {
+            
doNothing().when(commandClosure).result(resultClosureCaptor.capture());
+        }
+
+        return commandClosure;
+    }
+
     /**
-     * The test checks that {@link PartitionListener#onSnapshotSave(Path, 
Consumer)} propagates
-     * the maximal last applied index among storages to all storages.
+     * The test checks that {@link PartitionListener#onSnapshotSave(Path, 
Consumer)} propagates the maximal last applied index among
+     * storages to all storages.
      */
     @Test
     public void testOnSnapshotSavePropagateLastAppliedIndexAndTerm() {
@@ -378,64 +464,66 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         assertEquals(2L, txStateStorage.lastAppliedTerm());
     }
 
-    @Test
-    void testSkipWriteCommandByAppliedIndex() {
-        mvPartitionStorage.lastApplied(10L, 1L);
-
-        UpdateCommand updateCommand = mock(UpdateCommand.class);
-        when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now());
-
-        WriteIntentSwitchCommand writeIntentSwitchCommand = 
mock(WriteIntentSwitchCommand.class);
-        when(writeIntentSwitchCommand.safeTime()).thenAnswer(v -> 
hybridClock.now());
-
-        SafeTimeSyncCommand safeTimeSyncCommand = 
mock(SafeTimeSyncCommand.class);
-        when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> 
hybridClock.now());
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void updatesLastAppliedForUpdateCommands(boolean stale) {
+        safeTimeTracker.update(hybridClock.now(), null);
 
-        FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
-        when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now());
+        UpdateCommand command = msgFactory.updateCommand()
+                .rowUuid(UUID.randomUUID())
+                .tablePartitionId(defaultPartitionIdMessage())
+                .txCoordinatorId(UUID.randomUUID().toString())
+                .txId(TestTransactionIds.newTransactionId())
+                .safeTimeLong(staleOrFreshSafeTime(stale))
+                .build();
 
-        // Checks for MvPartitionStorage.
         commandListener.onWrite(List.of(
-                writeCommandCommandClosure(3, 1, updateCommand, 
commandClosureResultCaptor),
-                writeCommandCommandClosure(10, 1, updateCommand, 
commandClosureResultCaptor),
-                writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, 
commandClosureResultCaptor),
-                writeCommandCommandClosure(5, 1, safeTimeSyncCommand, 
commandClosureResultCaptor)
+                writeCommandCommandClosure(3, 2, command)
         ).iterator());
 
-        verify(mvPartitionStorage, 
never()).runConsistently(any(WriteClosure.class));
-        verify(mvPartitionStorage, times(1)).lastApplied(anyLong(), anyLong());
+        verify(mvPartitionStorage).lastApplied(3, 2);
+    }
 
-        assertThat(commandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new Throwable[]{null, null, null, null}));
+    private long staleOrFreshSafeTime(boolean stale) {
+        return stale ? 
safeTimeTracker.current().subtractPhysicalTime(1).longValue() : 
hybridClock.nowLong();
+    }
 
-        // Checks for TxStateStorage.
-        mvPartitionStorage.lastApplied(1L, 1L);
-        txStateStorage.lastApplied(10L, 2L);
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void updatesLastAppliedForUpdateAllCommands(boolean stale) {
+        safeTimeTracker.update(hybridClock.now(), null);
 
-        commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+        UpdateAllCommand command = msgFactory.updateAllCommand()
+                .messageRowsToUpdate(singletonMap(UUID.randomUUID(), 
msgFactory.timedBinaryRowMessage().build()))
+                .tablePartitionId(defaultPartitionIdMessage())
+                .txCoordinatorId(UUID.randomUUID().toString())
+                .txId(TestTransactionIds.newTransactionId())
+                .safeTimeLong(staleOrFreshSafeTime(stale))
+                .build();
 
         commandListener.onWrite(List.of(
-                writeCommandCommandClosure(2, 1, finishTxCommand, 
commandClosureResultCaptor),
-                writeCommandCommandClosure(10, 1, finishTxCommand, 
commandClosureResultCaptor)
+                writeCommandCommandClosure(3, 2, command)
         ).iterator());
 
-        verify(txStateStorage, never()).compareAndSet(any(UUID.class), 
any(TxState.class), any(TxMeta.class), anyLong(), anyLong());
-        verify(txStateStorage, times(1)).lastApplied(anyLong(), anyLong());
-
-        assertThat(commandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new Throwable[]{null, null}));
+        verify(mvPartitionStorage).lastApplied(3, 2);
     }
 
-    @Test
-    void updatesLastAppliedForSafeTimeSyncCommands() {
-        SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory()
-                .safeTimeSyncCommand()
-                .safeTimeLong(hybridClock.nowLong())
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void updatesLastAppliedForFinishTxCommands(boolean stale) {
+        safeTimeTracker.update(hybridClock.now(), null);
+
+        FinishTxCommand command = msgFactory.finishTxCommand()
+                .txId(TestTransactionIds.newTransactionId())
+                .safeTimeLong(staleOrFreshSafeTime(stale))
                 .build();
 
         commandListener.onWrite(List.of(
-                writeCommandCommandClosure(3, 2, safeTimeSyncCommand, 
commandClosureResultCaptor)
+                writeCommandCommandClosure(3, 2, command)
         ).iterator());
 
-        verify(mvPartitionStorage).lastApplied(3, 2);
+        assertThat(txStateStorage.lastAppliedIndex(), is(3L));
+        assertThat(txStateStorage.lastAppliedTerm(), is(2L));
     }
 
     @Test
@@ -560,35 +648,45 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         assertEquals(timestamp, safeTimeTracker.current());
     }
 
-    /**
-     * Crate a command closure.
-     *
-     * @param index Index of the RAFT command.
-     * @param writeCommand Write command.
-     * @param resultClosureCaptor Captor for {@link 
CommandClosure#result(Serializable)}
-     */
-    private static CommandClosure<WriteCommand> writeCommandCommandClosure(
-            long index,
-            long term,
-            WriteCommand writeCommand,
-            ArgumentCaptor<Throwable> resultClosureCaptor
-    ) {
-        CommandClosure<WriteCommand> commandClosure = 
mock(CommandClosure.class);
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void updatesLastAppliedForWriteIntentSwitchCommands(boolean stale) {
+        safeTimeTracker.update(hybridClock.now(), null);
 
-        when(commandClosure.index()).thenReturn(index);
-        when(commandClosure.term()).thenReturn(term);
-        when(commandClosure.command()).thenReturn(writeCommand);
+        WriteIntentSwitchCommand command = 
msgFactory.writeIntentSwitchCommand()
+                .txId(TestTransactionIds.newTransactionId())
+                .safeTimeLong(staleOrFreshSafeTime(stale))
+                .build();
 
-        doNothing().when(commandClosure).result(resultClosureCaptor.capture());
+        commandListener.onWrite(List.of(
+                writeCommandCommandClosure(3, 2, command)
+        ).iterator());
 
-        return commandClosure;
+        verify(mvPartitionStorage).lastApplied(3, 2);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void updatesLastAppliedForSafeTimeSyncCommands(boolean stale) {
+        safeTimeTracker.update(hybridClock.now(), null);
+
+        SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory()
+                .safeTimeSyncCommand()
+                .safeTimeLong(staleOrFreshSafeTime(stale))
+                .build();
+
+        commandListener.onWrite(List.of(
+                writeCommandCommandClosure(3, 2, safeTimeSyncCommand)
+        ).iterator());
+
+        verify(mvPartitionStorage).lastApplied(3, 2);
     }
 
     /**
      * Prepares a closure iterator for a specific batch operation.
      *
      * @param func The function prepare a closure for the operation.
-     * @param <T>  Type of the operation.
+     * @param <T> Type of the operation.
      * @return Closure iterator.
      */
     private <T extends Command> Iterator<CommandClosure<T>> 
batchIterator(Consumer<CommandClosure<T>> func) {
@@ -617,7 +715,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
      * Prepares a closure iterator for a specific operation.
      *
      * @param func The function prepare a closure for the operation.
-     * @param <T>  Type of the operation.
+     * @param <T> Type of the operation.
      * @return Closure iterator.
      */
     private <T extends Command> Iterator<CommandClosure<T>> 
iterator(BiConsumer<Integer, CommandClosure<T>> func) {
@@ -782,9 +880,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
             when(clo.command()).thenReturn(
                     msgFactory.updateCommand()
-                            
.tablePartitionId(msgFactory.tablePartitionIdMessage()
-                                    .tableId(1)
-                                    .partitionId(PARTITION_ID).build())
+                            .tablePartitionId(defaultPartitionIdMessage())
                             .rowUuid(readResult.rowId().uuid())
                             
.messageRowToUpdate(msgFactory.timedBinaryRowMessage()
                                     .binaryRowMessage(row)
@@ -811,6 +907,12 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .build()));
     }
 
+    private TablePartitionIdMessage defaultPartitionIdMessage() {
+        return msgFactory.tablePartitionIdMessage()
+                .tableId(1)
+                .partitionId(PARTITION_ID).build();
+    }
+
     /**
      * Deletes row.
      */
@@ -827,9 +929,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
             when(clo.command()).thenReturn(
                     msgFactory.updateCommand()
-                            
.tablePartitionId(msgFactory.tablePartitionIdMessage()
-                                    .tableId(1)
-                                    .partitionId(PARTITION_ID).build())
+                            .tablePartitionId(defaultPartitionIdMessage())
                             .rowUuid(readResult.rowId().uuid())
                             .txId(txId)
                             .safeTimeLong(hybridClock.nowLong())
@@ -865,7 +965,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     /**
      * Reads rows from the listener and checks values as expected by a mapper.
      *
-     * @param existed        True if rows are existed, false otherwise.
+     * @param existed True if rows are existed, false otherwise.
      * @param keyValueMapper Mapper a key to the value which will be expected.
      */
     private void readAndCheck(boolean existed, Function<Integer, Integer> 
keyValueMapper) {
@@ -899,9 +999,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
             when(clo.command()).thenReturn(
                     msgFactory.updateCommand()
-                            
.tablePartitionId(msgFactory.tablePartitionIdMessage()
-                                    .tableId(1)
-                                    .partitionId(PARTITION_ID).build())
+                            .tablePartitionId(defaultPartitionIdMessage())
                             .rowUuid(UUID.randomUUID())
                             
.messageRowToUpdate(msgFactory.timedBinaryRowMessage()
                                     .binaryRowMessage(getTestRow(i, i))
@@ -978,7 +1076,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     private @Nullable ReadResult readRow(BinaryTuple pk) {
         try (Cursor<RowId> cursor = pkStorage.storage().get(pk)) {
             return cursor.stream()
-                    .map(rowId ->  mvPartitionStorage.read(rowId, 
HybridTimestamp.MAX_VALUE))
+                    .map(rowId -> mvPartitionStorage.read(rowId, 
HybridTimestamp.MAX_VALUE))
                     .filter(readResult -> !readResult.isEmpty())
                     .findAny()
                     .orElse(null);

Reply via email to