This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new a643a9ff9c IGNITE-21795 Unconditionally update storage with proper raft index within PartitionListener (#3454) a643a9ff9c is described below commit a643a9ff9cb5b750ff0719d950aa8b19bcae8770 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Mar 21 10:44:17 2024 +0400 IGNITE-21795 Unconditionally update storage with proper raft index within PartitionListener (#3454) --- .../table/distributed/raft/PartitionListener.java | 18 +- .../raft/PartitionCommandListenerTest.java | 244 +++++++++++++++------ 2 files changed, 186 insertions(+), 76 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 21a9565a85..4e058951ac 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -276,9 +276,13 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler cmd.lastCommitTimestamp(), indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId()) ); - } - updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime()); + updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime()); + } else { + // We MUST bump information about last updated index+term. + // See a comment in #onWrite() for explanation. + advanceLastAppliedIndexConsistently(commandIndex, commandTerm); + } } replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() : null, cmd.full()); @@ -313,6 +317,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler ); updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime()); + } else { + // We MUST bump information about last updated index+term. + // See a comment in #onWrite() for explanation. + advanceLastAppliedIndexConsistently(commandIndex, commandTerm); } } @@ -328,7 +336,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler * @return The actually stored transaction state {@link TransactionResult}. * @throws IgniteInternalException if an exception occurred during a transaction state change. */ - private TransactionResult handleFinishTxCommand(FinishTxCommand cmd, long commandIndex, long commandTerm) + private @Nullable TransactionResult handleFinishTxCommand(FinishTxCommand cmd, long commandIndex, long commandTerm) throws IgniteInternalException { // Skips the write command because the storage has already executed it. if (commandIndex <= txStateStorage.lastAppliedIndex()) { @@ -407,6 +415,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler // We MUST bump information about last updated index+term. // See a comment in #onWrite() for explanation. + advanceLastAppliedIndexConsistently(commandIndex, commandTerm); + } + + private void advanceLastAppliedIndexConsistently(long commandIndex, long commandTerm) { storage.runConsistently(locker -> { storage.lastApplied(commandIndex, commandTerm); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index 656b8d4326..5187d61096 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.table.distributed.raft; +import static java.util.Collections.singletonMap; import static org.apache.ignite.internal.util.ArrayUtils.asList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -97,7 +99,9 @@ import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; +import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; import org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage; +import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; @@ -121,6 +125,8 @@ import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InOrder; @@ -317,9 +323,89 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { readAndCheck(false); } + @Test + void testSkipWriteCommandByAppliedIndex() { + mvPartitionStorage.lastApplied(10L, 1L); + + UpdateCommand updateCommand = mock(UpdateCommand.class); + when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now()); + + WriteIntentSwitchCommand writeIntentSwitchCommand = mock(WriteIntentSwitchCommand.class); + when(writeIntentSwitchCommand.safeTime()).thenAnswer(v -> hybridClock.now()); + + SafeTimeSyncCommand safeTimeSyncCommand = mock(SafeTimeSyncCommand.class); + when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> hybridClock.now()); + + FinishTxCommand finishTxCommand = mock(FinishTxCommand.class); + when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now()); + + // Checks for MvPartitionStorage. + commandListener.onWrite(List.of( + writeCommandCommandClosure(3, 1, updateCommand, commandClosureResultCaptor), + writeCommandCommandClosure(10, 1, updateCommand, commandClosureResultCaptor), + writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, commandClosureResultCaptor), + writeCommandCommandClosure(5, 1, safeTimeSyncCommand, commandClosureResultCaptor) + ).iterator()); + + verify(mvPartitionStorage, never()).runConsistently(any(WriteClosure.class)); + verify(mvPartitionStorage, times(1)).lastApplied(anyLong(), anyLong()); + + assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null, null, null})); + + // Checks for TxStateStorage. + mvPartitionStorage.lastApplied(1L, 1L); + txStateStorage.lastApplied(10L, 2L); + + commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class); + + commandListener.onWrite(List.of( + writeCommandCommandClosure(2, 1, finishTxCommand, commandClosureResultCaptor), + writeCommandCommandClosure(10, 1, finishTxCommand, commandClosureResultCaptor) + ).iterator()); + + verify(txStateStorage, never()).compareAndSet(any(UUID.class), any(TxState.class), any(TxMeta.class), anyLong(), anyLong()); + verify(txStateStorage, times(1)).lastApplied(anyLong(), anyLong()); + + assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null})); + } + + private static CommandClosure<WriteCommand> writeCommandCommandClosure( + long index, + long term, + WriteCommand writeCommand + ) { + return writeCommandCommandClosure(index, term, writeCommand, null); + } + + /** + * Create a command closure. + * + * @param index Index of the RAFT command. + * @param writeCommand Write command. + * @param resultClosureCaptor Captor for {@link CommandClosure#result(Serializable)} + */ + private static CommandClosure<WriteCommand> writeCommandCommandClosure( + long index, + long term, + WriteCommand writeCommand, + @Nullable ArgumentCaptor<Throwable> resultClosureCaptor + ) { + CommandClosure<WriteCommand> commandClosure = mock(CommandClosure.class); + + when(commandClosure.index()).thenReturn(index); + when(commandClosure.term()).thenReturn(term); + when(commandClosure.command()).thenReturn(writeCommand); + + if (resultClosureCaptor != null) { + doNothing().when(commandClosure).result(resultClosureCaptor.capture()); + } + + return commandClosure; + } + /** - * The test checks that {@link PartitionListener#onSnapshotSave(Path, Consumer)} propagates - * the maximal last applied index among storages to all storages. + * The test checks that {@link PartitionListener#onSnapshotSave(Path, Consumer)} propagates the maximal last applied index among + * storages to all storages. */ @Test public void testOnSnapshotSavePropagateLastAppliedIndexAndTerm() { @@ -378,64 +464,66 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { assertEquals(2L, txStateStorage.lastAppliedTerm()); } - @Test - void testSkipWriteCommandByAppliedIndex() { - mvPartitionStorage.lastApplied(10L, 1L); - - UpdateCommand updateCommand = mock(UpdateCommand.class); - when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now()); - - WriteIntentSwitchCommand writeIntentSwitchCommand = mock(WriteIntentSwitchCommand.class); - when(writeIntentSwitchCommand.safeTime()).thenAnswer(v -> hybridClock.now()); - - SafeTimeSyncCommand safeTimeSyncCommand = mock(SafeTimeSyncCommand.class); - when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> hybridClock.now()); + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void updatesLastAppliedForUpdateCommands(boolean stale) { + safeTimeTracker.update(hybridClock.now(), null); - FinishTxCommand finishTxCommand = mock(FinishTxCommand.class); - when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now()); + UpdateCommand command = msgFactory.updateCommand() + .rowUuid(UUID.randomUUID()) + .tablePartitionId(defaultPartitionIdMessage()) + .txCoordinatorId(UUID.randomUUID().toString()) + .txId(TestTransactionIds.newTransactionId()) + .safeTimeLong(staleOrFreshSafeTime(stale)) + .build(); - // Checks for MvPartitionStorage. commandListener.onWrite(List.of( - writeCommandCommandClosure(3, 1, updateCommand, commandClosureResultCaptor), - writeCommandCommandClosure(10, 1, updateCommand, commandClosureResultCaptor), - writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, commandClosureResultCaptor), - writeCommandCommandClosure(5, 1, safeTimeSyncCommand, commandClosureResultCaptor) + writeCommandCommandClosure(3, 2, command) ).iterator()); - verify(mvPartitionStorage, never()).runConsistently(any(WriteClosure.class)); - verify(mvPartitionStorage, times(1)).lastApplied(anyLong(), anyLong()); + verify(mvPartitionStorage).lastApplied(3, 2); + } - assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null, null, null})); + private long staleOrFreshSafeTime(boolean stale) { + return stale ? safeTimeTracker.current().subtractPhysicalTime(1).longValue() : hybridClock.nowLong(); + } - // Checks for TxStateStorage. - mvPartitionStorage.lastApplied(1L, 1L); - txStateStorage.lastApplied(10L, 2L); + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void updatesLastAppliedForUpdateAllCommands(boolean stale) { + safeTimeTracker.update(hybridClock.now(), null); - commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class); + UpdateAllCommand command = msgFactory.updateAllCommand() + .messageRowsToUpdate(singletonMap(UUID.randomUUID(), msgFactory.timedBinaryRowMessage().build())) + .tablePartitionId(defaultPartitionIdMessage()) + .txCoordinatorId(UUID.randomUUID().toString()) + .txId(TestTransactionIds.newTransactionId()) + .safeTimeLong(staleOrFreshSafeTime(stale)) + .build(); commandListener.onWrite(List.of( - writeCommandCommandClosure(2, 1, finishTxCommand, commandClosureResultCaptor), - writeCommandCommandClosure(10, 1, finishTxCommand, commandClosureResultCaptor) + writeCommandCommandClosure(3, 2, command) ).iterator()); - verify(txStateStorage, never()).compareAndSet(any(UUID.class), any(TxState.class), any(TxMeta.class), anyLong(), anyLong()); - verify(txStateStorage, times(1)).lastApplied(anyLong(), anyLong()); - - assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null})); + verify(mvPartitionStorage).lastApplied(3, 2); } - @Test - void updatesLastAppliedForSafeTimeSyncCommands() { - SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory() - .safeTimeSyncCommand() - .safeTimeLong(hybridClock.nowLong()) + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void updatesLastAppliedForFinishTxCommands(boolean stale) { + safeTimeTracker.update(hybridClock.now(), null); + + FinishTxCommand command = msgFactory.finishTxCommand() + .txId(TestTransactionIds.newTransactionId()) + .safeTimeLong(staleOrFreshSafeTime(stale)) .build(); commandListener.onWrite(List.of( - writeCommandCommandClosure(3, 2, safeTimeSyncCommand, commandClosureResultCaptor) + writeCommandCommandClosure(3, 2, command) ).iterator()); - verify(mvPartitionStorage).lastApplied(3, 2); + assertThat(txStateStorage.lastAppliedIndex(), is(3L)); + assertThat(txStateStorage.lastAppliedTerm(), is(2L)); } @Test @@ -560,35 +648,45 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { assertEquals(timestamp, safeTimeTracker.current()); } - /** - * Crate a command closure. - * - * @param index Index of the RAFT command. - * @param writeCommand Write command. - * @param resultClosureCaptor Captor for {@link CommandClosure#result(Serializable)} - */ - private static CommandClosure<WriteCommand> writeCommandCommandClosure( - long index, - long term, - WriteCommand writeCommand, - ArgumentCaptor<Throwable> resultClosureCaptor - ) { - CommandClosure<WriteCommand> commandClosure = mock(CommandClosure.class); + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void updatesLastAppliedForWriteIntentSwitchCommands(boolean stale) { + safeTimeTracker.update(hybridClock.now(), null); - when(commandClosure.index()).thenReturn(index); - when(commandClosure.term()).thenReturn(term); - when(commandClosure.command()).thenReturn(writeCommand); + WriteIntentSwitchCommand command = msgFactory.writeIntentSwitchCommand() + .txId(TestTransactionIds.newTransactionId()) + .safeTimeLong(staleOrFreshSafeTime(stale)) + .build(); - doNothing().when(commandClosure).result(resultClosureCaptor.capture()); + commandListener.onWrite(List.of( + writeCommandCommandClosure(3, 2, command) + ).iterator()); - return commandClosure; + verify(mvPartitionStorage).lastApplied(3, 2); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void updatesLastAppliedForSafeTimeSyncCommands(boolean stale) { + safeTimeTracker.update(hybridClock.now(), null); + + SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory() + .safeTimeSyncCommand() + .safeTimeLong(staleOrFreshSafeTime(stale)) + .build(); + + commandListener.onWrite(List.of( + writeCommandCommandClosure(3, 2, safeTimeSyncCommand) + ).iterator()); + + verify(mvPartitionStorage).lastApplied(3, 2); } /** * Prepares a closure iterator for a specific batch operation. * * @param func The function prepare a closure for the operation. - * @param <T> Type of the operation. + * @param <T> Type of the operation. * @return Closure iterator. */ private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) { @@ -617,7 +715,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { * Prepares a closure iterator for a specific operation. * * @param func The function prepare a closure for the operation. - * @param <T> Type of the operation. + * @param <T> Type of the operation. * @return Closure iterator. */ private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) { @@ -782,9 +880,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { when(clo.command()).thenReturn( msgFactory.updateCommand() - .tablePartitionId(msgFactory.tablePartitionIdMessage() - .tableId(1) - .partitionId(PARTITION_ID).build()) + .tablePartitionId(defaultPartitionIdMessage()) .rowUuid(readResult.rowId().uuid()) .messageRowToUpdate(msgFactory.timedBinaryRowMessage() .binaryRowMessage(row) @@ -811,6 +907,12 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { .build())); } + private TablePartitionIdMessage defaultPartitionIdMessage() { + return msgFactory.tablePartitionIdMessage() + .tableId(1) + .partitionId(PARTITION_ID).build(); + } + /** * Deletes row. */ @@ -827,9 +929,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { when(clo.command()).thenReturn( msgFactory.updateCommand() - .tablePartitionId(msgFactory.tablePartitionIdMessage() - .tableId(1) - .partitionId(PARTITION_ID).build()) + .tablePartitionId(defaultPartitionIdMessage()) .rowUuid(readResult.rowId().uuid()) .txId(txId) .safeTimeLong(hybridClock.nowLong()) @@ -865,7 +965,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { /** * Reads rows from the listener and checks values as expected by a mapper. * - * @param existed True if rows are existed, false otherwise. + * @param existed True if rows are existed, false otherwise. * @param keyValueMapper Mapper a key to the value which will be expected. */ private void readAndCheck(boolean existed, Function<Integer, Integer> keyValueMapper) { @@ -899,9 +999,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { when(clo.command()).thenReturn( msgFactory.updateCommand() - .tablePartitionId(msgFactory.tablePartitionIdMessage() - .tableId(1) - .partitionId(PARTITION_ID).build()) + .tablePartitionId(defaultPartitionIdMessage()) .rowUuid(UUID.randomUUID()) .messageRowToUpdate(msgFactory.timedBinaryRowMessage() .binaryRowMessage(getTestRow(i, i)) @@ -978,7 +1076,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { private @Nullable ReadResult readRow(BinaryTuple pk) { try (Cursor<RowId> cursor = pkStorage.storage().get(pk)) { return cursor.stream() - .map(rowId -> mvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE)) + .map(rowId -> mvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE)) .filter(readResult -> !readResult.isEmpty()) .findAny() .orElse(null);