This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f74b03afd8 IGNITE-27458 Stop PartitionReplicaListener being 
ReplicaListener (#7313)
2f74b03afd8 is described below

commit 2f74b03afd84a20eb5b7a7f04e8d09fa1f1db2bc
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Dec 26 11:24:48 2025 +0400

    IGNITE-27458 Stop PartitionReplicaListener being ReplicaListener (#7313)
---
 .../partition/replicator/ReplicaPrimacy.java       |   7 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |  29 +---
 .../internal/table/distributed/TableManager.java   |   2 -
 .../replicator/PartitionReplicaListener.java       |  37 +-----
 .../PartitionReplicaListenerIndexLockingTest.java  |  14 +-
 ...itionReplicaListenerSortedIndexLockingTest.java |  14 +-
 .../replication/PartitionReplicaListenerTest.java  | 148 ++++++++++-----------
 .../ZonePartitionReplicaListenerTest.java          |  14 +-
 .../storage/InternalTableEstimatedSizeTest.java    |  28 +++-
 .../apache/ignite/distributed/ItTxTestCluster.java |   2 -
 .../table/impl/DummyInternalTableImpl.java         |   2 -
 11 files changed, 124 insertions(+), 173 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
index 83349dd3967..c2917d753da 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
@@ -23,6 +23,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadO
 import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
 
 /**
  * Represents replica primacy info. Contains the following information:
@@ -54,14 +55,16 @@ public class ReplicaPrimacy {
     /**
      * Creates an instance representing information about the primary replica 
held by this node.
      */
-    static ReplicaPrimacy forPrimaryReplicaRequest(long leaseStartTime) {
+    @VisibleForTesting
+    public static ReplicaPrimacy forPrimaryReplicaRequest(long leaseStartTime) 
{
         return new ReplicaPrimacy(leaseStartTime, null);
     }
 
     /**
      * Creates an instance representing information about whether this node 
currently holds the primary.
      */
-    static ReplicaPrimacy forIsPrimary(boolean isPrimary) {
+    @VisibleForTesting
+    public static ReplicaPrimacy forIsPrimary(boolean isPrimary) {
         return new ReplicaPrimacy(null, isPrimary);
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 6c04a27b134..0e22ded3895 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.distributed;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -28,7 +27,6 @@ import static org.mockito.Mockito.mock;
 
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -48,10 +46,8 @@ import 
org.apache.ignite.internal.network.InternalClusterNode;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -64,7 +60,6 @@ import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -72,7 +67,6 @@ import 
org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -204,10 +198,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                         transactionStateResolver,
                         storageUpdateHandler,
                         validationSchemasSource,
-                        localNode,
                         schemaSyncService,
                         catalogService,
-                        placementDriver,
                         clusterNodeResolver,
                         resourcesRegistry,
                         schemaRegistry,
@@ -215,22 +207,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                         lowWatermark,
                         mock(FailureProcessor.class),
                         new 
TableMetricSource(QualifiedName.fromSimple("test_table"))
-                ) {
-                    @Override
-                    public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, UUID senderId) {
-                        if (request instanceof 
WriteIntentSwitchReplicaRequest) {
-                            logger().info("Dropping cleanup request: {}", 
request);
-
-                            releaseTxLocks(
-                                    ((WriteIntentSwitchReplicaRequest) 
request).txId(),
-                                    txManager.lockManager()
-                            );
-
-                            return completedFuture(new ReplicaResult(null, 
null));
-                        }
-                        return super.invoke(request, senderId);
-                    }
-                };
+                );
             }
         };
 
@@ -291,10 +268,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
         assertEquals(200., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
     }
 
