This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f674fd5e05 IGNITE-24380 Move ensureReplicaIsPrimary logic to 
ZonePartitionReplicaListener (#5352)
9f674fd5e05 is described below

commit 9f674fd5e05df14b79c1c2cbd003e47836f3e852
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Thu Mar 6 15:56:39 2025 +0400

    IGNITE-24380 Move ensureReplicaIsPrimary logic to 
ZonePartitionReplicaListener (#5352)
---
 modules/partition-replicator/build.gradle          |   1 +
 .../replicator/ItReplicaLifecycleTest.java         |   2 +-
 .../PartitionReplicaLifecycleManager.java          |   9 +-
 .../partition/replicator/ReplicaPrimacy.java       |  81 +++++++
 .../partition/replicator/ReplicaPrimacyEngine.java | 172 ++++++++++++++
 .../replicator/ReplicaTableProcessor.java          |  41 ++++
 .../replicator/ZonePartitionReplicaListener.java   |  80 +++----
 .../partition/replicator/ZoneResourcesManager.java |   2 +-
 ...xStateCommitPartitionReplicaRequestHandler.java |  77 ++----
 .../handlers/WriteIntentSwitchRequestHandler.java  |  18 +-
 .../ZonePartitionReplicaListenerTest.java          | 172 ++++++++++++++
 .../internal/table/distributed/TableManager.java   |   3 +-
 .../replicator/PartitionReplicaListener.java       | 257 ++++++++-------------
 .../replication/PartitionReplicaListenerTest.java  |  62 ++++-
 14 files changed, 688 insertions(+), 289 deletions(-)

diff --git a/modules/partition-replicator/build.gradle 
b/modules/partition-replicator/build.gradle
index e415115d53e..2cbb147f1d2 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -55,6 +55,7 @@ dependencies {
     testImplementation testFixtures(project(':ignite-storage-api'))
     testImplementation testFixtures(project(':ignite-table'))
     testImplementation testFixtures(project(':ignite-transactions'))
+    testImplementation testFixtures(project(':ignite-placement-driver-api'))
     testImplementation libs.hamcrest.core
     testImplementation libs.mockito.core
     testImplementation libs.mockito.junit
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index d9add982082..7e746f8258f 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -728,7 +728,7 @@ public class ItReplicaLifecycleTest extends 
ItAbstractColocationTest {
 
             Replica replica = replicaFut.get(1, SECONDS);
 
-            return replica != null && (((ZonePartitionReplicaListener) 
replica.listener()).tableReplicaListeners().size() == count);
+            return replica != null && (((ZonePartitionReplicaListener) 
replica.listener()).tableReplicaProcessors().size() == count);
         } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
             throw new RuntimeException(e);
         }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index bb3b8d33eae..c6f79e98c42 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -134,7 +134,6 @@ import org.apache.ignite.internal.replicator.ReplicaManager;
 import 
org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReason;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.tx.TxManager;
@@ -1429,13 +1428,13 @@ public class PartitionReplicaLifecycleManager extends
      *
      * @param zonePartitionId Zone partition id.
      * @param tableId Table id.
-     * @param tablePartitionReplicaListenerFactory Factory for creating 
table-specific partition replicas.
+     * @param tablePartitionReplicaProcessorFactory Factory for creating 
table-specific partition replicas.
      * @param raftTableProcessor Raft table processor for the table-specific 
partition.
      */
     public void loadTableListenerToZoneReplica(
             ZonePartitionId zonePartitionId,
             int tableId,
-            Function<RaftCommandRunner, ReplicaListener> 
tablePartitionReplicaListenerFactory,
+            Function<RaftCommandRunner, ReplicaTableProcessor> 
tablePartitionReplicaProcessorFactory,
             RaftTableProcessor raftTableProcessor,
             PartitionMvStorageAccess partitionMvStorageAccess
     ) {
@@ -1445,9 +1444,9 @@ public class PartitionReplicaLifecycleManager extends
         // so the listeners will be registered by the thread completing the 
"replicaListenerFuture". On normal operation (where there is
         // a HB relationship between zone and table creation) zone-wide 
replica must already be started, this future will always be
         // completed and the listeners will be registered immediately.
-        resources.replicaListenerFuture().thenAccept(zoneReplicaListener -> 
zoneReplicaListener.addTableReplicaListener(
+        resources.replicaListenerFuture().thenAccept(zoneReplicaListener -> 
zoneReplicaListener.addTableReplicaProcessor(
                 tableId,
-                tablePartitionReplicaListenerFactory
+                tablePartitionReplicaProcessorFactory
         ));
 
         resources.raftListener().addTableProcessor(tableId, 
raftTableProcessor);
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
new file mode 100644
index 00000000000..83349dd3967
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.Objects.requireNonNull;
+
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents replica primacy info. Contains the following information:
+ *
+ * <ul>
+ *     <li>{@code leaseStartTime} - the moment when the replica became primary 
(only filled for {@link PrimaryReplicaRequest}s)</li>
+ *     <li>{@code isPrimary} - whether this node currently hosts the primary 
(only filled for {@link ReadOnlyReplicaRequest}s
+ *     and {@link ReplicaSafeTimeSyncRequest}s)</li>
+ * </ul>
+ */
+public class ReplicaPrimacy {
+    private static final ReplicaPrimacy EMPTY = new ReplicaPrimacy(null, null);
+
+    private final @Nullable Long leaseStartTime;
+    private final @Nullable Boolean isPrimary;
+
+    private ReplicaPrimacy(@Nullable Long leaseStartTime, @Nullable Boolean 
isPrimary) {
+        this.leaseStartTime = leaseStartTime;
+        this.isPrimary = isPrimary;
+    }
+
+    /**
+     * Creates an instance representing no primacy information.
+     */
+    public static ReplicaPrimacy empty() {
+        return EMPTY;
+    }
+
+    /**
+     * Creates an instance representing information about the primary replica 
held by this node.
+     */
+    static ReplicaPrimacy forPrimaryReplicaRequest(long leaseStartTime) {
+        return new ReplicaPrimacy(leaseStartTime, null);
+    }
+
+    /**
+     * Creates an instance representing information about whether this node 
currently holds the primary.
+     */
+    static ReplicaPrimacy forIsPrimary(boolean isPrimary) {
+        return new ReplicaPrimacy(null, isPrimary);
+    }
+
+    /**
+     * Returns lease start time; throws exception if not present.
+     */
+    public long leaseStartTime() {
+        return requireNonNull(leaseStartTime);
+    }
+
+    /**
+     * Whether this node currently hosts the primary; throws if this 
information is absent.
+     */
+    public boolean isPrimary() {
+        return requireNonNull(isPrimary);
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
new file mode 100644
index 00000000000..603c35feb0e
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Logic related to replica primacy checks.
+ */
+public class ReplicaPrimacyEngine {
+    private final LeasePlacementDriver placementDriver;
+    private final ClockService clockService;
+    private final ReplicationGroupId replicationGroupId;
+    private final ClusterNode localNode;
+
+    /** Constructor. */
+    public ReplicaPrimacyEngine(
+            LeasePlacementDriver placementDriver,
+            ClockService clockService,
+            ReplicationGroupId replicationGroupId,
+            ClusterNode localNode
+    ) {
+        this.placementDriver = placementDriver;
+        this.clockService = clockService;
+        this.replicationGroupId = replicationGroupId;
+        this.localNode = localNode;
+    }
+
+    /**
+     * Validates replica primacy.
+     *
+     * <ul>
+     *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted on this node,
+     *     that it has not expired and the request was sent while the same 
replica was primary. If anything of the above is violated, the
+     *     future is completed with a {@link PrimaryReplicaMissException}.</li>
+     *     <li>For {@link ReadOnlyReplicaRequest}s and {@link 
ReplicaSafeTimeSyncRequest}s, detects whether this node is/was a primary
+     *     at the corresponding timestamp.</li>
+     * </ul>
+     *
+     * @param request Replica request.
+     */
+    public CompletableFuture<ReplicaPrimacy> validatePrimacy(ReplicaRequest 
request) {
+        HybridTimestamp now = clockService.current();
+
+        if (request instanceof PrimaryReplicaRequest) {
+            PrimaryReplicaRequest primaryReplicaRequest = 
(PrimaryReplicaRequest) request;
+            return ensureReplicaIsPrimary(primaryReplicaRequest, now);
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else {
+            return completedFuture(ReplicaPrimacy.empty());
+        }
+    }
+
+    private CompletableFuture<ReplicaPrimacy> ensureReplicaIsPrimary(
+            PrimaryReplicaRequest primaryReplicaRequest,
+            HybridTimestamp now
+    ) {
+        Long enlistmentConsistencyToken = 
primaryReplicaRequest.enlistmentConsistencyToken();
+
+        Function<ReplicaMeta, ReplicaPrimacy> validateClo = primaryReplicaMeta 
-> validateReplicaPrimacy(
+                now,
+                primaryReplicaMeta,
+                enlistmentConsistencyToken
+        );
+
+        ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(replicationGroupId, now);
+
+        if (meta != null) {
+            try {
+                return completedFuture(validateClo.apply(meta));
+            } catch (Exception e) {
+                return failedFuture(e);
+            }
+        }
+
+        return placementDriver.getPrimaryReplica(replicationGroupId, 
now).thenApply(validateClo);
+    }
+
+    private ReplicaPrimacy validateReplicaPrimacy(HybridTimestamp now, 
ReplicaMeta primaryReplicaMeta, Long enlistmentConsistencyToken) {
+        if (primaryReplicaMeta == null) {
+            throw new PrimaryReplicaMissException(
+                    localNode.name(),
+                    null,
+                    localNode.id(),
+                    null,
+                    enlistmentConsistencyToken,
+                    null,
+                    null
+            );
+        }
+
+        long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
+
+        if (enlistmentConsistencyToken != currentEnlistmentConsistencyToken
+                || clockService.before(primaryReplicaMeta.getExpirationTime(), 
now)
+                || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())
+        ) {
+            throw new PrimaryReplicaMissException(
+                    localNode.name(),
+                    primaryReplicaMeta.getLeaseholder(),
+                    localNode.id(),
+                    primaryReplicaMeta.getLeaseholderId(),
+                    enlistmentConsistencyToken,
+                    currentEnlistmentConsistencyToken,
+                    null);
+        }
+
+        return 
ReplicaPrimacy.forPrimaryReplicaRequest(primaryReplicaMeta.getStartTime().longValue());
+    }
+
+    private CompletableFuture<ReplicaPrimacy> 
isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) {
+        return placementDriver.getPrimaryReplica(replicationGroupId, timestamp)
+                .thenApply(primaryReplica -> ReplicaPrimacy.forIsPrimary(
+                        // TODO: 
https://issues.apache.org/jira/browse/IGNITE-24714 - should we also check lease 
expiration?
+                        primaryReplica != null && 
isLocalPeer(primaryReplica.getLeaseholderId())
+                ));
+    }
+
+    private boolean isLocalPeer(UUID nodeId) {
+        return localNode.id().equals(nodeId);
+    }
+
+    /**
+     * Checks whether the token still matches the current primary replica.
+     *
+     * @param suspectedEnlistmentConsistencyToken Enlistment consistency token 
that is going to be checked.
+     */
+    public boolean tokenStillMatchesPrimary(long 
suspectedEnlistmentConsistencyToken) {
+        HybridTimestamp currentTime = clockService.current();
+
+        ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(replicationGroupId, currentTime);
+
+        return meta != null
+                && isLocalPeer(meta.getLeaseholderId())
+                && clockService.before(currentTime, meta.getExpirationTime())
+                && suspectedEnlistmentConsistencyToken == 
meta.getStartTime().longValue();
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
new file mode 100644
index 00000000000..3d9f8044010
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.ReplicaResult;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+
+/**
+ * Processor of replica requests targeted at a particular table.
+ */
+public interface ReplicaTableProcessor {
+    /**
+     * Processes replica request.
+     *
+     * @param request Replica request.
+     * @param replicaPrimacy Replica primacy info.
+     * @param senderId ID of the node that sent the request.
+     * @return Future completed with the result of processing.
+     */
+    CompletableFuture<ReplicaResult> process(ReplicaRequest request, 
ReplicaPrimacy replicaPrimacy, UUID senderId);
+
+    /** Callback on replica shutdown. */
+    void onShutdown();
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index c04ebf70910..b598cdb5f20 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.ClockService;
-import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -36,7 +35,6 @@ import 
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaR
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.WriteIntentSwitchRequestHandler;
-import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
@@ -44,7 +42,6 @@ import 
org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
-import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.TableAware;
@@ -57,7 +54,6 @@ import 
org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.VisibleForTesting;
 
 /**
@@ -68,15 +64,17 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
 
     // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the 
table replica listener if needed.
     // tableId -> tableProcessor.
-    private final Map<Integer, ReplicaListener> replicas = new 
ConcurrentHashMap<>();
+    private final Map<Integer, ReplicaTableProcessor> replicas = new 
ConcurrentHashMap<>();
 
     /** Raft client. */
     private final RaftCommandRunner raftClient;
 
-    private final ReplicationRaftCommandApplicator raftCommandApplicator;
-
     private final ZonePartitionId replicationGroupId;
 
+    private final ReplicaPrimacyEngine replicaPrimacyEngine;
+
+    private final ReplicationRaftCommandApplicator raftCommandApplicator;
+
     // Replica request handlers.
     private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
     private final WriteIntentSwitchRequestHandler 
writeIntentSwitchRequestHandler;
@@ -107,10 +105,17 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
     ) {
         this.raftClient = raftClient;
 
-        this.raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
-
         this.replicationGroupId = replicationGroupId;
 
+        this.replicaPrimacyEngine = new ReplicaPrimacyEngine(
+                placementDriver,
+                clockService,
+                replicationGroupId,
+                localNode
+        );
+
+        this.raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
+
         // Request handlers initialization.
 
         txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(
@@ -142,11 +147,8 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
 
         txStateCommitPartitionReplicaRequestHandler = new 
TxStateCommitPartitionReplicaRequestHandler(
                 txStatePartitionStorage,
-                placementDriver,
                 txManager,
-                clockService,
                 clusterNodeResolver,
-                replicationGroupId,
                 localNode,
                 new TxRecoveryEngine(
                         txManager,
@@ -168,8 +170,8 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
-        return ensureReplicaIsPrimary(request)
-                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+        return replicaPrimacyEngine.validatePrimacy(request)
+                .thenCompose(replicaPrimacy -> processRequest(request, 
replicaPrimacy, senderId))
                 .thenApply(res -> {
                     if (res instanceof ReplicaResult) {
                         return (ReplicaResult) res;
@@ -181,13 +183,12 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
 
     private CompletableFuture<?> processRequest(
             ReplicaRequest request,
-            @Nullable Boolean isPrimary,
-            UUID senderId,
-            @Nullable Long leaseStartTime
+            ReplicaPrimacy replicaPrimacy,
+            UUID senderId
     ) {
         if (request instanceof TableAware) {
             // This type of request propagates to the table processor directly.
-            return processTableAwareRequest(request, senderId);
+            return processTableAwareRequest(request, replicaPrimacy, senderId);
         }
 
         if (request instanceof TxFinishReplicaRequest) {
@@ -199,51 +200,40 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
             return 
txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest)
 request);
         }
 
-        return processZoneReplicaRequest(request, isPrimary, senderId, 
leaseStartTime);
-    }
-
-    /**
-     * Ensure that the primary replica was not changed.
-     *
-     * @param request Replica request.
-     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
-     *     lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
-     *     lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
-     */
-    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {
-        // TODO https://issues.apache.org/jira/browse/IGNITE-24380
-        // Move PartitionReplicaListener#ensureReplicaIsPrimary to 
ZonePartitionReplicaListener.
-        return completedFuture(new IgniteBiTuple<>(null, null));
+        return processZoneReplicaRequest(request, replicaPrimacy, senderId);
     }
 
     /**
      * Processes {@link TableAware} request.
      *
      * @param request Request to be processed.
+     * @param replicaPrimacy Replica primacy information.
      * @param senderId Node sender id.
      * @return Future with the result of the request.
      */
-    private CompletableFuture<ReplicaResult> 
processTableAwareRequest(ReplicaRequest request, UUID senderId) {
+    private CompletableFuture<ReplicaResult> processTableAwareRequest(
+            ReplicaRequest request,
+            ReplicaPrimacy replicaPrimacy,
+            UUID senderId
+    ) {
         assert request instanceof TableAware : "Request should be TableAware 
[request=" + request.getClass().getSimpleName() + ']';
 
         return replicas.get(((TableAware) request).tableId())
-                .invoke(request, senderId);
+                .process(request, replicaPrimacy, senderId);
     }
 
     /**
      * Processes zone replica request.
      *
      * @param request Request to be processed.
-     * @param isPrimary {@code true} if the current node is the primary for 
the partition, {@code false} otherwise.
+     * @param replicaPrimacy Replica primacy information.
      * @param senderId Node sender id.
-     * @param leaseStartTime Lease start time.
      * @return Future with the result of the processing.
      */
     private CompletableFuture<?> processZoneReplicaRequest(
             ReplicaRequest request,
-            @Nullable Boolean isPrimary,
-            UUID senderId,
-            @Nullable Long leaseStartTime
+            ReplicaPrimacy replicaPrimacy,
+            UUID senderId
     ) {
         // TODO https://issues.apache.org/jira/browse/IGNITE-24526
         // Need to move the necessary part of 
PartitionReplicaListener#processRequest request processing here
@@ -266,21 +256,21 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
     }
 
     /**
-     * Add table partition listener to the current zone replica listener.
+     * Add table partition replica processor to the current zone replica 
listener.
      *
      * @param tableId Table id.
      * @param replicaListener Table replica listener.
      */
-    public void addTableReplicaListener(int tableId, 
Function<RaftCommandRunner, ReplicaListener> replicaListener) {
+    public void addTableReplicaProcessor(int tableId, 
Function<RaftCommandRunner, ReplicaTableProcessor> replicaListener) {
         replicas.put(tableId, replicaListener.apply(raftClient));
     }
 
     /**
-     * Removes table partition listener by table replication identifier from 
the current zone replica listener.
+     * Removes table partition replica processor by table replication 
identifier from the current zone replica listener.
      *
      * @param tableId Table's identifier.
      */
-    public void removeTableReplicaListener(int tableId) {
+    public void removeTableReplicaProcessor(int tableId) {
         replicas.remove(tableId);
     }
 
@@ -290,7 +280,7 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
      * @return Table replicas listeners.
      */
     @VisibleForTesting
-    public Map<Integer, ReplicaListener> tableReplicaListeners() {
+    public Map<Integer, ReplicaTableProcessor> tableReplicaProcessors() {
         return replicas;
     }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index 336f93f6a7c..77c32bf82b4 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -175,7 +175,7 @@ class ZoneResourcesManager implements ManuallyCloseable {
         ZonePartitionResources resources = 
getZonePartitionResources(zonePartitionId);
 
         resources.replicaListenerFuture()
-                .thenAccept(zoneReplicaListener -> 
zoneReplicaListener.removeTableReplicaListener(tableId));
+                .thenAccept(zoneReplicaListener -> 
zoneReplicaListener.removeTableReplicaProcessor(tableId));
 
         resources.raftListener().removeTableProcessor(tableId);
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
index 9ea80c736c2..f4178d49164 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.partition.replicator.handlers;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
@@ -26,12 +25,8 @@ import static 
org.apache.ignite.internal.tx.TxState.isFinalState;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
-import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
@@ -47,12 +42,9 @@ import org.jetbrains.annotations.Nullable;
  */
 public class TxStateCommitPartitionReplicaRequestHandler {
     private final TxStatePartitionStorage txStatePartitionStorage;
-    private final LeasePlacementDriver placementDriver;
     private final TxManager txManager;
-    private final ClockService clockService;
     private final ClusterNodeResolver clusterNodeResolver;
 
-    private final ReplicationGroupId replicationGroupId;
     private final ClusterNode localNode;
 
     private final TxRecoveryEngine txRecoveryEngine;
@@ -60,20 +52,14 @@ public class TxStateCommitPartitionReplicaRequestHandler {
     /** Constructor. */
     public TxStateCommitPartitionReplicaRequestHandler(
             TxStatePartitionStorage txStatePartitionStorage,
-            LeasePlacementDriver placementDriver,
             TxManager txManager,
-            ClockService clockService,
             ClusterNodeResolver clusterNodeResolver,
-            ReplicationGroupId replicationGroupId,
             ClusterNode localNode,
             TxRecoveryEngine txRecoveryEngine
     ) {
         this.txStatePartitionStorage = txStatePartitionStorage;
-        this.placementDriver = placementDriver;
         this.txManager = txManager;
-        this.clockService = clockService;
         this.clusterNodeResolver = clusterNodeResolver;
-        this.replicationGroupId = replicationGroupId;
         this.localNode = localNode;
         this.txRecoveryEngine = txRecoveryEngine;
     }
@@ -85,56 +71,21 @@ public class TxStateCommitPartitionReplicaRequestHandler {
      * @return Result future.
      */
     public CompletableFuture<TransactionMeta> 
handle(TxStateCommitPartitionRequest request) {
-        return placementDriver.getPrimaryReplica(replicationGroupId, 
clockService.now())
-                .thenCompose(replicaMeta -> {
-                    if (replicaMeta == null || replicaMeta.getLeaseholder() == 
null) {
-                        return failedFuture(
-                                new PrimaryReplicaMissException(
-                                        localNode.name(),
-                                        null,
-                                        localNode.id(),
-                                        null,
-                                        null,
-                                        null,
-                                        null
-                                )
-                        );
-                    }
-
-                    if (!isLocalPeer(replicaMeta.getLeaseholderId())) {
-                        return failedFuture(
-                                new PrimaryReplicaMissException(
-                                        localNode.name(),
-                                        replicaMeta.getLeaseholder(),
-                                        localNode.id(),
-                                        replicaMeta.getLeaseholderId(),
-                                        null,
-                                        null,
-                                        null
-                                )
-                        );
-                    }
-
-                    UUID txId = request.txId();
-
-                    TxStateMeta txMeta = txManager.stateMeta(txId);
-
-                    if (txMeta != null && txMeta.txState() == FINISHING) {
-                        assert txMeta instanceof TxStateMetaFinishing : txMeta;
-
-                        return ((TxStateMetaFinishing) 
txMeta).txFinishFuture();
-                    } else if (txMeta == null || 
!isFinalState(txMeta.txState())) {
-                        // Try to trigger recovery, if needed. If the 
transaction will be aborted, the proper ABORTED state will be sent
-                        // in response.
-                        return 
triggerTxRecoveryOnTxStateResolutionIfNeeded(txId, txMeta);
-                    } else {
-                        return completedFuture(txMeta);
-                    }
-                });
-    }
+        UUID txId = request.txId();
+
+        TxStateMeta txMeta = txManager.stateMeta(txId);
+
+        if (txMeta != null && txMeta.txState() == FINISHING) {
+            assert txMeta instanceof TxStateMetaFinishing : txMeta;
 
-    private boolean isLocalPeer(UUID nodeId) {
-        return localNode.id().equals(nodeId);
+            return ((TxStateMetaFinishing) txMeta).txFinishFuture();
+        } else if (txMeta == null || !isFinalState(txMeta.txState())) {
+            // Try to trigger recovery, if needed. If the transaction will be 
aborted, the proper ABORTED state will be sent
+            // in response.
+            return triggerTxRecoveryOnTxStateResolutionIfNeeded(txId, txMeta);
+        } else {
+            return completedFuture(txMeta);
+        }
     }
 
     /**
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
index e649d88ab49..477fcaec2c9 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult;
 import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
+import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
 import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
@@ -43,7 +45,6 @@ import 
org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.CommandApplicationResult;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.SchemaSyncService;
@@ -68,7 +69,7 @@ public class WriteIntentSwitchRequestHandler {
 
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
-    private final IntFunction<ReplicaListener> replicaListenerByTableId;
+    private final IntFunction<ReplicaTableProcessor> replicaListenerByTableId;
 
     private final ClockService clockService;
 
@@ -80,7 +81,7 @@ public class WriteIntentSwitchRequestHandler {
 
     /** Constructor. */
     public WriteIntentSwitchRequestHandler(
-            IntFunction<ReplicaListener> replicaListenerByTableId,
+            IntFunction<ReplicaTableProcessor> replicaListenerByTableId,
             ClockService clockService,
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
@@ -159,7 +160,8 @@ public class WriteIntentSwitchRequestHandler {
                 .tableId(tableId)
                 .build();
 
-        return replicaListener(tableId).invoke(tableSpecificRequest, senderId);
+        // Using empty primacy because the request is not a 
PrimaryReplicaRequest.
+        return replicaTableProcessor(tableId).process(tableSpecificRequest, 
ReplicaPrimacy.empty(), senderId);
     }
 
     private CompletableFuture<Object> 
applyCommandToGroup(WriteIntentSwitchReplicaRequest request, Integer 
catalogVersion) {
@@ -185,11 +187,11 @@ public class WriteIntentSwitchRequestHandler {
         return new WriteIntentSwitchReplicatedInfo(request.txId(), 
replicationGroupId);
     }
 
-    private ReplicaListener replicaListener(Integer tableId) {
-        ReplicaListener replicaListener = 
replicaListenerByTableId.apply(tableId);
+    private ReplicaTableProcessor replicaTableProcessor(int tableId) {
+        ReplicaTableProcessor replicaTableProcessor = 
replicaListenerByTableId.apply(tableId);
 
-        assert replicaListener != null : "No replica listener for table ID " + 
tableId;
+        assert replicaTableProcessor != null : "No replica table processor for 
table ID " + tableId;
 
-        return replicaListener;
+        return replicaTableProcessor;
     }
 }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
new file mode 100644
index 00000000000..f94cff87cf3
--- /dev/null
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ZonePartitionReplicaListenerTest extends BaseIgniteAbstractTest {
+    private final ClusterNode localNode = new ClusterNodeImpl(nodeId(1), 
"node1", NetworkAddress.from("127.0.0.1:127"));
+
+    private final ClusterNode anotherNode = new ClusterNodeImpl(nodeId(2), 
"node2", NetworkAddress.from("127.0.0.2:127"));
+
+    @Spy
+    private final TxStatePartitionStorage txStateStorage = new 
TestTxStatePartitionStorage();
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final ClockService clockService = new TestClockService(clock);
+
+    @Mock
+    private TxManager txManager;
+
+    @Mock
+    private ValidationSchemasSource validationSchemasSource;
+
+    @Mock
+    private SchemaSyncService schemaSyncService;
+
+    @Mock
+    private CatalogService catalogService;
+
+    @Spy
+    private final TestPlacementDriver placementDriver = new 
TestPlacementDriver(localNode);
+
+    @Mock
+    private ClusterNodeResolver clusterNodeResolver;
+
+    @Mock
+    private RaftCommandRunner raftClient;
+
+    private final ZonePartitionId groupId = new ZonePartitionId(1, 2);
+
+    private ZonePartitionReplicaListener partitionReplicaListener;
+
+    private static UUID nodeId(int id) {
+        return new UUID(0, id);
+    }
+
+    @BeforeEach
+    void setUp() {
+        partitionReplicaListener = new ZonePartitionReplicaListener(
+                txStateStorage,
+                clockService,
+                txManager,
+                validationSchemasSource,
+                schemaSyncService,
+                catalogService,
+                placementDriver,
+                clusterNodeResolver,
+                raftClient,
+                localNode,
+                groupId
+        );
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenPrimaryIsNotKnown(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        doReturn(null).when(placementDriver).getCurrentPrimaryReplica(any(), 
any());
+        
doReturn(nullCompletedFuture()).when(placementDriver).getPrimaryReplica(any(), 
any());
+
+        PrimaryReplicaRequest request = mock(requestClass);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void 
primaryReplicaRequestsAreRejectedWhenPrimaryDoesNotMatchLeaseStartTime(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(localNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime - 
1000);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenLeaseIsExpired(Class<? extends 
PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(localNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MIN_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenLeaseholderIsDifferent(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(anotherNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 586d72a5e07..3bd881e36cd 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -150,6 +150,7 @@ import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegi
 import 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent;
 import 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
@@ -912,7 +913,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             minTimeCollectorService.addPartition(new TablePartitionId(tableId, 
partId));
 
-            Function<RaftCommandRunner, ReplicaListener> createListener = 
raftClient -> createReplicaListener(
+            Function<RaftCommandRunner, ReplicaTableProcessor> createListener 
= raftClient -> createReplicaListener(
                     zonePartitionId,
                     table,
                     safeTimeTracker,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 4ae5a0e4d0e..455ca521444 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed.replicator;
 
-import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
@@ -82,7 +81,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -106,6 +104,9 @@ import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult;
 import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
+import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
 import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
 import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
@@ -146,8 +147,7 @@ import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasS
 import 
org.apache.ignite.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException;
 import 
org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
-import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -164,7 +164,6 @@ import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissExcepti
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
-import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -237,7 +236,7 @@ import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /** Partition replication listener. */
-public class PartitionReplicaListener implements ReplicaListener {
+public class PartitionReplicaListener implements ReplicaListener, 
ReplicaTableProcessor {
     /**
      * NB: this listener makes writes to the underlying MV partition storage 
without taking the partition snapshots read lock. This causes
      * the RAFT snapshots transferred to a follower being slightly 
inconsistent for a limited amount of time.
@@ -329,9 +328,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private final SchemaCompatibilityValidator schemaCompatValidator;
 
-    /** Instance of the local node. */
-    private final ClusterNode localNode;
-
     private final SchemaSyncService schemaSyncService;
 
     private final CatalogService catalogService;
@@ -342,9 +338,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Prevents double stopping. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    /** Placement driver. */
-    private final PlacementDriver placementDriver;
-
     /** Read-write transaction operation tracker for building indexes. */
     private final IndexBuilderTxRwOperationTracker txRwOperationTracker = new 
IndexBuilderTxRwOperationTracker();
 
@@ -359,6 +352,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private static final boolean SKIP_UPDATES = 
getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
 
+    private final ReplicaPrimacyEngine replicaPrimacyEngine;
     private final ReliableCatalogVersions reliableCatalogVersions;
     private final ReplicationRaftCommandApplicator raftCommandApplicator;
     private final ReplicaTxFinishMarker replicaTxFinishMarker;
@@ -415,7 +409,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             ClusterNode localNode,
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
-            PlacementDriver placementDriver,
+            LeasePlacementDriver placementDriver,
             ClusterNodeResolver clusterNodeResolver,
             RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
             SchemaRegistry schemaRegistry,
@@ -435,10 +429,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         this.txStatePartitionStorage = txStatePartitionStorage;
         this.transactionStateResolver = transactionStateResolver;
         this.storageUpdateHandler = storageUpdateHandler;
-        this.localNode = localNode;
         this.schemaSyncService = schemaSyncService;
         this.catalogService = catalogService;
-        this.placementDriver = placementDriver;
         this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
         this.schemaRegistry = schemaRegistry;
         this.indexMetaStorage = indexMetaStorage;
@@ -449,6 +441,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         this.schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
 
+        replicaPrimacyEngine = new ReplicaPrimacyEngine(placementDriver, 
clockService, replicationGroupId, localNode);
         reliableCatalogVersions = new 
ReliableCatalogVersions(schemaSyncService, catalogService);
         raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
         replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
@@ -478,11 +471,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         txStateCommitPartitionReplicaRequestHandler = new 
TxStateCommitPartitionReplicaRequestHandler(
                 txStatePartitionStorage,
-                placementDriver,
                 txManager,
-                clockService,
                 clusterNodeResolver,
-                replicationGroupId,
                 localNode,
                 txRecoveryEngine);
 
@@ -548,15 +538,30 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
-        return ensureReplicaIsPrimary(request)
-                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
-                .thenApply(res -> {
-                    if (res instanceof ReplicaResult) {
-                        return (ReplicaResult) res;
-                    } else {
-                        return new ReplicaResult(res, null);
-                    }
-                });
+        return replicaPrimacyEngine.validatePrimacy(request)
+                .thenCompose(replicaPrimacy -> 
processRequestInContext(request, replicaPrimacy, senderId));
+    }
+
+    @Override
+    public CompletableFuture<ReplicaResult> process(ReplicaRequest request, 
ReplicaPrimacy replicaPrimacy, UUID senderId) {
+        return processRequestInContext(request, replicaPrimacy, senderId);
+    }
+
+    private CompletableFuture<ReplicaResult> processRequestInContext(
+            ReplicaRequest request,
+            ReplicaPrimacy replicaPrimacy,
+            UUID senderId
+    ) {
+        return processRequest(request, replicaPrimacy, senderId)
+                
.thenApply(PartitionReplicaListener::wrapInReplicaResultIfNeeded);
+    }
+
+    private static ReplicaResult wrapInReplicaResultIfNeeded(Object res) {
+        if (res instanceof ReplicaResult) {
+            return (ReplicaResult) res;
+        } else {
+            return new ReplicaResult(res, null);
+        }
     }
 
     /** Returns Raft-client. */
@@ -568,8 +573,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return raftCommandRunner;
     }
 
-    private CompletableFuture<?> processRequest(ReplicaRequest request, 
@Nullable Boolean isPrimary, UUID senderId,
-            @Nullable Long leaseStartTime) {
+    private CompletableFuture<?> processRequest(ReplicaRequest request, 
ReplicaPrimacy replicaPrimacy, UUID senderId) {
         // TODO https://issues.apache.org/jira/browse/IGNITE-24526
         // Need to move the necessary part of request processing to 
ZonePartitionReplicaListener
 
@@ -623,7 +627,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         // Don't need to validate schema.
         if (opTs == null) {
             assert opTsIfDirectRo == null;
-            return 
processOperationRequestWithTxOperationManagementLogic(senderId, request, 
isPrimary, null, leaseStartTime);
+            return 
processOperationRequestWithTxOperationManagementLogic(senderId, request, 
replicaPrimacy, null);
         }
 
         assert txTs != null && opTs.compareTo(txTs) >= 0 : "Invalid request 
timestamps";
@@ -644,7 +648,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         };
 
         return 
schemaSyncService.waitForMetadataCompleteness(opTs).thenRun(validateClo).thenCompose(ignored
 ->
-                
processOperationRequestWithTxOperationManagementLogic(senderId, request, 
isPrimary, opTsIfDirectRo, leaseStartTime));
+                
processOperationRequestWithTxOperationManagementLogic(senderId, request, 
replicaPrimacy, opTsIfDirectRo));
     }
 
     private CompletableFuture<Long> processGetEstimatedSizeRequest() {
@@ -706,7 +710,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     );
                 })
                 .thenCompose(leaderWithTerm -> {
-                    if (leaderWithTerm.isEmpty() || 
!isTokenStillValidPrimary(request.enlistmentConsistencyToken())) {
+                    if (leaderWithTerm.isEmpty() || 
!replicaPrimacyEngine.tokenStillMatchesPrimary(request.enlistmentConsistencyToken()))
 {
                         return nullCompletedFuture();
                     }
 
@@ -723,17 +727,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
-    private boolean isTokenStillValidPrimary(long 
suspectedEnlistmentConsistencyToken) {
-        HybridTimestamp currentTime = clockService.current();
-
-        ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(replicationGroupId, currentTime);
-
-        return meta != null
-                && isLocalPeer(meta.getLeaseholderId())
-                && clockService.before(currentTime, meta.getExpirationTime())
-                && suspectedEnlistmentConsistencyToken == 
meta.getStartTime().longValue();
-    }
-
     private static PeersAndLearners 
peersConfigurationFromMessage(ChangePeersAndLearnersAsyncReplicaRequest 
request) {
         Assignments pendingAssignments = 
fromBytes(request.pendingAssignments());
 
@@ -793,49 +786,76 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      *
      * @param senderId Sender id.
      * @param request Request.
-     * @param isPrimary Whether the current replica is primary.
+     * @param replicaPrimacy Replica primacy information.
      * @param opStartTsIfDirectRo Start timestamp in case of direct RO tx.
-     * @param lst Lease start time of the current lease that was active at the 
moment of the start of request processing, in the
-     *         case of {@link PrimaryReplicaRequest}, otherwise should be 
{@code null}.
      * @return Future.
      */
     private CompletableFuture<?> processOperationRequest(
             UUID senderId,
             ReplicaRequest request,
-            @Nullable Boolean isPrimary,
-            @Nullable HybridTimestamp opStartTsIfDirectRo,
-            @Nullable Long lst
+            ReplicaPrimacy replicaPrimacy,
+            @Nullable HybridTimestamp opStartTsIfDirectRo
     ) {
         if (request instanceof ReadWriteSingleRowReplicaRequest) {
             var req = (ReadWriteSingleRowReplicaRequest) request;
 
             var opId = new OperationId(senderId, req.timestamp().longValue());
 
-            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processSingleEntryAction(req, lst));
+            return appendTxCommand(
+                    req.transactionId(),
+                    opId,
+                    req.requestType(),
+                    req.full(),
+                    () -> processSingleEntryAction(req, 
replicaPrimacy.leaseStartTime())
+            );
         } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
             var req = (ReadWriteSingleRowPkReplicaRequest) request;
 
             var opId = new OperationId(senderId, req.timestamp().longValue());
 
-            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processSingleEntryAction(req, lst));
+            return appendTxCommand(
+                    req.transactionId(),
+                    opId,
+                    req.requestType(),
+                    req.full(),
+                    () -> processSingleEntryAction(req, 
replicaPrimacy.leaseStartTime())
+            );
         } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
             var req = (ReadWriteMultiRowReplicaRequest) request;
 
             var opId = new OperationId(senderId, req.timestamp().longValue());
 
-            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processMultiEntryAction(req, lst));
+            return appendTxCommand(
+                    req.transactionId(),
+                    opId,
+                    req.requestType(),
+                    req.full(),
+                    () -> processMultiEntryAction(req, 
replicaPrimacy.leaseStartTime())
+            );
         } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
             var req = (ReadWriteMultiRowPkReplicaRequest) request;
 
             var opId = new OperationId(senderId, req.timestamp().longValue());
 
-            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processMultiEntryAction(req, lst));
+            return appendTxCommand(
+                    req.transactionId(),
+                    opId,
+                    req.requestType(),
+                    req.full(),
+                    () -> processMultiEntryAction(req, 
replicaPrimacy.leaseStartTime())
+            );
         } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
             var req = (ReadWriteSwapRowReplicaRequest) request;
 
             var opId = new OperationId(senderId, req.timestamp().longValue());
 
-            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processTwoEntriesAction(req, lst));
+            return appendTxCommand(
+                    req.transactionId(),
+                    opId,
+                    req.requestType(),
+                    req.full(),
+                    () -> processTwoEntriesAction(req, 
replicaPrimacy.leaseStartTime())
+            );
         } else if (request instanceof 
ReadWriteScanRetrieveBatchReplicaRequest) {
             var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
 
@@ -882,13 +902,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         } else if (request instanceof TableWriteIntentSwitchReplicaRequest) {
             return 
processTableWriteIntentSwitchAction((TableWriteIntentSwitchReplicaRequest) 
request);
         } else if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
-            return 
processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) request, 
isPrimary);
+            return 
processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) request, 
replicaPrimacy.isPrimary());
         } else if (request instanceof ReadOnlyMultiRowPkReplicaRequest) {
-            return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) request, 
isPrimary);
+            return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) request, 
replicaPrimacy.isPrimary());
         } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) 
{
-            return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request, isPrimary);
+            return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request, replicaPrimacy.isPrimary());
         } else if (request instanceof ReplicaSafeTimeSyncRequest) {
-            return processReplicaSafeTimeSyncRequest(isPrimary);
+            return 
processReplicaSafeTimeSyncRequest(replicaPrimacy.isPrimary());
         } else if (request instanceof BuildIndexReplicaRequest) {
             return 
buildIndexReplicaRequestHandler.handle((BuildIndexReplicaRequest) request);
         } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
@@ -922,10 +942,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private CompletableFuture<List<BinaryRow>> 
processReadOnlyScanRetrieveBatchAction(
             ReadOnlyScanRetrieveBatchReplicaRequest request,
-            Boolean isPrimary
+            boolean isPrimary
     ) {
-        requireNonNull(isPrimary);
-
         UUID txId = request.transactionId();
         int batchCount = request.batchSize();
         HybridTimestamp readTimestamp = request.readTimestamp();
@@ -1078,7 +1096,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<BinaryRow> 
processReadOnlySingleEntryAction(ReadOnlySingleRowPkReplicaRequest request, 
Boolean isPrimary) {
+    private CompletableFuture<BinaryRow> 
processReadOnlySingleEntryAction(ReadOnlySingleRowPkReplicaRequest request, 
boolean isPrimary) {
         BinaryTuple primaryKey = resolvePk(request.primaryKey());
         HybridTimestamp readTimestamp = request.readTimestamp();
 
@@ -1100,7 +1118,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param timestamp Timestamp to check.
      * @return True if the timestamp is already passed in the reference system 
of the current node and node is primary, false otherwise.
      */
-    private boolean isPrimaryInTimestamp(Boolean isPrimary, HybridTimestamp 
timestamp) {
+    private boolean isPrimaryInTimestamp(boolean isPrimary, HybridTimestamp 
timestamp) {
         return isPrimary && clockService.now().compareTo(timestamp) > 0;
     }
 
@@ -1113,7 +1131,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private CompletableFuture<List<BinaryRow>> processReadOnlyMultiEntryAction(
             ReadOnlyMultiRowPkReplicaRequest request,
-            Boolean isPrimary
+            boolean isPrimary
     ) {
         List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
         HybridTimestamp readTimestamp = request.readTimestamp();
@@ -1143,9 +1161,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param isPrimary Whether is primary replica.
      * @return Future.
      */
-    private CompletableFuture<?> processReplicaSafeTimeSyncRequest(Boolean 
isPrimary) {
-        requireNonNull(isPrimary);
-
+    private CompletableFuture<?> processReplicaSafeTimeSyncRequest(boolean 
isPrimary) {
         // Disable safe-time sync if the Colocation feature is enabled, 
safe-time is managed on a different level there.
         if (!isPrimary || enabledColocation()) {
             return nullCompletedFuture();
@@ -2053,7 +2069,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param leaseStartTime Lease start time.
      * @return Listener response.
      */
-    private CompletableFuture<ReplicaResult> 
processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, Long 
leaseStartTime) {
+    private CompletableFuture<ReplicaResult> 
processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, long 
leaseStartTime) {
         UUID txId = request.transactionId();
         ReplicationGroupId commitPartitionId = 
request.commitPartitionId().asReplicationGroupId();
         List<BinaryRow> searchRows = request.binaryRows();
@@ -2325,7 +2341,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param leaseStartTime Lease start time.
      * @return Listener response.
      */
-    private CompletableFuture<?> 
processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, Long 
leaseStartTime) {
+    private CompletableFuture<?> 
processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, long 
leaseStartTime) {
         UUID txId = request.transactionId();
         ReplicationGroupId commitPartitionId = 
request.commitPartitionId().asReplicationGroupId();
         List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
@@ -2469,10 +2485,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             boolean full,
             UUID txCoordinatorId,
             int catalogVersion,
-            Long leaseStartTime
+            long leaseStartTime
     ) {
-        assert leaseStartTime != null : format("Lease start time is null for 
UpdateCommand [txId={}].", txId);
-
         UpdateCommand cmd = updateCommand(
                 commitPartitionId,
                 rowUuid,
@@ -2558,7 +2572,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             @Nullable BinaryRow row,
             @Nullable HybridTimestamp lastCommitTimestamp,
             int catalogVersion,
-            Long leaseStartTime
+            long leaseStartTime
     ) {
         return applyUpdateCommand(
                 request.commitPartitionId().asReplicationGroupId(),
@@ -2593,10 +2607,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             UUID txCoordinatorId,
             int catalogVersion,
             boolean skipDelayedAck,
-            Long leaseStartTime
+            long leaseStartTime
     ) {
-        assert leaseStartTime != null : format("Lease start time is null for 
UpdateAllCommand [txId={}].", txId);
-
         UpdateAllCommand cmd = updateAllCommand(
                 rowsToUpdate,
                 commitPartitionId,
@@ -2686,7 +2698,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             ReadWriteMultiRowReplicaRequest request,
             Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
             int catalogVersion,
-            Long leaseStartTime
+            long leaseStartTime
     ) {
         return applyUpdateAllCommand(
                 rowsToUpdate,
@@ -2729,7 +2741,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param leaseStartTime Lease start time.
      * @return Listener response.
      */
-    private CompletableFuture<ReplicaResult> 
processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, Long 
leaseStartTime) {
+    private CompletableFuture<ReplicaResult> 
processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, long 
leaseStartTime) {
         UUID txId = request.transactionId();
         BinaryRow searchRow = request.binaryRow();
         ReplicationGroupId commitPartitionId = 
request.commitPartitionId().asReplicationGroupId();
@@ -2928,7 +2940,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param leaseStartTime Lease start time.
      * @return Listener response.
      */
-    private CompletableFuture<ReplicaResult> 
processSingleEntryAction(ReadWriteSingleRowPkReplicaRequest request, Long 
leaseStartTime) {
+    private CompletableFuture<ReplicaResult> 
processSingleEntryAction(ReadWriteSingleRowPkReplicaRequest request, long 
leaseStartTime) {
         UUID txId = request.transactionId();
         BinaryTuple primaryKey = resolvePk(request.primaryKey());
         ReplicationGroupId commitPartitionId = 
request.commitPartitionId().asReplicationGroupId();
@@ -3198,7 +3210,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param leaseStartTime Lease start time.
      * @return Listener response.
      */
-    private CompletableFuture<ReplicaResult> 
processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request, Long 
leaseStartTime) {
+    private CompletableFuture<ReplicaResult> 
processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request, long 
leaseStartTime) {
         BinaryRow newRow = request.newBinaryRow();
         BinaryRow expectedRow = request.oldBinaryRow();
         ReplicationGroupIdMessage commitPartitionId = 
request.commitPartitionId();
@@ -3271,80 +3283,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
-    /**
-     * Ensure that the primary replica was not changed.
-     *
-     * @param request Replica request.
-     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
-     *         lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
-     *         lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
-     */
-    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {
-        HybridTimestamp current = clockService.current();
-
-        if (request instanceof PrimaryReplicaRequest) {
-            Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) 
request).enlistmentConsistencyToken();
-
-            Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = 
primaryReplicaMeta -> {
-                if (primaryReplicaMeta == null) {
-                    throw new PrimaryReplicaMissException(
-                            localNode.name(),
-                            null,
-                            localNode.id(),
-                            null,
-                            enlistmentConsistencyToken,
-                            null,
-                            null
-                    );
-                }
-
-                long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
-
-                if (enlistmentConsistencyToken != 
currentEnlistmentConsistencyToken
-                        || 
clockService.before(primaryReplicaMeta.getExpirationTime(), current)
-                        || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())
-                ) {
-                    throw new PrimaryReplicaMissException(
-                            localNode.name(),
-                            primaryReplicaMeta.getLeaseholder(),
-                            localNode.id(),
-                            primaryReplicaMeta.getLeaseholderId(),
-                            enlistmentConsistencyToken,
-                            currentEnlistmentConsistencyToken,
-                            null);
-                }
-
-                return new IgniteBiTuple<>(null, 
primaryReplicaMeta.getStartTime().longValue());
-            };
-
-            ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(replicationGroupId, current);
-
-            if (meta != null) {
-                try {
-                    return completedFuture(validateClo.apply(meta));
-                } catch (Exception e) {
-                    return failedFuture(e);
-                }
-            }
-
-            return placementDriver.getPrimaryReplica(replicationGroupId, 
current).thenApply(validateClo);
-        } else if (request instanceof ReadOnlyReplicaRequest) {
-            return isLocalNodePrimaryReplicaAt(current);
-        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
-            return isLocalNodePrimaryReplicaAt(current);
-        } else {
-            return completedFuture(new IgniteBiTuple<>(null, null));
-        }
-    }
-
-    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) {
-        return placementDriver.getPrimaryReplica(replicationGroupId, timestamp)
-                .thenApply(primaryReplica -> new IgniteBiTuple<>(
-                        primaryReplica != null && 
isLocalPeer(primaryReplica.getLeaseholderId()),
-                        null
-                ));
-    }
-
     /**
      * Resolves read result to the corresponding binary row. Following rules 
are used for read result resolution:
      * <ol>
@@ -3678,23 +3616,18 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return tableId;
     }
 
-    private boolean isLocalPeer(UUID nodeId) {
-        return localNode.id().equals(nodeId);
-    }
-
     private CompletableFuture<?> 
processOperationRequestWithTxOperationManagementLogic(
             UUID senderId,
             ReplicaRequest request,
-            @Nullable Boolean isPrimary,
-            @Nullable HybridTimestamp opStartTsIfDirectRo,
-            @Nullable Long leaseStartTime
+            ReplicaPrimacy replicaPrimacy,
+            @Nullable HybridTimestamp opStartTsIfDirectRo
     ) {
         incrementRwOperationCountIfNeeded(request);
 
         UUID txIdLockingLwm = tryToLockLwmIfNeeded(request, 
opStartTsIfDirectRo);
 
         try {
-            return processOperationRequest(senderId, request, isPrimary, 
opStartTsIfDirectRo, leaseStartTime)
+            return processOperationRequest(senderId, request, replicaPrimacy, 
opStartTsIfDirectRo)
                     .whenComplete((unused, throwable) -> {
                         unlockLwmIfNeeded(txIdLockingLwm, request);
                         decrementRwOperationCountIfNeeded(request);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index f5350c11670..49c1ecfc688 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -82,6 +82,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
@@ -174,6 +175,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -243,6 +245,7 @@ import 
org.apache.ignite.internal.tx.message.PartitionEnlistmentMessage;
 import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
 import org.apache.ignite.internal.tx.message.TxStateResponse;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
@@ -452,7 +455,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     );
 
     /** Placement driver. */
-    private PlacementDriver placementDriver;
+    private TestPlacementDriver placementDriver;
 
     /** Partition replication listener to test. */
     private PartitionReplicaListener partitionReplicaListener;
@@ -652,7 +655,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         transactionStateResolver.start();
 
-        placementDriver = new TestPlacementDriver(localNode);
+        placementDriver = spy(new TestPlacementDriver(localNode));
 
         partitionReplicaListener = new PartitionReplicaListener(
                 testMvPartitionStorage,
@@ -824,7 +827,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     public void 
testEnsureReplicaIsPrimaryThrowsPrimaryReplicaMissIfNodeIdDoesNotMatchTheLeaseholder()
 {
         localLeader = false;
 
-        ((TestPlacementDriver) placementDriver).setPrimaryReplicaSupplier(() 
-> new TestReplicaMetaImpl("node3", nodeId(3)));
+        placementDriver.setPrimaryReplicaSupplier(() -> new 
TestReplicaMetaImpl("node3", nodeId(3)));
 
         CompletableFuture<ReplicaResult> fut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(tablePartitionIdMessage(grpId))
@@ -3397,6 +3400,59 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         verify(lowWatermark).unlock(any());
     }
 
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenPrimaryIsNotKnown(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        doReturn(null).when(placementDriver).getCurrentPrimaryReplica(any(), 
any());
+        
doReturn(nullCompletedFuture()).when(placementDriver).getPrimaryReplica(any(), 
any());
+
+        PrimaryReplicaRequest request = mock(requestClass);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void 
primaryReplicaRequestsAreRejectedWhenPrimaryDoesNotMatchLeaseStartTime(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(localNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime - 
1000);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenLeaseIsExpired(Class<? extends 
PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(localNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MIN_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenLeaseholderIsDifferent(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(anotherNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
     private static class RequestContext {
         private final TablePartitionId groupId;
         private final UUID txId;


Reply via email to