This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a643a9ff9c IGNITE-21795 Unconditionally update storage with proper
raft index within PartitionListener (#3454)
a643a9ff9c is described below
commit a643a9ff9cb5b750ff0719d950aa8b19bcae8770
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Mar 21 10:44:17 2024 +0400
IGNITE-21795 Unconditionally update storage with proper raft index within
PartitionListener (#3454)
---
.../table/distributed/raft/PartitionListener.java | 18 +-
.../raft/PartitionCommandListenerTest.java | 244 +++++++++++++++------
2 files changed, 186 insertions(+), 76 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 21a9565a85..4e058951ac 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -276,9 +276,13 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
cmd.lastCommitTimestamp(),
indexIdsAtRwTxBeginTs(catalogService, txId,
storage.tableId())
);
- }
- updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ } else {
+ // We MUST bump information about last updated index+term.
+ // See a comment in #onWrite() for explanation.
+ advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
+ }
}
replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime()
: null, cmd.full());
@@ -313,6 +317,10 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
);
updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ } else {
+ // We MUST bump information about last updated index+term.
+ // See a comment in #onWrite() for explanation.
+ advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
}
}
@@ -328,7 +336,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
* @return The actually stored transaction state {@link TransactionResult}.
* @throws IgniteInternalException if an exception occurred during a
transaction state change.
*/
- private TransactionResult handleFinishTxCommand(FinishTxCommand cmd, long
commandIndex, long commandTerm)
+ private @Nullable TransactionResult handleFinishTxCommand(FinishTxCommand
cmd, long commandIndex, long commandTerm)
throws IgniteInternalException {
// Skips the write command because the storage has already executed it.
if (commandIndex <= txStateStorage.lastAppliedIndex()) {
@@ -407,6 +415,10 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
// We MUST bump information about last updated index+term.
// See a comment in #onWrite() for explanation.
+ advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
+ }
+
+ private void advanceLastAppliedIndexConsistently(long commandIndex, long
commandTerm) {
storage.runConsistently(locker -> {
storage.lastApplied(commandIndex, commandTerm);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 656b8d4326..5187d61096 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.table.distributed.raft;
+import static java.util.Collections.singletonMap;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -97,7 +99,9 @@ import
org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
+import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
@@ -121,6 +125,8 @@ import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
@@ -317,9 +323,89 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
readAndCheck(false);
}
+ @Test
+ void testSkipWriteCommandByAppliedIndex() {
+ mvPartitionStorage.lastApplied(10L, 1L);
+
+ UpdateCommand updateCommand = mock(UpdateCommand.class);
+ when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now());
+
+ WriteIntentSwitchCommand writeIntentSwitchCommand =
mock(WriteIntentSwitchCommand.class);
+ when(writeIntentSwitchCommand.safeTime()).thenAnswer(v ->
hybridClock.now());
+
+ SafeTimeSyncCommand safeTimeSyncCommand =
mock(SafeTimeSyncCommand.class);
+ when(safeTimeSyncCommand.safeTime()).thenAnswer(v ->
hybridClock.now());
+
+ FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
+ when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now());
+
+ // Checks for MvPartitionStorage.
+ commandListener.onWrite(List.of(
+ writeCommandCommandClosure(3, 1, updateCommand,
commandClosureResultCaptor),
+ writeCommandCommandClosure(10, 1, updateCommand,
commandClosureResultCaptor),
+ writeCommandCommandClosure(4, 1, writeIntentSwitchCommand,
commandClosureResultCaptor),
+ writeCommandCommandClosure(5, 1, safeTimeSyncCommand,
commandClosureResultCaptor)
+ ).iterator());
+
+ verify(mvPartitionStorage,
never()).runConsistently(any(WriteClosure.class));
+ verify(mvPartitionStorage, times(1)).lastApplied(anyLong(), anyLong());
+
+ assertThat(commandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new Throwable[]{null, null, null, null}));
+
+ // Checks for TxStateStorage.
+ mvPartitionStorage.lastApplied(1L, 1L);
+ txStateStorage.lastApplied(10L, 2L);
+
+ commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+ commandListener.onWrite(List.of(
+ writeCommandCommandClosure(2, 1, finishTxCommand,
commandClosureResultCaptor),
+ writeCommandCommandClosure(10, 1, finishTxCommand,
commandClosureResultCaptor)
+ ).iterator());
+
+ verify(txStateStorage, never()).compareAndSet(any(UUID.class),
any(TxState.class), any(TxMeta.class), anyLong(), anyLong());
+ verify(txStateStorage, times(1)).lastApplied(anyLong(), anyLong());
+
+ assertThat(commandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new Throwable[]{null, null}));
+ }
+
+ private static CommandClosure<WriteCommand> writeCommandCommandClosure(
+ long index,
+ long term,
+ WriteCommand writeCommand
+ ) {
+ return writeCommandCommandClosure(index, term, writeCommand, null);
+ }
+
+ /**
+ * Create a command closure.
+ *
+ * @param index Index of the RAFT command.
+ * @param writeCommand Write command.
+ * @param resultClosureCaptor Captor for {@link
CommandClosure#result(Serializable)}
+ */
+ private static CommandClosure<WriteCommand> writeCommandCommandClosure(
+ long index,
+ long term,
+ WriteCommand writeCommand,
+ @Nullable ArgumentCaptor<Throwable> resultClosureCaptor
+ ) {
+ CommandClosure<WriteCommand> commandClosure =
mock(CommandClosure.class);
+
+ when(commandClosure.index()).thenReturn(index);
+ when(commandClosure.term()).thenReturn(term);
+ when(commandClosure.command()).thenReturn(writeCommand);
+
+ if (resultClosureCaptor != null) {
+
doNothing().when(commandClosure).result(resultClosureCaptor.capture());
+ }
+
+ return commandClosure;
+ }
+
/**
- * The test checks that {@link PartitionListener#onSnapshotSave(Path,
Consumer)} propagates
- * the maximal last applied index among storages to all storages.
+ * The test checks that {@link PartitionListener#onSnapshotSave(Path,
Consumer)} propagates the maximal last applied index among
+ * storages to all storages.
*/
@Test
public void testOnSnapshotSavePropagateLastAppliedIndexAndTerm() {
@@ -378,64 +464,66 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
assertEquals(2L, txStateStorage.lastAppliedTerm());
}
- @Test
- void testSkipWriteCommandByAppliedIndex() {
- mvPartitionStorage.lastApplied(10L, 1L);
-
- UpdateCommand updateCommand = mock(UpdateCommand.class);
- when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now());
-
- WriteIntentSwitchCommand writeIntentSwitchCommand =
mock(WriteIntentSwitchCommand.class);
- when(writeIntentSwitchCommand.safeTime()).thenAnswer(v ->
hybridClock.now());
-
- SafeTimeSyncCommand safeTimeSyncCommand =
mock(SafeTimeSyncCommand.class);
- when(safeTimeSyncCommand.safeTime()).thenAnswer(v ->
hybridClock.now());
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void updatesLastAppliedForUpdateCommands(boolean stale) {
+ safeTimeTracker.update(hybridClock.now(), null);
- FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
- when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now());
+ UpdateCommand command = msgFactory.updateCommand()
+ .rowUuid(UUID.randomUUID())
+ .tablePartitionId(defaultPartitionIdMessage())
+ .txCoordinatorId(UUID.randomUUID().toString())
+ .txId(TestTransactionIds.newTransactionId())
+ .safeTimeLong(staleOrFreshSafeTime(stale))
+ .build();
- // Checks for MvPartitionStorage.
commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 1, updateCommand,
commandClosureResultCaptor),
- writeCommandCommandClosure(10, 1, updateCommand,
commandClosureResultCaptor),
- writeCommandCommandClosure(4, 1, writeIntentSwitchCommand,
commandClosureResultCaptor),
- writeCommandCommandClosure(5, 1, safeTimeSyncCommand,
commandClosureResultCaptor)
+ writeCommandCommandClosure(3, 2, command)
).iterator());
- verify(mvPartitionStorage,
never()).runConsistently(any(WriteClosure.class));
- verify(mvPartitionStorage, times(1)).lastApplied(anyLong(), anyLong());
+ verify(mvPartitionStorage).lastApplied(3, 2);
+ }
- assertThat(commandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new Throwable[]{null, null, null, null}));
+ private long staleOrFreshSafeTime(boolean stale) {
+ return stale ?
safeTimeTracker.current().subtractPhysicalTime(1).longValue() :
hybridClock.nowLong();
+ }
- // Checks for TxStateStorage.
- mvPartitionStorage.lastApplied(1L, 1L);
- txStateStorage.lastApplied(10L, 2L);
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void updatesLastAppliedForUpdateAllCommands(boolean stale) {
+ safeTimeTracker.update(hybridClock.now(), null);
- commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+ UpdateAllCommand command = msgFactory.updateAllCommand()
+ .messageRowsToUpdate(singletonMap(UUID.randomUUID(),
msgFactory.timedBinaryRowMessage().build()))
+ .tablePartitionId(defaultPartitionIdMessage())
+ .txCoordinatorId(UUID.randomUUID().toString())
+ .txId(TestTransactionIds.newTransactionId())
+ .safeTimeLong(staleOrFreshSafeTime(stale))
+ .build();
commandListener.onWrite(List.of(
- writeCommandCommandClosure(2, 1, finishTxCommand,
commandClosureResultCaptor),
- writeCommandCommandClosure(10, 1, finishTxCommand,
commandClosureResultCaptor)
+ writeCommandCommandClosure(3, 2, command)
).iterator());
- verify(txStateStorage, never()).compareAndSet(any(UUID.class),
any(TxState.class), any(TxMeta.class), anyLong(), anyLong());
- verify(txStateStorage, times(1)).lastApplied(anyLong(), anyLong());
-
- assertThat(commandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new Throwable[]{null, null}));
+ verify(mvPartitionStorage).lastApplied(3, 2);
}
- @Test
- void updatesLastAppliedForSafeTimeSyncCommands() {
- SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory()
- .safeTimeSyncCommand()
- .safeTimeLong(hybridClock.nowLong())
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void updatesLastAppliedForFinishTxCommands(boolean stale) {
+ safeTimeTracker.update(hybridClock.now(), null);
+
+ FinishTxCommand command = msgFactory.finishTxCommand()
+ .txId(TestTransactionIds.newTransactionId())
+ .safeTimeLong(staleOrFreshSafeTime(stale))
.build();
commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, safeTimeSyncCommand,
commandClosureResultCaptor)
+ writeCommandCommandClosure(3, 2, command)
).iterator());
- verify(mvPartitionStorage).lastApplied(3, 2);
+ assertThat(txStateStorage.lastAppliedIndex(), is(3L));
+ assertThat(txStateStorage.lastAppliedTerm(), is(2L));
}
@Test
@@ -560,35 +648,45 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
assertEquals(timestamp, safeTimeTracker.current());
}
- /**
- * Crate a command closure.
- *
- * @param index Index of the RAFT command.
- * @param writeCommand Write command.
- * @param resultClosureCaptor Captor for {@link
CommandClosure#result(Serializable)}
- */
- private static CommandClosure<WriteCommand> writeCommandCommandClosure(
- long index,
- long term,
- WriteCommand writeCommand,
- ArgumentCaptor<Throwable> resultClosureCaptor
- ) {
- CommandClosure<WriteCommand> commandClosure =
mock(CommandClosure.class);
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void updatesLastAppliedForWriteIntentSwitchCommands(boolean stale) {
+ safeTimeTracker.update(hybridClock.now(), null);
- when(commandClosure.index()).thenReturn(index);
- when(commandClosure.term()).thenReturn(term);
- when(commandClosure.command()).thenReturn(writeCommand);
+ WriteIntentSwitchCommand command =
msgFactory.writeIntentSwitchCommand()
+ .txId(TestTransactionIds.newTransactionId())
+ .safeTimeLong(staleOrFreshSafeTime(stale))
+ .build();
- doNothing().when(commandClosure).result(resultClosureCaptor.capture());
+ commandListener.onWrite(List.of(
+ writeCommandCommandClosure(3, 2, command)
+ ).iterator());
- return commandClosure;
+ verify(mvPartitionStorage).lastApplied(3, 2);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void updatesLastAppliedForSafeTimeSyncCommands(boolean stale) {
+ safeTimeTracker.update(hybridClock.now(), null);
+
+ SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory()
+ .safeTimeSyncCommand()
+ .safeTimeLong(staleOrFreshSafeTime(stale))
+ .build();
+
+ commandListener.onWrite(List.of(
+ writeCommandCommandClosure(3, 2, safeTimeSyncCommand)
+ ).iterator());
+
+ verify(mvPartitionStorage).lastApplied(3, 2);
}
/**
* Prepares a closure iterator for a specific batch operation.
*
* @param func The function prepare a closure for the operation.
- * @param <T> Type of the operation.
+ * @param <T> Type of the operation.
* @return Closure iterator.
*/
private <T extends Command> Iterator<CommandClosure<T>>
batchIterator(Consumer<CommandClosure<T>> func) {
@@ -617,7 +715,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
* Prepares a closure iterator for a specific operation.
*
* @param func The function prepare a closure for the operation.
- * @param <T> Type of the operation.
+ * @param <T> Type of the operation.
* @return Closure iterator.
*/
private <T extends Command> Iterator<CommandClosure<T>>
iterator(BiConsumer<Integer, CommandClosure<T>> func) {
@@ -782,9 +880,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
when(clo.command()).thenReturn(
msgFactory.updateCommand()
-
.tablePartitionId(msgFactory.tablePartitionIdMessage()
- .tableId(1)
- .partitionId(PARTITION_ID).build())
+ .tablePartitionId(defaultPartitionIdMessage())
.rowUuid(readResult.rowId().uuid())
.messageRowToUpdate(msgFactory.timedBinaryRowMessage()
.binaryRowMessage(row)
@@ -811,6 +907,12 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.build()));
}
+ private TablePartitionIdMessage defaultPartitionIdMessage() {
+ return msgFactory.tablePartitionIdMessage()
+ .tableId(1)
+ .partitionId(PARTITION_ID).build();
+ }
+
/**
* Deletes row.
*/
@@ -827,9 +929,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
when(clo.command()).thenReturn(
msgFactory.updateCommand()
-
.tablePartitionId(msgFactory.tablePartitionIdMessage()
- .tableId(1)
- .partitionId(PARTITION_ID).build())
+ .tablePartitionId(defaultPartitionIdMessage())
.rowUuid(readResult.rowId().uuid())
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
@@ -865,7 +965,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
/**
* Reads rows from the listener and checks values as expected by a mapper.
*
- * @param existed True if rows are existed, false otherwise.
+ * @param existed True if rows are existed, false otherwise.
* @param keyValueMapper Mapper a key to the value which will be expected.
*/
private void readAndCheck(boolean existed, Function<Integer, Integer>
keyValueMapper) {
@@ -899,9 +999,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
when(clo.command()).thenReturn(
msgFactory.updateCommand()
-
.tablePartitionId(msgFactory.tablePartitionIdMessage()
- .tableId(1)
- .partitionId(PARTITION_ID).build())
+ .tablePartitionId(defaultPartitionIdMessage())
.rowUuid(UUID.randomUUID())
.messageRowToUpdate(msgFactory.timedBinaryRowMessage()
.binaryRowMessage(getTestRow(i, i))
@@ -978,7 +1076,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private @Nullable ReadResult readRow(BinaryTuple pk) {
try (Cursor<RowId> cursor = pkStorage.storage().get(pk)) {
return cursor.stream()
- .map(rowId -> mvPartitionStorage.read(rowId,
HybridTimestamp.MAX_VALUE))
+ .map(rowId -> mvPartitionStorage.read(rowId,
HybridTimestamp.MAX_VALUE))
.filter(readResult -> !readResult.isEmpty())
.findAny()
.orElse(null);