-    private static void releaseTxLocks(UUID txId, LockManager lockManager) {
-        lockManager.releaseAll(txId);
-    }
-
     @Override
     protected int nodes() {
         return 1;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 68fbcf25b0c..18bc6f9650a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1077,10 +1077,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 transactionStateResolver,
                 partitionUpdateHandlers.storageUpdateHandler,
                 new CatalogValidationSchemasSource(catalogService, 
schemaManager),
-                localNode(),
                 executorInclinedSchemaSyncService,
                 catalogService,
-                executorInclinedPlacementDriver,
                 topologyService,
                 remotelyTriggeredResourceRegistry,
                 schemaManager.schemaRegistry(table.tableId()),
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 498feb1668c..6169c45def3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -94,11 +94,9 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
-import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult;
 import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
 import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
-import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
 import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
 import 
org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
@@ -133,9 +131,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.ScanC
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import 
org.apache.ignite.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException;
 import 
org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
-import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
 import org.apache.ignite.internal.raft.Command;
-import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.CommandApplicationResult;
 import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -145,7 +141,6 @@ import 
org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
-import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -206,7 +201,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /** Partition replication listener. */
-public class PartitionReplicaListener implements ReplicaListener, 
ReplicaTableProcessor {
+public class PartitionReplicaListener implements ReplicaTableProcessor {
     /**
      * NB: this listener makes writes to the underlying MV partition storage 
without taking the partition snapshots read lock. This causes
      * the RAFT snapshots transferred to a follower being slightly 
inconsistent for a limited amount of time.
@@ -257,9 +252,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
     /** Versioned partition storage. */
     private final MvPartitionStorage mvDataStorage;
 
-    /** Raft client. */
-    private final RaftCommandRunner raftCommandRunner;
-
     /** Tx manager. */
     private final TxManager txManager;
 
@@ -317,7 +309,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
     private final TableMetricSource metrics;
 
-    private final ReplicaPrimacyEngine replicaPrimacyEngine;
     private final TableAwareReplicaRequestPreProcessor 
tableAwareReplicaRequestPreProcessor;
     private final ReliableCatalogVersions reliableCatalogVersions;
     private final ReplicationRaftCommandApplicator raftCommandApplicator;
@@ -341,9 +332,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
      * @param safeTime Safe time clock.
      * @param transactionStateResolver Transaction state resolver.
      * @param storageUpdateHandler Handler that processes updates writing them 
to storage.
-     * @param localNode Instance of the local node.
      * @param catalogService Catalog service.
-     * @param placementDriver Placement driver.
      * @param clusterNodeResolver Node resolver.
      * @param remotelyTriggeredResourceRegistry Resource registry.
      * @param indexMetaStorage Index meta storage.
@@ -365,10 +354,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             TransactionStateResolver transactionStateResolver,
             StorageUpdateHandler storageUpdateHandler,
             ValidationSchemasSource validationSchemasSource,
-            InternalClusterNode localNode,
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
-            LeasePlacementDriver placementDriver,
             ClusterNodeResolver clusterNodeResolver,
             RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
             SchemaRegistry schemaRegistry,
@@ -378,7 +365,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             TableMetricSource metrics
     ) {
         this.mvDataStorage = mvDataStorage;
-        this.raftCommandRunner = raftCommandRunner;
         this.txManager = txManager;
         this.lockManager = lockManager;
         this.scanRequestExecutor = scanRequestExecutor;
@@ -399,13 +385,11 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         this.tableLockKey = new TablePartitionId(tableId, 
replicationGroupId.partitionId());
         this.metrics = metrics;
 
-        this.schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
+        schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
 
         indexBuildingProcessor = new 
PartitionReplicaBuildIndexProcessor(busyLock, tableId, indexMetaStorage, 
catalogService);
 
-        replicaPrimacyEngine = new ReplicaPrimacyEngine(placementDriver, 
clockService, replicationGroupId, localNode);
-
-        this.tableAwareReplicaRequestPreProcessor = new 
TableAwareReplicaRequestPreProcessor(
+        tableAwareReplicaRequestPreProcessor = new 
TableAwareReplicaRequestPreProcessor(
                 clockService,
                 schemaCompatValidator,
                 schemaSyncService
@@ -417,12 +401,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         buildIndexReplicaRequestHandler = new 
BuildIndexReplicaRequestHandler(indexMetaStorage, raftCommandApplicator);
     }
 
-    @Override
-    public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
-        return replicaPrimacyEngine.validatePrimacy(request)
-                .thenCompose(replicaPrimacy -> 
processRequestInContext(request, replicaPrimacy, senderId));
-    }
-
     @Override
     public CompletableFuture<ReplicaResult> process(ReplicaRequest request, 
ReplicaPrimacy replicaPrimacy, UUID senderId) {
         return processRequestInContext(request, replicaPrimacy, senderId);
@@ -445,15 +423,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         }
     }
 
-    /** Returns Raft-client. */
-    @Override
-    public RaftCommandRunner raftClient() {
-        if (raftCommandRunner instanceof ExecutorInclinedRaftCommandRunner) {
-            return ((ExecutorInclinedRaftCommandRunner) 
raftCommandRunner).decoratedCommandRunner();
-        }
-        return raftCommandRunner;
-    }
-
     private CompletableFuture<?> processRequest(ReplicaRequest request, 
ReplicaPrimacy replicaPrimacy, UUID senderId) {
         boolean hasSchemaVersion = request instanceof 
SchemaVersionAwareReplicaRequest;
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 77a47bb14ef..e78d234f72f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -72,9 +72,9 @@ import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
-import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -248,8 +248,6 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
 
         when(catalog.indexes(anyInt())).thenReturn(List.of(indexDescriptor));
 
-        InternalClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
-
         partitionReplicaListener = new PartitionReplicaListener(
                 TEST_MV_PARTITION_STORAGE,
                 mockRaftClient,
@@ -279,10 +277,8 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                         TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 ),
                 new DummyValidationSchemasSource(schemaManager),
-                localNode,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogService,
-                new TestPlacementDriver(localNode),
                 mock(ClusterNodeResolver.class),
                 new RemotelyTriggeredResourceRegistry(),
                 schemaManager,
@@ -395,7 +391,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 throw new AssertionError("Unexpected operation type: " + 
arg.type);
         }
 
-        CompletableFuture<?> fut = partitionReplicaListener.invoke(request, 
LOCAL_NODE_ID);
+        CompletableFuture<?> fut = partitionReplicaListener.process(request, 
replicaPrimacy(), LOCAL_NODE_ID);
 
         await(fut);
 
@@ -422,6 +418,10 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
         );
     }
 
