This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 51ad1f918d3 IGNITE-27481 Do not take partition snapshot lock in table
raft processor (#7332)
51ad1f918d3 is described below
commit 51ad1f918d385c09501a45bca6c67ac132cbf98c
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Dec 30 16:48:14 2025 +0400
IGNITE-27481 Do not take partition snapshot lock in table raft processor
(#7332)
---
.../raft/snapshot/PartitionDataStorage.java | 17 +--
.../raft/ZonePartitionRaftListenerTest.java | 170 ++++++++++++++++++++-
.../SnapshotAwarePartitionDataStorageTest.java | 14 --
.../distributed/raft/TablePartitionProcessor.java | 19 +--
.../raft/handlers/BuildIndexCommandHandler.java | 6 +
.../MinimumActiveTxTimeCommandHandler.java | 2 +
.../SnapshotAwarePartitionDataStorage.java | 14 --
.../raft/PartitionCommandListenerTest.java | 140 ++++++++---------
.../distributed/TestPartitionDataStorage.java | 10 --
9 files changed, 256 insertions(+), 136 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
index 4f183843a17..4deb45ad02e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
@@ -22,6 +22,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.AbortResult;
@@ -44,7 +45,7 @@ import org.jetbrains.annotations.Nullable;
*
* <p>Methods writing to MV storage ({@link #addWrite(RowId, BinaryRow, UUID,
int, int)}, {@link #abortWrite}
* and {@link #commitWrite}) and TX data storage MUST be invoked under a lock
acquired using
- * {@link #acquirePartitionSnapshotsReadLock()}.
+ * {@link PartitionSnapshots#acquireReadLock()}.
*
* <p>Each MvPartitionStorage instance represents exactly one partition. All
RowIds within a partition are sorted consistently with the
* {@link RowId#compareTo} comparison order.
@@ -70,16 +71,6 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
*/
<V> V runConsistently(WriteClosure<V> closure) throws StorageException;
- /**
- * Acquires the read lock on partition snapshots.
- */
- void acquirePartitionSnapshotsReadLock();
-
- /**
- * Releases the read lock on partition snapshots.
- */
- void releasePartitionSnapshotsReadLock();
-
/**
* Flushes current state of the data or <i>the state from the nearest
future</i> to the storage.
* This feature allows implementing a batch flush for several partitions
at once.
@@ -188,7 +179,7 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
/**
* Aborts a pending update of the ongoing uncommitted transaction. Invoked
during rollback.
*
- * <p>This must be called under a lock acquired using {@link
#acquirePartitionSnapshotsReadLock()}.
+ * <p>This must be called under a lock acquired using {@link
PartitionSnapshots#acquireReadLock()}.
*
* @param rowId Row ID.
* @param txId Transaction ID that abort write intent.
@@ -201,7 +192,7 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
/**
* Commits a pending update of the ongoing transaction. Invoked during
commit. Committed value will be versioned by the given timestamp.
*
- * <p>This must be called under a lock acquired using {@link
#acquirePartitionSnapshotsReadLock()}.
+ * <p>This must be called under a lock acquired using {@link
PartitionSnapshots#acquireReadLock()}.
*
* @param rowId Row ID.
* @param timestamp Timestamp to associate with committed value.
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 2a3481839bb..83eac87ffc8 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.partition.replicator.raft;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonMap;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;
@@ -31,6 +33,7 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -48,6 +51,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -56,10 +60,14 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV3;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandV2;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
@@ -73,6 +81,7 @@ import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
@@ -103,8 +112,12 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -150,11 +163,19 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
private final SafeTimeValuesTracker safeTimeTracker = new
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
+ @Spy
+ private final PendingComparableValuesTracker<Long, Void>
storageIndexTracker = new PendingComparableValuesTracker<>(0L);
+
private final HybridClock clock = new HybridClockImpl();
+ @Mock
+ private PartitionSnapshots partitionSnapshots;
+
@BeforeEach
void setUp() {
listener = createListener();
+
+
when(outgoingSnapshotsManager.partitionSnapshots(ZONE_PARTITION_KEY)).thenReturn(partitionSnapshots);
}
private ZonePartitionRaftListener createListener() {
@@ -163,7 +184,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
txStatePartitionStorage,
txManager,
safeTimeTracker,
- new PendingComparableValuesTracker<>(0L),
+ storageIndexTracker,
outgoingSnapshotsManager,
executor
);
@@ -175,7 +196,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
}
@Test
- void closesOngoingSnapshots(@Mock PartitionSnapshots partitionSnapshots) {
+ void closesOngoingSnapshots() {
listener.onShutdown();
verify(outgoingSnapshotsManager).cleanupOutgoingSnapshots(ZONE_PARTITION_KEY);
@@ -545,6 +566,151 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
verify(txStatePartitionStorage).lastApplied(3, 2);
}
+ @Test
+ void locksOnConfigCommit(@Mock RaftTableProcessor tableProcessor) {
+ listener.addTableProcessor(TABLE_ID, tableProcessor);
+
+ long index = 10;
+ listener.onConfigurationCommitted(
+ new RaftGroupConfiguration(
+ index,
+ 2,
+ 111L,
+ 110L,
+ List.of("peer"),
+ List.of("learner"),
+ List.of("old-peer"),
+ List.of("old-learner")
+ ),
+ index,
+ 2
+ );
+
+ InOrder inOrder = inOrder(partitionSnapshots, tableProcessor);
+
+ inOrder.verify(partitionSnapshots).acquireReadLock();
+ inOrder.verify(tableProcessor).onConfigurationCommitted(any(),
anyLong(), anyLong());
+ inOrder.verify(partitionSnapshots).releaseReadLock();
+ }
+
+ private void applyCommand(WriteCommand command, long index, long term,
@Nullable HybridTimestamp safeTimestamp) {
+ listener.onWrite(List.of(
+ writeCommandClosure(index, term, command, null, safeTimestamp)
+ ).iterator());
+ }
+
+ private static UpdateCommandV2 updateCommand() {
+ return PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+ .rowUuid(randomUUID())
+ .tableId(TABLE_ID)
+ .commitPartitionId(defaultPartitionIdMessage())
+ .txCoordinatorId(randomUUID())
+ .txId(TestTransactionIds.newTransactionId())
+ .initiatorTime(anyTime())
+ .build();
+ }
+
+ private static TablePartitionIdMessage defaultPartitionIdMessage() {
+ return REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage()
+ .tableId(TABLE_ID)
+ .partitionId(PARTITION_ID)
+ .build();
+ }
+
+ private static UpdateAllCommandV2 updateAllCommand() {
+ return PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
+ .messageRowsToUpdate(singletonMap(
+ randomUUID(),
+
PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage().build())
+ )
+ .tableId(TABLE_ID)
+ .commitPartitionId(defaultPartitionIdMessage())
+ .txCoordinatorId(randomUUID())
+ .txId(TestTransactionIds.newTransactionId())
+ .initiatorTime(anyTime())
+ .build();
+ }
+
+ @ParameterizedTest
+ @MethodSource("tableCommands")
+ void locksOnApplicationOfTableCommands(WriteCommand command, @Mock
RaftTableProcessor tableProcessor) {
+ listener.addTableProcessor(TABLE_ID, tableProcessor);
+ when(tableProcessor.processCommand(any(), anyLong(), anyLong(), any()))
+ .thenReturn(EMPTY_APPLIED_RESULT);
+
+ applyCommand(command, 3, 2, null);
+
+ InOrder inOrder = inOrder(partitionSnapshots, tableProcessor);
+
+ inOrder.verify(partitionSnapshots).acquireReadLock();
+ inOrder.verify(tableProcessor).processCommand(any(), anyLong(),
anyLong(), any());
+ inOrder.verify(partitionSnapshots).releaseReadLock();
+ }
+
+ private static Stream<Arguments> tableCommands() {
+ return Stream.of(
+ updateCommand(),
+ updateAllCommand(),
+ writeIntentSwitchCommand(),
+ primaryReplicaChangeCommand(),
+ buildIndexCommand(),
+ updateMinimumActiveTxBeginTimeCommand()
+ ).map(Arguments::of);
+ }
+
+ private static WriteIntentSwitchCommandV2 writeIntentSwitchCommand() {
+ return
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
+ .txId(TestTransactionIds.newTransactionId())
+ .tableIds(Set.of(1))
+ .initiatorTime(anyTime())
+ .safeTime(anyTime())
+ .build();
+ }
+
+ private static PrimaryReplicaChangeCommand primaryReplicaChangeCommand() {
+ return new ReplicaMessagesFactory().primaryReplicaChangeCommand()
+ .primaryReplicaNodeId(randomUUID())
+ .primaryReplicaNodeName("test-node")
+ .build();
+ }
+
+ private static BuildIndexCommandV3 buildIndexCommand() {
+ return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV3()
+ .tableId(TABLE_ID)
+ .rowIds(List.of(randomUUID()))
+ .abortedTransactionIds(emptySet())
+ .build();
+ }
+
+ private static HybridTimestamp anyTime() {
+ return HybridTimestamp.MIN_VALUE.addPhysicalTime(1000);
+ }
+
+ private static UpdateMinimumActiveTxBeginTimeCommand
updateMinimumActiveTxBeginTimeCommand() {
+ return
PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+ .initiatorTime(anyTime())
+ .safeTime(anyTime())
+ .build();
+ }
+
+ @Test
+ void locksOnApplicationOfSafeTimeSyncCommand() {
+ applyCommand(safeTimeSyncCommand(), 3, 2, null);
+
+ InOrder inOrder = inOrder(partitionSnapshots, txStatePartitionStorage);
+
+ inOrder.verify(partitionSnapshots).acquireReadLock();
+ inOrder.verify(txStatePartitionStorage).lastApplied(3, 2);
+ inOrder.verify(partitionSnapshots).releaseReadLock();
+ }
+
+ private static SafeTimeSyncCommand safeTimeSyncCommand() {
+ return new ReplicaMessagesFactory()
+ .safeTimeSyncCommand()
+ .initiatorTime(anyTime())
+ .build();
+ }
+
private CommandClosure<WriteCommand> writeCommandClosure(
long index,
long term,
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 860e678959f..ca8238df59c 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -225,20 +225,6 @@ class SnapshotAwarePartitionDataStorageTest extends
BaseIgniteAbstractTest {
verify(partitionStorage, never()).close();
}
- @Test
- void delegatesAcquirePartitionSnapshotsReadLock() {
- testedStorage.acquirePartitionSnapshotsReadLock();
-
- verify(partitionSnapshots).acquireReadLock();
- }
-
- @Test
- void delegatesReleasePartitionSnapshotsReadLock() {
- testedStorage.releasePartitionSnapshotsReadLock();
-
- verify(partitionSnapshots).releaseReadLock();
- }
-
@ParameterizedTest
@EnumSource(MvWriteAction.class)
void notYetPassedRowIsEnqueued(MvWriteAction writeAction) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index e90fc76f4ae..4b46f0dd9f7 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -434,21 +434,12 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
setCurrentGroupTopology(config);
- // Do the update under lock to make sure no snapshot is started
concurrently with this update.
- // Note that we do not need to protect from a concurrent command
execution by this listener because
- // configuration is committed in the same thread in which commands are
applied.
- storage.acquirePartitionSnapshotsReadLock();
-
- try {
- storage.runConsistently(locker -> {
- storage.committedGroupConfiguration(config);
- storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
+ storage.runConsistently(locker -> {
+ storage.committedGroupConfiguration(config);
+ storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
- return null;
- });
- } finally {
- storage.releasePartitionSnapshotsReadLock();
- }
+ return null;
+ });
}
private void setCurrentGroupTopology(RaftGroupConfiguration config) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
index 5a2f7ce2465..7754195a824 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
@@ -107,6 +107,12 @@ public class BuildIndexCommandHandler extends
AbstractCommandHandler<BuildIndexC
if (indexMeta == null || indexMeta.isDropped()) {
// Index has been dropped.
+
+ storage.runConsistently(locker -> {
+ storage.lastApplied(commandIndex, commandTerm);
+ return null;
+ });
+
return EMPTY_APPLIED_RESULT;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
index b2aa0127523..0dcb39e8975 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
@@ -84,6 +84,8 @@ public class MinimumActiveTxTimeCommandHandler extends
AbstractCommandHandler<Up
}
});
+ // This command does not update last applied index because it does not
change the storage state.
+
return EMPTY_APPLIED_RESULT;
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
index 507f0ea49e2..8b0f6da90e0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
@@ -89,20 +89,6 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
return partitionStorage.runConsistently(closure);
}
- @Override
- public void acquirePartitionSnapshotsReadLock() {
- PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
-
- partitionSnapshots.acquireReadLock();
- }
-
- @Override
- public void releasePartitionSnapshotsReadLock() {
- PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
-
- partitionSnapshots.releaseReadLock();
- }
-
private PartitionSnapshots getPartitionSnapshots() {
return partitionsSnapshots.partitionSnapshots(partitionKey);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 245502c1f6b..63521ef0329 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.table.distributed.raft;
+import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
+import static java.util.UUID.randomUUID;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
@@ -32,6 +34,7 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
@@ -43,7 +46,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -71,11 +73,11 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.ClusterService;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV3;
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
-import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
-import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
-import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
@@ -117,7 +119,6 @@ import
org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
-import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
@@ -128,6 +129,9 @@ import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -179,9 +183,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private final PartitionDataStorage partitionDataStorage = spy(new
TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage));
- @WorkDirectory
- private Path workDir;
-
private static final PartitionReplicationMessagesFactory
PARTITION_REPLICATION_MESSAGES_FACTORY =
new PartitionReplicationMessagesFactory();
@@ -302,12 +303,15 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
PrimaryReplicaChangeCommand command =
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
.primaryReplicaNodeName("primary")
- .primaryReplicaNodeId(UUID.randomUUID())
+ .primaryReplicaNodeId(randomUUID())
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
.build();
commandListener.processCommand(command,
raftIndex.incrementAndGet(), 1, null);
}
+
+ // Do it so that in actual tests we can verify only interactions that
are done in test methods.
+ clearInvocations(partitionDataStorage);
}
/**
@@ -380,39 +384,67 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
readAndCheck(false);
}
- @Test
- void updatesLastAppliedForUpdateCommands() {
- UpdateCommand command =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
- .rowUuid(UUID.randomUUID())
+ @ParameterizedTest
+ @MethodSource("allCommandsUpdatingLastAppliedIndex")
+ void updatesLastAppliedForCommandsUpdatingLastAppliedIndex(WriteCommand
command) {
+ commandListener.processCommand(command, 3, 2, hybridClock.now());
+
+ verify(mvPartitionStorage).lastApplied(3, 2);
+ }
+
+ private static UpdateCommandV2 updateCommand() {
+ return PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+ .rowUuid(randomUUID())
.tableId(TABLE_ID)
.commitPartitionId(defaultPartitionIdMessage())
- .txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(randomUUID())
.txId(TestTransactionIds.newTransactionId())
- .initiatorTime(hybridClock.now())
+ .initiatorTime(anyTime())
.build();
-
- commandListener.processCommand(command, 3, 2, hybridClock.now());
-
- verify(mvPartitionStorage).lastApplied(3, 2);
}
- @Test
- void updatesLastAppliedForUpdateAllCommands() {
- UpdateAllCommand command =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
+ private static UpdateAllCommandV2 updateAllCommand() {
+ return PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
.messageRowsToUpdate(singletonMap(
- UUID.randomUUID(),
+ randomUUID(),
PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage().build())
)
.tableId(TABLE_ID)
.commitPartitionId(defaultPartitionIdMessage())
- .txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(randomUUID())
.txId(TestTransactionIds.newTransactionId())
- .initiatorTime(hybridClock.now())
+ .initiatorTime(anyTime())
.build();
+ }
- commandListener.processCommand(command, 3, 2, hybridClock.now());
+ private static Stream<Arguments> allCommandsUpdatingLastAppliedIndex() {
+ return Stream.of(
+ updateCommand(),
+ updateAllCommand(),
+ writeIntentSwitchCommand(),
+ primaryReplicaChangeCommand(),
+ buildIndexCommand()
+ ).map(Arguments::of);
+ }
- verify(mvPartitionStorage).lastApplied(3, 2);
+ private static PrimaryReplicaChangeCommand primaryReplicaChangeCommand() {
+ return new ReplicaMessagesFactory().primaryReplicaChangeCommand()
+ .primaryReplicaNodeId(randomUUID())
+ .primaryReplicaNodeName("test-node")
+ .build();
+ }
+
+ private static BuildIndexCommandV3 buildIndexCommand() {
+ return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV3()
+ .tableId(TABLE_ID)
+ .indexId(1)
+ .rowIds(List.of(randomUUID()))
+ .abortedTransactionIds(emptySet())
+ .build();
+ }
+
+ private static HybridTimestamp anyTime() {
+ return HybridTimestamp.MIN_VALUE.addPhysicalTime(1000);
}
@Test
@@ -486,31 +518,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
verify(mvPartitionStorage, times(1)).lastApplied(eq(1L), anyLong());
}
- @Test
- void locksOnConfigCommit() {
- long index = raftIndex.incrementAndGet();
- commandListener.onConfigurationCommitted(
- new RaftGroupConfiguration(
- index,
- 2,
- 111L,
- 110L,
- List.of("peer"),
- List.of("learner"),
- List.of("old-peer"),
- List.of("old-learner")
- ),
- index,
- 2
- );
-
- InOrder inOrder = inOrder(partitionDataStorage);
-
-
inOrder.verify(partitionDataStorage).acquirePartitionSnapshotsReadLock();
- inOrder.verify(partitionDataStorage).lastApplied(raftIndex.get(), 2);
-
inOrder.verify(partitionDataStorage).releasePartitionSnapshotsReadLock();
- }
-
@Test
void testBuildIndexCommand() {
int indexId = pkStorage.id();
@@ -596,18 +603,13 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.build();
}
- @Test
- void updatesLastAppliedForWriteIntentSwitchCommands() {
- WriteIntentSwitchCommand command =
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
+ private static WriteIntentSwitchCommandV2 writeIntentSwitchCommand() {
+ return
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
.txId(TestTransactionIds.newTransactionId())
.tableIds(Set.of(1))
- .initiatorTime(hybridClock.now())
- .safeTime(hybridClock.now())
+ .initiatorTime(anyTime())
+ .safeTime(anyTime())
.build();
-
- commandListener.processCommand(command, raftIndex.incrementAndGet(),
2, hybridClock.now());
-
- verify(mvPartitionStorage).lastApplied(raftIndex.get(), 2);
}
/**
@@ -636,7 +638,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.txId(txId)
.initiatorTime(hybridClock.now())
.safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(randomUUID())
.build());
invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
@@ -679,7 +681,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.txId(txId)
.initiatorTime(hybridClock.now())
.safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(randomUUID())
.build());
invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
@@ -717,7 +719,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.txId(txId)
.initiatorTime(hybridClock.now())
.safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(randomUUID())
.build());
invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
@@ -756,7 +758,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.txId(txId)
.initiatorTime(hybridClock.now())
.safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(randomUUID())
.build();
commandListener.processCommand(command,
raftIndex.incrementAndGet(), 1, hybridClock.now());
@@ -774,7 +776,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.build()));
}
- private TablePartitionIdMessage defaultPartitionIdMessage() {
+ private static TablePartitionIdMessage defaultPartitionIdMessage() {
return REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage()
.tableId(TABLE_ID)
.partitionId(PARTITION_ID)
@@ -800,7 +802,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.txId(txId)
.initiatorTime(hybridClock.now())
.safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(randomUUID())
.build();
commandListener.processCommand(command,
raftIndex.incrementAndGet(), 1, hybridClock.now());
@@ -864,14 +866,14 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
UpdateCommandV2 command =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
.tableId(TABLE_ID)
.commitPartitionId(defaultPartitionIdMessage())
- .rowUuid(UUID.randomUUID())
+ .rowUuid(randomUUID())
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
.binaryRowMessage(getTestRow(i, i))
.build())
.txId(txId0)
.initiatorTime(hybridClock.now())
.safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(randomUUID())
.build();
commandListener.processCommand(command,
raftIndex.incrementAndGet(), 1, hybridClock.now());
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 4d68eb2a087..76544ec73c8 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -78,16 +78,6 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
return partitionStorage.runConsistently(closure);
}
- @Override
- public void acquirePartitionSnapshotsReadLock() {
- // There is no 'write' side, so we don't need to take any lock.
- }
-
- @Override
- public void releasePartitionSnapshotsReadLock() {
- // There is no 'write' side, so we don't need to releasetestbala any
lock.
- }
-
@Override
public CompletableFuture<Void> flush(boolean trigger) {
return partitionStorage.flush(trigger);