This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 51ad1f918d3 IGNITE-27481 Do not take partition snapshot lock in table raft processor (#7332)
51ad1f918d3 is described below

commit 51ad1f918d385c09501a45bca6c67ac132cbf98c
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Dec 30 16:48:14 2025 +0400

    IGNITE-27481 Do not take partition snapshot lock in table raft processor (#7332)
---
 .../raft/snapshot/PartitionDataStorage.java        |  17 +--
 .../raft/ZonePartitionRaftListenerTest.java        | 170 ++++++++++++++++++++-
 .../SnapshotAwarePartitionDataStorageTest.java     |  14 --
 .../distributed/raft/TablePartitionProcessor.java  |  19 +--
 .../raft/handlers/BuildIndexCommandHandler.java    |   6 +
 .../MinimumActiveTxTimeCommandHandler.java         |   2 +
 .../SnapshotAwarePartitionDataStorage.java         |  14 --
 .../raft/PartitionCommandListenerTest.java         | 140 ++++++++---------
 .../distributed/TestPartitionDataStorage.java      |  10 --
 9 files changed, 256 insertions(+), 136 deletions(-)

diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
index 4f183843a17..4deb45ad02e 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.AbortResult;
@@ -44,7 +45,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * <p>Methods writing to MV storage ({@link #addWrite(RowId, BinaryRow, UUID, int, int)}, {@link #abortWrite}
  * and {@link #commitWrite}) and TX data storage MUST be invoked under a lock acquired using
- * {@link #acquirePartitionSnapshotsReadLock()}.
+ * {@link PartitionSnapshots#acquireReadLock()}.
  *
  * <p>Each MvPartitionStorage instance represents exactly one partition. All RowIds within a partition are sorted consistently with the
  * {@link RowId#compareTo} comparison order.
@@ -70,16 +71,6 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
      */
     <V> V runConsistently(WriteClosure<V> closure) throws StorageException;
 
-    /**
-     * Acquires the read lock on partition snapshots.
-     */
-    void acquirePartitionSnapshotsReadLock();
-
-    /**
-     * Releases the read lock on partition snapshots.
-     */
-    void releasePartitionSnapshotsReadLock();
-
     /**
      * Flushes current state of the data or <i>the state from the nearest future</i> to the storage.
      * This feature allows implementing a batch flush for several partitions at once.
@@ -188,7 +179,7 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
     /**
      * Aborts a pending update of the ongoing uncommitted transaction. Invoked during rollback.
      *
-     * <p>This must be called under a lock acquired using {@link #acquirePartitionSnapshotsReadLock()}.
+     * <p>This must be called under a lock acquired using {@link PartitionSnapshots#acquireReadLock()}.
      *
      * @param rowId Row ID.
      * @param txId Transaction ID that abort write intent.
@@ -201,7 +192,7 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
     /**
      * Commits a pending update of the ongoing transaction. Invoked during commit. Committed value will be versioned by the given timestamp.
      *
-     * <p>This must be called under a lock acquired using {@link #acquirePartitionSnapshotsReadLock()}.
+     * <p>This must be called under a lock acquired using {@link PartitionSnapshots#acquireReadLock()}.
      *
      * @param rowId Row ID.
      * @param timestamp Timestamp to associate with committed value.
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 2a3481839bb..83eac87ffc8 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.partition.replicator.raft;
 
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonMap;
 import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.runAsync;
@@ -31,6 +33,7 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -48,6 +51,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -56,10 +60,14 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV3;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandV2;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
@@ -73,6 +81,7 @@ import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
@@ -103,8 +112,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -150,11 +163,19 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
 
     private final SafeTimeValuesTracker safeTimeTracker = new 
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
 
+    @Spy
+    private final PendingComparableValuesTracker<Long, Void> 
storageIndexTracker = new PendingComparableValuesTracker<>(0L);
+
     private final HybridClock clock = new HybridClockImpl();
 
+    @Mock
+    private PartitionSnapshots partitionSnapshots;
+
     @BeforeEach
     void setUp() {
         listener = createListener();
+
+        
when(outgoingSnapshotsManager.partitionSnapshots(ZONE_PARTITION_KEY)).thenReturn(partitionSnapshots);
     }
 
     private ZonePartitionRaftListener createListener() {
@@ -163,7 +184,7 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
                 txStatePartitionStorage,
                 txManager,
                 safeTimeTracker,
-                new PendingComparableValuesTracker<>(0L),
+                storageIndexTracker,
                 outgoingSnapshotsManager,
                 executor
         );
@@ -175,7 +196,7 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void closesOngoingSnapshots(@Mock PartitionSnapshots partitionSnapshots) {
+    void closesOngoingSnapshots() {
         listener.onShutdown();
 
         
verify(outgoingSnapshotsManager).cleanupOutgoingSnapshots(ZONE_PARTITION_KEY);
@@ -545,6 +566,151 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
         verify(txStatePartitionStorage).lastApplied(3, 2);
     }
 
+    @Test
+    void locksOnConfigCommit(@Mock RaftTableProcessor tableProcessor) {
+        listener.addTableProcessor(TABLE_ID, tableProcessor);
+
+        long index = 10;
+        listener.onConfigurationCommitted(
+                new RaftGroupConfiguration(
+                        index,
+                        2,
+                        111L,
+                        110L,
+                        List.of("peer"),
+                        List.of("learner"),
+                        List.of("old-peer"),
+                        List.of("old-learner")
+                ),
+                index,
+                2
+        );
+
+        InOrder inOrder = inOrder(partitionSnapshots, tableProcessor);
+
+        inOrder.verify(partitionSnapshots).acquireReadLock();
+        inOrder.verify(tableProcessor).onConfigurationCommitted(any(), 
anyLong(), anyLong());
+        inOrder.verify(partitionSnapshots).releaseReadLock();
+    }
+
+    private void applyCommand(WriteCommand command, long index, long term, 
@Nullable HybridTimestamp safeTimestamp) {
+        listener.onWrite(List.of(
+                writeCommandClosure(index, term, command, null, safeTimestamp)
+        ).iterator());
+    }
+
+    private static UpdateCommandV2 updateCommand() {
+        return PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+                .rowUuid(randomUUID())
+                .tableId(TABLE_ID)
+                .commitPartitionId(defaultPartitionIdMessage())
+                .txCoordinatorId(randomUUID())
+                .txId(TestTransactionIds.newTransactionId())
+                .initiatorTime(anyTime())
+                .build();
+    }
+
+    private static TablePartitionIdMessage defaultPartitionIdMessage() {
+        return REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage()
+                .tableId(TABLE_ID)
+                .partitionId(PARTITION_ID)
+                .build();
+    }
+
+    private static UpdateAllCommandV2 updateAllCommand() {
+        return PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
+                .messageRowsToUpdate(singletonMap(
+                        randomUUID(),
+                        
PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage().build())
+                )
+                .tableId(TABLE_ID)
+                .commitPartitionId(defaultPartitionIdMessage())
+                .txCoordinatorId(randomUUID())
+                .txId(TestTransactionIds.newTransactionId())
+                .initiatorTime(anyTime())
+                .build();
+    }
+
+    @ParameterizedTest
+    @MethodSource("tableCommands")
+    void locksOnApplicationOfTableCommands(WriteCommand command, @Mock 
RaftTableProcessor tableProcessor) {
+        listener.addTableProcessor(TABLE_ID, tableProcessor);
+        when(tableProcessor.processCommand(any(), anyLong(), anyLong(), any()))
+                .thenReturn(EMPTY_APPLIED_RESULT);
+
+        applyCommand(command, 3, 2, null);
+
+        InOrder inOrder = inOrder(partitionSnapshots, tableProcessor);
+
+        inOrder.verify(partitionSnapshots).acquireReadLock();
+        inOrder.verify(tableProcessor).processCommand(any(), anyLong(), 
anyLong(), any());
+        inOrder.verify(partitionSnapshots).releaseReadLock();
+    }
+
+    private static Stream<Arguments> tableCommands() {
+        return Stream.of(
+                updateCommand(),
+                updateAllCommand(),
+                writeIntentSwitchCommand(),
+                primaryReplicaChangeCommand(),
+                buildIndexCommand(),
+                updateMinimumActiveTxBeginTimeCommand()
+        ).map(Arguments::of);
+    }
+
+    private static WriteIntentSwitchCommandV2 writeIntentSwitchCommand() {
+        return 
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
+                .txId(TestTransactionIds.newTransactionId())
+                .tableIds(Set.of(1))
+                .initiatorTime(anyTime())
+                .safeTime(anyTime())
+                .build();
+    }
+
+    private static PrimaryReplicaChangeCommand primaryReplicaChangeCommand() {
+        return new ReplicaMessagesFactory().primaryReplicaChangeCommand()
+                .primaryReplicaNodeId(randomUUID())
+                .primaryReplicaNodeName("test-node")
+                .build();
+    }
+
+    private static BuildIndexCommandV3 buildIndexCommand() {
+        return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV3()
+                .tableId(TABLE_ID)
+                .rowIds(List.of(randomUUID()))
+                .abortedTransactionIds(emptySet())
+                .build();
+    }
+
+    private static HybridTimestamp anyTime() {
+        return HybridTimestamp.MIN_VALUE.addPhysicalTime(1000);
+    }
+
+    private static UpdateMinimumActiveTxBeginTimeCommand 
updateMinimumActiveTxBeginTimeCommand() {
+        return 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+                .initiatorTime(anyTime())
+                .safeTime(anyTime())
+                .build();
+    }
+
+    @Test
+    void locksOnApplicationOfSafeTimeSyncCommand() {
+        applyCommand(safeTimeSyncCommand(), 3, 2, null);
+
+        InOrder inOrder = inOrder(partitionSnapshots, txStatePartitionStorage);
+
+        inOrder.verify(partitionSnapshots).acquireReadLock();
+        inOrder.verify(txStatePartitionStorage).lastApplied(3, 2);
+        inOrder.verify(partitionSnapshots).releaseReadLock();
+    }
+
+    private static SafeTimeSyncCommand safeTimeSyncCommand() {
+        return new ReplicaMessagesFactory()
+                .safeTimeSyncCommand()
+                .initiatorTime(anyTime())
+                .build();
+    }
+
     private CommandClosure<WriteCommand> writeCommandClosure(
             long index,
             long term,
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 860e678959f..ca8238df59c 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -225,20 +225,6 @@ class SnapshotAwarePartitionDataStorageTest extends 
BaseIgniteAbstractTest {
         verify(partitionStorage, never()).close();
     }
 
-    @Test
-    void delegatesAcquirePartitionSnapshotsReadLock() {
-        testedStorage.acquirePartitionSnapshotsReadLock();
-
-        verify(partitionSnapshots).acquireReadLock();
-    }
-
-    @Test
-    void delegatesReleasePartitionSnapshotsReadLock() {
-        testedStorage.releasePartitionSnapshotsReadLock();
-
-        verify(partitionSnapshots).releaseReadLock();
-    }
-
     @ParameterizedTest
     @EnumSource(MvWriteAction.class)
     void notYetPassedRowIsEnqueued(MvWriteAction writeAction) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index e90fc76f4ae..4b46f0dd9f7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -434,21 +434,12 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
 
         setCurrentGroupTopology(config);
 
-        // Do the update under lock to make sure no snapshot is started concurrently with this update.
-        // Note that we do not need to protect from a concurrent command execution by this listener because
-        // configuration is committed in the same thread in which commands are applied.
-        storage.acquirePartitionSnapshotsReadLock();
-
-        try {
-            storage.runConsistently(locker -> {
-                storage.committedGroupConfiguration(config);
-                storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
+        storage.runConsistently(locker -> {
+            storage.committedGroupConfiguration(config);
+            storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
 
-                return null;
-            });
-        } finally {
-            storage.releasePartitionSnapshotsReadLock();
-        }
+            return null;
+        });
     }
 
     private void setCurrentGroupTopology(RaftGroupConfiguration config) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
index 5a2f7ce2465..7754195a824 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
@@ -107,6 +107,12 @@ public class BuildIndexCommandHandler extends 
AbstractCommandHandler<BuildIndexC
 
         if (indexMeta == null || indexMeta.isDropped()) {
             // Index has been dropped.
+
+            storage.runConsistently(locker -> {
+                storage.lastApplied(commandIndex, commandTerm);
+                return null;
+            });
+
             return EMPTY_APPLIED_RESULT;
         }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
index b2aa0127523..0dcb39e8975 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
@@ -84,6 +84,8 @@ public class MinimumActiveTxTimeCommandHandler extends 
AbstractCommandHandler<Up
                     }
                 });
 
+        // This command does not update last applied index because it does not change the storage state.
+
         return EMPTY_APPLIED_RESULT;
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
index 507f0ea49e2..8b0f6da90e0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
@@ -89,20 +89,6 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
         return partitionStorage.runConsistently(closure);
     }
 
-    @Override
-    public void acquirePartitionSnapshotsReadLock() {
-        PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
-
-        partitionSnapshots.acquireReadLock();
-    }
-
-    @Override
-    public void releasePartitionSnapshotsReadLock() {
-        PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
-
-        partitionSnapshots.releaseReadLock();
-    }
-
     private PartitionSnapshots getPartitionSnapshots() {
         return partitionsSnapshots.partitionSnapshots(partitionKey);
     }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 245502c1f6b..63521ef0329 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
+import static java.util.Collections.emptySet;
 import static java.util.Collections.singletonMap;
+import static java.util.UUID.randomUUID;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
@@ -32,6 +34,7 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
@@ -43,7 +46,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -71,11 +73,11 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.ClusterService;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV3;
 import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
-import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
-import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
-import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
@@ -117,7 +119,6 @@ import 
org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
-import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
@@ -128,6 +129,9 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -179,9 +183,6 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
     private final PartitionDataStorage partitionDataStorage = spy(new 
TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage));
 
-    @WorkDirectory
-    private Path workDir;
-
     private static final PartitionReplicationMessagesFactory 
PARTITION_REPLICATION_MESSAGES_FACTORY =
             new PartitionReplicationMessagesFactory();
 
@@ -302,12 +303,15 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
             PrimaryReplicaChangeCommand command = 
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
                     .primaryReplicaNodeName("primary")
-                    .primaryReplicaNodeId(UUID.randomUUID())
+                    .primaryReplicaNodeId(randomUUID())
                     
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
                     .build();
 
             commandListener.processCommand(command, 
raftIndex.incrementAndGet(), 1, null);
         }
+
+        // Do it so that in actual tests we can verify only interactions that are done in test methods.
+        clearInvocations(partitionDataStorage);
     }
 
     /**
@@ -380,39 +384,67 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         readAndCheck(false);
     }
 
-    @Test
-    void updatesLastAppliedForUpdateCommands() {
-        UpdateCommand command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
-                .rowUuid(UUID.randomUUID())
+    @ParameterizedTest
+    @MethodSource("allCommandsUpdatingLastAppliedIndex")
+    void updatesLastAppliedForCommandsUpdatingLastAppliedIndex(WriteCommand 
command) {
+        commandListener.processCommand(command, 3, 2, hybridClock.now());
+
+        verify(mvPartitionStorage).lastApplied(3, 2);
+    }
+
+    private static UpdateCommandV2 updateCommand() {
+        return PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+                .rowUuid(randomUUID())
                 .tableId(TABLE_ID)
                 .commitPartitionId(defaultPartitionIdMessage())
-                .txCoordinatorId(UUID.randomUUID())
+                .txCoordinatorId(randomUUID())
                 .txId(TestTransactionIds.newTransactionId())
-                .initiatorTime(hybridClock.now())
+                .initiatorTime(anyTime())
                 .build();
-
-        commandListener.processCommand(command, 3, 2, hybridClock.now());
-
-        verify(mvPartitionStorage).lastApplied(3, 2);
     }
 
-    @Test
-    void updatesLastAppliedForUpdateAllCommands() {
-        UpdateAllCommand command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
+    private static UpdateAllCommandV2 updateAllCommand() {
+        return PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
                 .messageRowsToUpdate(singletonMap(
-                        UUID.randomUUID(),
+                        randomUUID(),
                         
PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage().build())
                 )
                 .tableId(TABLE_ID)
                 .commitPartitionId(defaultPartitionIdMessage())
-                .txCoordinatorId(UUID.randomUUID())
+                .txCoordinatorId(randomUUID())
                 .txId(TestTransactionIds.newTransactionId())
-                .initiatorTime(hybridClock.now())
+                .initiatorTime(anyTime())
                 .build();
+    }
 
-        commandListener.processCommand(command, 3, 2, hybridClock.now());
+    private static Stream<Arguments> allCommandsUpdatingLastAppliedIndex() {
+        return Stream.of(
+                updateCommand(),
+                updateAllCommand(),
+                writeIntentSwitchCommand(),
+                primaryReplicaChangeCommand(),
+                buildIndexCommand()
+        ).map(Arguments::of);
+    }
 
-        verify(mvPartitionStorage).lastApplied(3, 2);
+    private static PrimaryReplicaChangeCommand primaryReplicaChangeCommand() {
+        return new ReplicaMessagesFactory().primaryReplicaChangeCommand()
+                .primaryReplicaNodeId(randomUUID())
+                .primaryReplicaNodeName("test-node")
+                .build();
+    }
+
+    private static BuildIndexCommandV3 buildIndexCommand() {
+        return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV3()
+                .tableId(TABLE_ID)
+                .indexId(1)
+                .rowIds(List.of(randomUUID()))
+                .abortedTransactionIds(emptySet())
+                .build();
+    }
+
+    private static HybridTimestamp anyTime() {
+        return HybridTimestamp.MIN_VALUE.addPhysicalTime(1000);
     }
 
     @Test
@@ -486,31 +518,6 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         verify(mvPartitionStorage, times(1)).lastApplied(eq(1L), anyLong());
     }
 
-    @Test
-    void locksOnConfigCommit() {
-        long index = raftIndex.incrementAndGet();
-        commandListener.onConfigurationCommitted(
-                new RaftGroupConfiguration(
-                        index,
-                        2,
-                        111L,
-                        110L,
-                        List.of("peer"),
-                        List.of("learner"),
-                        List.of("old-peer"),
-                        List.of("old-learner")
-                ),
-                index,
-                2
-        );
-
-        InOrder inOrder = inOrder(partitionDataStorage);
-
-        
inOrder.verify(partitionDataStorage).acquirePartitionSnapshotsReadLock();
-        inOrder.verify(partitionDataStorage).lastApplied(raftIndex.get(), 2);
-        
inOrder.verify(partitionDataStorage).releasePartitionSnapshotsReadLock();
-    }
-
     @Test
     void testBuildIndexCommand() {
         int indexId = pkStorage.id();
@@ -596,18 +603,13 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .build();
     }
 
-    @Test
-    void updatesLastAppliedForWriteIntentSwitchCommands() {
-        WriteIntentSwitchCommand command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
+    private static WriteIntentSwitchCommandV2 writeIntentSwitchCommand() {
+        return 
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
                 .txId(TestTransactionIds.newTransactionId())
                 .tableIds(Set.of(1))
-                .initiatorTime(hybridClock.now())
-                .safeTime(hybridClock.now())
+                .initiatorTime(anyTime())
+                .safeTime(anyTime())
                 .build();
-
-        commandListener.processCommand(command, raftIndex.incrementAndGet(), 
2, hybridClock.now());
-
-        verify(mvPartitionStorage).lastApplied(raftIndex.get(), 2);
     }
 
     /**
@@ -636,7 +638,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .txId(txId)
                 .initiatorTime(hybridClock.now())
                 .safeTime(hybridClock.now())
-                .txCoordinatorId(UUID.randomUUID())
+                .txCoordinatorId(randomUUID())
                 .build());
 
         
invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
@@ -679,7 +681,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .txId(txId)
                 .initiatorTime(hybridClock.now())
                 .safeTime(hybridClock.now())
-                .txCoordinatorId(UUID.randomUUID())
+                .txCoordinatorId(randomUUID())
                 .build());
 
         
invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
@@ -717,7 +719,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .txId(txId)
                 .initiatorTime(hybridClock.now())
                 .safeTime(hybridClock.now())
-                .txCoordinatorId(UUID.randomUUID())
+                .txCoordinatorId(randomUUID())
                 .build());
 
         
invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
@@ -756,7 +758,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                     .txId(txId)
                     .initiatorTime(hybridClock.now())
                     .safeTime(hybridClock.now())
-                    .txCoordinatorId(UUID.randomUUID())
+                    .txCoordinatorId(randomUUID())
                     .build();
 
             commandListener.processCommand(command, 
raftIndex.incrementAndGet(), 1, hybridClock.now());
@@ -774,7 +776,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .build()));
     }
 
-    private TablePartitionIdMessage defaultPartitionIdMessage() {
+    private static TablePartitionIdMessage defaultPartitionIdMessage() {
         return REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage()
                 .tableId(TABLE_ID)
                 .partitionId(PARTITION_ID)
@@ -800,7 +802,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                     .txId(txId)
                     .initiatorTime(hybridClock.now())
                     .safeTime(hybridClock.now())
-                    .txCoordinatorId(UUID.randomUUID())
+                    .txCoordinatorId(randomUUID())
                     .build();
 
             commandListener.processCommand(command, 
raftIndex.incrementAndGet(), 1, hybridClock.now());
@@ -864,14 +866,14 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
             UpdateCommandV2 command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
                     .tableId(TABLE_ID)
                     .commitPartitionId(defaultPartitionIdMessage())
-                    .rowUuid(UUID.randomUUID())
+                    .rowUuid(randomUUID())
                     
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
                             .binaryRowMessage(getTestRow(i, i))
                             .build())
                     .txId(txId0)
                     .initiatorTime(hybridClock.now())
                     .safeTime(hybridClock.now())
-                    .txCoordinatorId(UUID.randomUUID())
+                    .txCoordinatorId(randomUUID())
                     .build();
 
             commandListener.processCommand(command, 
raftIndex.incrementAndGet(), 1, hybridClock.now());
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 4d68eb2a087..76544ec73c8 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -78,16 +78,6 @@ public class TestPartitionDataStorage implements 
PartitionDataStorage {
         return partitionStorage.runConsistently(closure);
     }
 
-    @Override
-    public void acquirePartitionSnapshotsReadLock() {
-        // There is no 'write' side, so we don't need to take any lock.
-    }
-
-    @Override
-    public void releasePartitionSnapshotsReadLock() {
-        // There is no 'write' side, so we don't need to releasetestbala any 
lock.
-    }
-
     @Override
     public CompletableFuture<Void> flush(boolean trigger) {
         return partitionStorage.flush(trigger);


Reply via email to