+    private ReplicaPrimacy replicaPrimacy() {
+        return 
ReplicaPrimacy.forPrimaryReplicaRequest(HybridTimestamp.MIN_VALUE.longValue());
+    }
+
     /** Verifies the mode in which the lock was acquired on the index key for 
a particular operation. */
     @ParameterizedTest
     @MethodSource("readWriteMultiTestArguments")
@@ -484,7 +484,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 throw new AssertionError("Unexpected operation type: " + 
arg.type);
         }
 
-        CompletableFuture<?> fut = partitionReplicaListener.invoke(request, 
LOCAL_NODE_ID);
+        CompletableFuture<?> fut = partitionReplicaListener.process(request, 
replicaPrimacy(), LOCAL_NODE_ID);
 
         await(fut);
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
index 8d63b89dba4..cc7fe16c6c7 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
@@ -73,9 +73,9 @@ import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
-import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -223,8 +223,6 @@ public class PartitionReplicaListenerSortedIndexLockingTest 
extends IgniteAbstra
 
         when(catalog.indexes(anyInt())).thenReturn(List.of(indexDescriptor));
 
-        InternalClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
-
         partitionReplicaListener = new PartitionReplicaListener(
                 TEST_MV_PARTITION_STORAGE,
                 mockRaftClient,
@@ -249,10 +247,8 @@ public class 
PartitionReplicaListenerSortedIndexLockingTest extends IgniteAbstra
                         TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 ),
                 new DummyValidationSchemasSource(schemaManager),
-                localNode,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogService,
-                new TestPlacementDriver(localNode),
                 mock(ClusterNodeResolver.class),
                 new RemotelyTriggeredResourceRegistry(),
                 schemaManager,
@@ -293,6 +289,10 @@ public class 
PartitionReplicaListenerSortedIndexLockingTest extends IgniteAbstra
         return txManager;
     }
 
+    private static ReplicaPrimacy validRwPrimacy() {
+        return ReplicaPrimacy.forPrimaryReplicaRequest(1);
+    }
+
     @BeforeEach
     public void beforeTest() {
         ((TestSortedIndexStorage) pkStorage.get().storage()).clear();
@@ -365,7 +365,7 @@ public class PartitionReplicaListenerSortedIndexLockingTest 
extends IgniteAbstra
                 throw new AssertionError("Unexpected operation type: " + 
arg.type);
         }
 
-        CompletableFuture<?> fut = partitionReplicaListener.invoke(request, 
LOCAL_NODE_ID);
+        CompletableFuture<?> fut = partitionReplicaListener.process(request, 
validRwPrimacy(), LOCAL_NODE_ID);
 
         await(fut);
 
@@ -444,7 +444,7 @@ public class PartitionReplicaListenerSortedIndexLockingTest 
extends IgniteAbstra
                 throw new AssertionError("Unexpected operation type: " + 
arg.type);
         }
 
-        CompletableFuture<?> fut = partitionReplicaListener.invoke(request, 
LOCAL_NODE_ID);
+        CompletableFuture<?> fut = partitionReplicaListener.process(request, 
validRwPrimacy(), LOCAL_NODE_ID);
 
         await(fut);
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index dd436d029fe..262e01d38af 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -135,6 +135,8 @@ import 
org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.SingleClusterNodeResolver;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.command.CatalogVersionAware;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
@@ -473,6 +475,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     @Mock
     private IndexMetaStorage indexMetaStorage;
 
+    private ReplicaPrimacyEngine primacyEngine;
+
     private static UUID nodeId(int id) {
         return new UUID(0, id);
     }
@@ -660,10 +664,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 ),
                 validationSchemasSource,
-                localNode,
                 schemaSyncService,
                 catalogService,
-                placementDriver,
                 new SingleClusterNodeResolver(localNode),
                 new RemotelyTriggeredResourceRegistry(),
                 new DummySchemaManagerImpl(schemaDescriptor, 
schemaDescriptorVersion2),
@@ -678,6 +680,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         when(lowWatermark.tryLock(any(), any())).thenReturn(true);
 
+        primacyEngine = new ReplicaPrimacyEngine(placementDriver, 
clockService, grpId, localNode);
+
         reset();
     }
 
@@ -760,7 +764,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<ReplicaResult> invokeListener(ReplicaRequest 
request) {
-        return partitionReplicaListener.invoke(request, localNode.id());
+        return processWithPrimacy(request);
+    }
+
+    private static ReplicaPrimacy validRoPrimacy() {
+        return ReplicaPrimacy.forIsPrimary(true);
     }
 
     private ReadOnlySingleRowPkReplicaRequest 
readOnlySingleRowPkReplicaRequest(BinaryRow pk, HybridTimestamp readTimestamp) {
@@ -891,7 +899,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         UUID scanTxId = newTxId();
 
         // Request first batch
-        CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
+        CompletableFuture<ReplicaResult> fut = processWithPrimacy(
                 
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
@@ -904,7 +912,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .commitPartitionId(commitPartitionId())
                         .coordinatorId(localNode.id())
                         .timestamp(clock.now())
-                        .build(), localNode.id());
+                        .build());
 
         List<BinaryRow> rows = (List<BinaryRow>) fut.get(1, 
TimeUnit.SECONDS).result();
 
@@ -912,7 +920,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(4, rows.size());
 
         // Request second batch
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+        fut = 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(scanTxId)
@@ -924,7 +932,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .commitPartitionId(commitPartitionId())
                 .coordinatorId(localNode.id())
                 .timestamp(clock.now())
-                .build(), localNode.id());
+                .build());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -932,7 +940,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(2, rows.size());
 
         // Request bounded.
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+        fut = 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(newTxId())
@@ -947,7 +955,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .commitPartitionId(commitPartitionId())
                 .coordinatorId(localNode.id())
                 .timestamp(clock.now())
-                .build(), localNode.id());
+                .build());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -955,7 +963,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(2, rows.size());
 
         // Empty result.
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+        fut = 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(newTxId())
@@ -968,7 +976,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .commitPartitionId(commitPartitionId())
                 .coordinatorId(localNode.id())
                 .timestamp(clock.now())
-                .build(), localNode.id());
+                .build());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -976,7 +984,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(0, rows.size());
 
         // Lookup.
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+        fut = 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(newTxId())
@@ -989,7 +997,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .commitPartitionId(commitPartitionId())
                 .coordinatorId(localNode.id())
                 .timestamp(clock.now())
-                .build(), localNode.id());
+                .build());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -1019,7 +1027,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         UUID scanTxId = newTxId();
 
         // Request first batch
-        CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
+        CompletableFuture<ReplicaResult> fut = 
partitionReplicaListener.process(
                 
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
@@ -1029,7 +1037,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .indexToUse(sortedIndexId)
                         .batchSize(4)
                         .coordinatorId(localNode.id())
-                        .build(), localNode.id());
+                        .build(), validRoPrimacy(), localNode.id());
 
         List<BinaryRow> rows = (List<BinaryRow>) fut.get(1, 
TimeUnit.SECONDS).result();
 
@@ -1037,7 +1045,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(4, rows.size());
 
         // Request second batch
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+        fut = 
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(scanTxId)
@@ -1046,7 +1054,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .indexToUse(sortedIndexId)
                 .batchSize(4)
                 .coordinatorId(localNode.id())
-                .build(), localNode.id());
+                .build(), validRoPrimacy(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -1054,7 +1062,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(2, rows.size());
 
         // Request bounded.
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+        fut = 
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(newTxId())
@@ -1066,7 +1074,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .flags(SortedIndexStorage.LESS_OR_EQUAL)
                 .batchSize(5)
                 .coordinatorId(localNode.id())
-                .build(), localNode.id());
+                .build(), validRoPrimacy(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -1074,7 +1082,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(2, rows.size());
 
         // Empty result.
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+        fut = 
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(newTxId())
@@ -1084,7 +1092,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .lowerBoundPrefix(toIndexBound(5))
                 .batchSize(5)
                 .coordinatorId(localNode.id())
-                .build(), localNode.id());
+                .build(), validRoPrimacy(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -1092,7 +1100,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(0, rows.size());
 
         // Lookup.
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+        fut = 
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(newTxId())
@@ -1102,7 +1110,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .exactKey(toIndexKey(0))
                 .batchSize(5)
                 .coordinatorId(localNode.id())
-                .build(), localNode.id());
+                .build(), validRoPrimacy(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -1132,7 +1140,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         UUID scanTxId = newTxId();
 
         // Request first batch
-        CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
+        CompletableFuture<ReplicaResult> fut = 
partitionReplicaListener.process(
                 
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
@@ -1143,7 +1151,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .exactKey(toIndexKey(0))
                         .batchSize(3)
                         .coordinatorId(localNode.id())
-                        .build(), localNode.id());
+                        .build(), validRoPrimacy(), localNode.id());
 
         List<BinaryRow> rows = (List<BinaryRow>) fut.get(1, 
TimeUnit.SECONDS).result();
 
@@ -1151,7 +1159,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(3, rows.size());
 
         // Request second batch
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+        fut = 
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(scanTxId)
@@ -1161,7 +1169,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .exactKey(toIndexKey(0))
                 .batchSize(1)
                 .coordinatorId(localNode.id())
-                .build(), localNode.id());
+                .build(), validRoPrimacy(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -1169,7 +1177,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(1, rows.size());
 
         // Empty result.
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+        fut = 
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(newTxId())
@@ -1179,7 +1187,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .exactKey(toIndexKey(5))
                 .batchSize(5)
                 .coordinatorId(localNode.id())
-                .build(), localNode.id());
+                .build(), validRoPrimacy(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -1187,7 +1195,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(0, rows.size());
 
         // Lookup.
-        fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+        fut = 
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(zonePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .transactionId(newTxId())
@@ -1197,7 +1205,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .exactKey(toIndexKey(1))
                 .batchSize(5)
                 .coordinatorId(localNode.id())
-                .build(), localNode.id());
+                .build(), validRoPrimacy(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -1343,7 +1351,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             boolean full,
             @Nullable String txLabel
     ) {
-        return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+        return 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
                         .transactionId(txId)
@@ -1356,8 +1364,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .full(full)
                         .timestamp(clock.now())
                         .txLabel(txLabel)
-                        .build(),
-                localNode.id()
+                        .build()
         );
     }
 
@@ -1376,7 +1383,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             boolean full,
             @Nullable String txLabel
     ) {
-        return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
+        return 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
                         .transactionId(txId)
@@ -1389,8 +1396,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .full(full)
                         .timestamp(clock.now())
                         .txLabel(txLabel)
-                        .build(),
-                localNode.id()
+                        .build()
         );
     }
 
@@ -1420,7 +1426,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             boolean full,
             @Nullable String txLabel
     ) {
-        return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
+        return 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
                         .transactionId(txId)
@@ -1433,8 +1439,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .full(full)
                         .timestamp(clock.now())
                         .txLabel(txLabel)
-                        .build(),
-                localNode.id()
+                        .build()
         );
     }
 
@@ -1457,7 +1462,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             boolean full,
             @Nullable String txLabel
     ) {
-        return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
+        return 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
                         .transactionId(txId)
@@ -1470,8 +1475,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .full(full)
                         .timestamp(clock.now())
                         .txLabel(txLabel)
-                        .build(),
-                localNode.id()
+                        .build()
         );
     }
 
@@ -1785,7 +1789,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow 
oldRow, BinaryRow newRow, boolean full) {
-        return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
+        return 
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
                         .transactionId(targetTxId)
@@ -1798,15 +1802,14 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .coordinatorId(localNode.id())
                         .full(full)
                         .timestamp(clock.now())
-                        .build(),
-                localNode.id()
+                        .build()
         );
     }
 
     @Test
     public void 
failsWhenScanByExactMatchReadsTupleWithIncompatibleSchemaFromFuture() {
         testFailsWhenReadingFromFutureIncompatibleSchema(
-                (targetTxId, key) -> partitionReplicaListener.invoke(
+                (targetTxId, key) -> processWithPrimacy(
                         
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                                 .groupId(zonePartitionIdMessage(grpId))
                                 .tableId(TABLE_ID)
@@ -1819,8 +1822,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                                 .commitPartitionId(commitPartitionId())
                                 .coordinatorId(localNode.id())
                                 .timestamp(clock.now())
-                                .build(),
-                        localNode.id()
+                                .build()
                 )
         );
     }
@@ -1828,7 +1830,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     @Test
     public void 
failsWhenScanByIndexReadsTupleWithIncompatibleSchemaFromFuture() {
         testFailsWhenReadingFromFutureIncompatibleSchema(
-                (targetTxId, key) -> partitionReplicaListener.invoke(
+                (targetTxId, key) -> processWithPrimacy(
                         
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                                 .groupId(zonePartitionIdMessage(grpId))
                                 .tableId(TABLE_ID)
@@ -1840,8 +1842,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                                 .commitPartitionId(commitPartitionId())
                                 .coordinatorId(localNode.id())
                                 .timestamp(clock.now())
-                                .build(),
-                        localNode.id()
+                                .build()
                 )
         );
     }
@@ -1928,7 +1929,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<?> doRwScanRetrieveBatchRequest(UUID targetTxId) 
{
-        return partitionReplicaListener.invoke(
+        return processWithPrimacy(
                 
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                         .groupId(zonePartitionIdMessage(grpId))
                         .tableId(TABLE_ID)
@@ -1940,24 +1941,19 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .commitPartitionId(commitPartitionId())
                         .coordinatorId(localNode.id())
                         .timestamp(clock.now())
-                        .build(),
-                localNode.id()
+                        .build()
         );
     }
 
     private CompletableFuture<?> doRwScanCloseRequest(UUID targetTxId) {
-        ZonePartitionIdMessage serializedMsg =
-                zonePartitionIdMessage(new 
ZonePartitionId(tableDescriptor.zoneId(), grpId.partitionId()));
-
-        return partitionReplicaListener.invoke(
+        return processWithPrimacy(
                 TABLE_MESSAGES_FACTORY.scanCloseReplicaRequest()
-                        .groupId(serializedMsg)
+                        .groupId(zonePartitionIdMessage(new 
ZonePartitionId(tableDescriptor.zoneId(), grpId.partitionId())))
                         .tableId(grpId.zoneId())
                         .transactionId(targetTxId)
                         .timestamp(beginTimestamp(targetTxId))
                         .scanId(1)
-                        .build(),
-                localNode.id()
+                        .build()
         );
     }
 
@@ -2313,7 +2309,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .timestamp(clock.now())
                 .build();
 
-        return partitionReplicaListener.invoke(message, localNode.id());
+        return processWithPrimacy(message);
     }
 
     private void delete(UUID txId, BinaryRow row) {
@@ -2330,7 +2326,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .timestamp(clock.now())
                 .build();
 
-        assertThat(partitionReplicaListener.invoke(message, localNode.id()), 
willCompleteSuccessfully());
+        assertThat(processWithPrimacy(message), willCompleteSuccessfully());
     }
 
     private BinaryRow roGet(BinaryRow row, HybridTimestamp readTimestamp) {
@@ -2344,7 +2340,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<BinaryRow> roGetAsync(BinaryRow row, 
HybridTimestamp readTimestamp) {
         ReadOnlySingleRowPkReplicaRequest message = 
readOnlySingleRowPkReplicaRequest(row, readTimestamp);
 
-        return partitionReplicaListener.invoke(message, 
localNode.id()).thenApply(replicaResult -> (BinaryRow) replicaResult.result());
+        return partitionReplicaListener.process(message, validRoPrimacy(), 
localNode.id())
+                .thenApply(replicaResult -> (BinaryRow) 
replicaResult.result());
     }
 
     private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, 
HybridTimestamp readTimestamp) {
@@ -2743,7 +2740,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         PrimaryReplicaRequest request = mock(requestClass);
 
-        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+        assertThat(processWithPrimacy(request), 
willThrow(PrimaryReplicaMissException.class));
     }
 
     @ParameterizedTest
@@ -2757,7 +2754,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         PrimaryReplicaRequest request = mock(requestClass);
         when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime - 
1000);
 
-        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+        assertThat(processWithPrimacy(request), 
willThrow(PrimaryReplicaMissException.class));
     }
 
     @ParameterizedTest
@@ -2771,21 +2768,12 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         PrimaryReplicaRequest request = mock(requestClass);
         when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
 
-        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+        assertThat(processWithPrimacy(request), 
willThrow(PrimaryReplicaMissException.class));
     }
 
-    @ParameterizedTest
-    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
-    void primaryReplicaRequestsAreRejectedWhenLeaseholderIsDifferent(Class<? 
extends PrimaryReplicaRequest> requestClass) {
-        long leaseStartTime = clock.nowLong();
-        placementDriver.setPrimaryReplicaSupplier(
-                () -> new TestReplicaMetaImpl(anotherNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE)
-        );
-
-        PrimaryReplicaRequest request = mock(requestClass);
-        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
-
-        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    private CompletableFuture<ReplicaResult> processWithPrimacy(ReplicaRequest 
request) {
+        return primacyEngine.validatePrimacy(request)
+                .thenCompose(primacy -> 
partitionReplicaListener.process(request, primacy, localNode.id()));
     }
 
     private static class RequestContext {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index f6a6185d4f5..89b21369687 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -116,6 +116,7 @@ import 
org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.SingleClusterNodeResolver;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
 import 
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
@@ -452,6 +453,8 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
     @Mock
     private IndexMetaStorage indexMetaStorage;
 
+    private ReplicaPrimacyEngine primacyEngine;
+
     private static UUID nodeId(int id) {
         return new UUID(0, id);
     }
@@ -658,10 +661,8 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 ),
                 validationSchemasSource,
-                localNode,
                 schemaSyncService,
                 catalogService,
-                placementDriver,
                 new SingleClusterNodeResolver(localNode),
                 new RemotelyTriggeredResourceRegistry(),
                 new DummySchemaManagerImpl(schemaDescriptor, 
schemaDescriptorVersion2),
@@ -675,6 +676,8 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         when(lowWatermark.tryLock(any(), any())).thenReturn(true);
 
+        primacyEngine = new ReplicaPrimacyEngine(placementDriver, 
clockService, grpId, localNode);
+
         reset();
     }
 
@@ -1881,7 +1884,12 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .timestamp(clock.now())
                 .build();
 
-        return tableReplicaProcessor.invoke(message, localNode.id());
+        return processWithPrimacy(message);
+    }
+
+    private CompletableFuture<ReplicaResult> processWithPrimacy(ReplicaRequest 
request) {
+        return primacyEngine.validatePrimacy(request)
+                .thenCompose(primacy -> tableReplicaProcessor.process(request, 
primacy, localNode.id()));
     }
 
     private CompletableFuture<ReplicaResult> 
doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index 8a46eae550f..01e81855b8f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -44,6 +44,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -74,12 +75,12 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.leases.Lease;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
-import org.apache.ignite.internal.replicator.PartitionGroupId;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -250,13 +251,30 @@ public class InternalTableEstimatedSizeTest extends 
BaseIgniteAbstractTest {
                 ))
                 .collect(toList());
 
+        Map<ZonePartitionId, ReplicaPrimacyEngine> primacyEngines = new 
HashMap<>();
+        for (int partitionIndex = 0; partitionIndex < PARTITIONS_NUM; 
partitionIndex++) {
+            var zonePartitionId = new ZonePartitionId(ZONE_ID, partitionIndex);
+
+            var primacyEngine = new ReplicaPrimacyEngine(
+                    placementDriver,
+                    clockService,
+                    zonePartitionId,
+                    node
+            );
+
+            primacyEngines.put(zonePartitionId, primacyEngine);
+        }
+
         lenient().doAnswer(invocation -> {
             ReplicaRequest request = invocation.getArgument(1);
 
-            var tablePartitionId = (PartitionGroupId) 
request.groupId().asReplicationGroupId();
+            var zonePartitionId = (ZonePartitionId) 
request.groupId().asReplicationGroupId();
 
-            return 
partitionReplicaListeners.get(tablePartitionId.partitionId())
-                    .invoke(request, node.id())
+            return primacyEngines.get(zonePartitionId)
+                    .validatePrimacy(request)
+                    .thenCompose(
+                            primacy -> 
partitionReplicaListeners.get(zonePartitionId.partitionId()).process(request, 
primacy, node.id())
+                    )
                     .thenApply(replicaResult -> new ReplicaMessagesFactory()
                             .replicaResponse()
                             .result(replicaResult.result())
@@ -321,10 +339,8 @@ public class InternalTableEstimatedSizeTest extends 
BaseIgniteAbstractTest {
                 transactionStateResolver,
                 storageUpdateHandler,
                 validationSchemasSource,
-                node,
                 schemaSyncService,
                 catalogService,
-                placementDriver,
                 clusterNodeResolver,
                 remotelyTriggeredResourceRegistry,
                 schemaRegistry,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 2ec4dea377d..ae79a299b63 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -1058,10 +1058,8 @@ public class ItTxTestCluster {
                 transactionStateResolver,
                 storageUpdateHandler,
                 validationSchemasSource,
-                localNode,
                 schemaSyncService,
                 catalogService,
-                placementDriver,
                 clusterNodeResolver,
                 resourcesRegistry,
                 schemaRegistry,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 0caf6b7f99d..f4306d09da0 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -486,10 +486,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 transactionStateResolver,
                 storageUpdateHandler,
                 new DummyValidationSchemasSource(schemaManager),
-                LOCAL_NODE,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogService,
-                placementDriver,
                 mock(ClusterNodeResolver.class),
                 resourcesRegistry,
                 schemaManager,

Reply via email to