This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 9f674fd5e05 IGNITE-24380 Move ensureReplicaIsPrimary logic to ZonePartitionReplicaListener (#5352) 9f674fd5e05 is described below commit 9f674fd5e05df14b79c1c2cbd003e47836f3e852 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Mar 6 15:56:39 2025 +0400 IGNITE-24380 Move ensureReplicaIsPrimary logic to ZonePartitionReplicaListener (#5352) --- modules/partition-replicator/build.gradle | 1 + .../replicator/ItReplicaLifecycleTest.java | 2 +- .../PartitionReplicaLifecycleManager.java | 9 +- .../partition/replicator/ReplicaPrimacy.java | 81 +++++++ .../partition/replicator/ReplicaPrimacyEngine.java | 172 ++++++++++++++ .../replicator/ReplicaTableProcessor.java | 41 ++++ .../replicator/ZonePartitionReplicaListener.java | 80 +++---- .../partition/replicator/ZoneResourcesManager.java | 2 +- ...xStateCommitPartitionReplicaRequestHandler.java | 77 ++---- .../handlers/WriteIntentSwitchRequestHandler.java | 18 +- .../ZonePartitionReplicaListenerTest.java | 172 ++++++++++++++ .../internal/table/distributed/TableManager.java | 3 +- .../replicator/PartitionReplicaListener.java | 257 ++++++++------------- .../replication/PartitionReplicaListenerTest.java | 62 ++++- 14 files changed, 688 insertions(+), 289 deletions(-) diff --git a/modules/partition-replicator/build.gradle b/modules/partition-replicator/build.gradle index e415115d53e..2cbb147f1d2 100644 --- a/modules/partition-replicator/build.gradle +++ b/modules/partition-replicator/build.gradle @@ -55,6 +55,7 @@ dependencies { testImplementation testFixtures(project(':ignite-storage-api')) testImplementation testFixtures(project(':ignite-table')) testImplementation testFixtures(project(':ignite-transactions')) + testImplementation testFixtures(project(':ignite-placement-driver-api')) testImplementation libs.hamcrest.core testImplementation libs.mockito.core testImplementation libs.mockito.junit diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java index d9add982082..7e746f8258f 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java @@ -728,7 +728,7 @@ public class ItReplicaLifecycleTest extends ItAbstractColocationTest { Replica replica = replicaFut.get(1, SECONDS); - return replica != null && (((ZonePartitionReplicaListener) replica.listener()).tableReplicaListeners().size() == count); + return replica != null && (((ZonePartitionReplicaListener) replica.listener()).tableReplicaProcessors().size() == count); } catch (ExecutionException | InterruptedException | TimeoutException e) { throw new RuntimeException(e); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java index bb3b8d33eae..c6f79e98c42 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java @@ -134,7 +134,6 @@ import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReason; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.ZonePartitionId; -import 
org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.SchemaSyncService; import org.apache.ignite.internal.tx.TxManager; @@ -1429,13 +1428,13 @@ public class PartitionReplicaLifecycleManager extends * * @param zonePartitionId Zone partition id. * @param tableId Table id. - * @param tablePartitionReplicaListenerFactory Factory for creating table-specific partition replicas. + * @param tablePartitionReplicaProcessorFactory Factory for creating table-specific partition replica processors. * @param raftTableProcessor Raft table processor for the table-specific partition. */ public void loadTableListenerToZoneReplica( ZonePartitionId zonePartitionId, int tableId, - Function<RaftCommandRunner, ReplicaListener> tablePartitionReplicaListenerFactory, + Function<RaftCommandRunner, ReplicaTableProcessor> tablePartitionReplicaProcessorFactory, RaftTableProcessor raftTableProcessor, PartitionMvStorageAccess partitionMvStorageAccess ) { @@ -1445,9 +1444,9 @@ public class PartitionReplicaLifecycleManager extends // so the listeners will be registered by the thread completing the "replicaListenerFuture". On normal operation (where there is // a HB relationship between zone and table creation) zone-wide replica must already be started, this future will always be // completed and the listeners will be registered immediately.
- resources.replicaListenerFuture().thenAccept(zoneReplicaListener -> zoneReplicaListener.addTableReplicaListener( + resources.replicaListenerFuture().thenAccept(zoneReplicaListener -> zoneReplicaListener.addTableReplicaProcessor( tableId, - tablePartitionReplicaListenerFactory + tablePartitionReplicaProcessorFactory )); resources.raftListener().addTableProcessor(tableId, raftTableProcessor); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java new file mode 100644 index 00000000000..83349dd3967 --- /dev/null +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.Objects.requireNonNull; + +import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest; +import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; +import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; +import org.jetbrains.annotations.Nullable; + +/** + * Represents replica primacy info. Contains the following information: + * + * <ul> + * <li>{@code leaseStartTime} - the moment when the replica became primary (only filled for {@link PrimaryReplicaRequest}s)</li> + * <li>{@code isPrimary} - whether this node currently hosts the primary (only filled for {@link ReadOnlyReplicaRequest}s + * and {@link ReplicaSafeTimeSyncRequest}s)</li> + * </ul> + */ +public class ReplicaPrimacy { + private static final ReplicaPrimacy EMPTY = new ReplicaPrimacy(null, null); + + private final @Nullable Long leaseStartTime; + private final @Nullable Boolean isPrimary; + + private ReplicaPrimacy(@Nullable Long leaseStartTime, @Nullable Boolean isPrimary) { + this.leaseStartTime = leaseStartTime; + this.isPrimary = isPrimary; + } + + /** + * Creates an instance representing no primacy information. + */ + public static ReplicaPrimacy empty() { + return EMPTY; + } + + /** + * Creates an instance representing information about the primary replica held by this node. + */ + static ReplicaPrimacy forPrimaryReplicaRequest(long leaseStartTime) { + return new ReplicaPrimacy(leaseStartTime, null); + } + + /** + * Creates an instance representing information about whether this node currently holds the primary. + */ + static ReplicaPrimacy forIsPrimary(boolean isPrimary) { + return new ReplicaPrimacy(null, isPrimary); + } + + /** + * Returns lease start time; throws exception if not present. 
+ */ + public long leaseStartTime() { + return requireNonNull(leaseStartTime); + } + + /** + * Whether this node currently hosts the primary; throws if this information is absent. + */ + public boolean isPrimary() { + return requireNonNull(isPrimary); + } +} diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java new file mode 100644 index 00000000000..603c35feb0e --- /dev/null +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest; +import org.apache.ignite.internal.placementdriver.LeasePlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; +import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; +import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; +import org.apache.ignite.network.ClusterNode; + +/** + * Logic related to replica primacy checks. + */ +public class ReplicaPrimacyEngine { + private final LeasePlacementDriver placementDriver; + private final ClockService clockService; + private final ReplicationGroupId replicationGroupId; + private final ClusterNode localNode; + + /** Constructor. */ + public ReplicaPrimacyEngine( + LeasePlacementDriver placementDriver, + ClockService clockService, + ReplicationGroupId replicationGroupId, + ClusterNode localNode + ) { + this.placementDriver = placementDriver; + this.clockService = clockService; + this.replicationGroupId = replicationGroupId; + this.localNode = localNode; + } + + /** + * Validates replica primacy. + * + * <ul> + * <li>For {@link PrimaryReplicaRequest}s, ensures that the primary replica is known, that it is hosted on this node, + * that it has not expired and the request was sent while the same replica was primary. 
If anything of the above is violated, the + * future is completed with a {@link PrimaryReplicaMissException}.</li> + * <li>For {@link ReadOnlyReplicaRequest}s and {@link ReplicaSafeTimeSyncRequest}s, detects whether this node is/was a primary + * at the corresponding timestamp.</li> + * </ul> + * + * @param request Replica request. + */ + public CompletableFuture<ReplicaPrimacy> validatePrimacy(ReplicaRequest request) { + HybridTimestamp now = clockService.current(); + + if (request instanceof PrimaryReplicaRequest) { + PrimaryReplicaRequest primaryReplicaRequest = (PrimaryReplicaRequest) request; + return ensureReplicaIsPrimary(primaryReplicaRequest, now); + } else if (request instanceof ReadOnlyReplicaRequest) { + return isLocalNodePrimaryReplicaAt(now); + } else if (request instanceof ReplicaSafeTimeSyncRequest) { + return isLocalNodePrimaryReplicaAt(now); + } else { + return completedFuture(ReplicaPrimacy.empty()); + } + } + + private CompletableFuture<ReplicaPrimacy> ensureReplicaIsPrimary( + PrimaryReplicaRequest primaryReplicaRequest, + HybridTimestamp now + ) { + Long enlistmentConsistencyToken = primaryReplicaRequest.enlistmentConsistencyToken(); + + Function<ReplicaMeta, ReplicaPrimacy> validateClo = primaryReplicaMeta -> validateReplicaPrimacy( + now, + primaryReplicaMeta, + enlistmentConsistencyToken + ); + + ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(replicationGroupId, now); + + if (meta != null) { + try { + return completedFuture(validateClo.apply(meta)); + } catch (Exception e) { + return failedFuture(e); + } + } + + return placementDriver.getPrimaryReplica(replicationGroupId, now).thenApply(validateClo); + } + + private ReplicaPrimacy validateReplicaPrimacy(HybridTimestamp now, ReplicaMeta primaryReplicaMeta, Long enlistmentConsistencyToken) { + if (primaryReplicaMeta == null) { + throw new PrimaryReplicaMissException( + localNode.name(), + null, + localNode.id(), + null, + enlistmentConsistencyToken, + null, + null + ); + } + + 
long currentEnlistmentConsistencyToken = primaryReplicaMeta.getStartTime().longValue(); + + if (enlistmentConsistencyToken != currentEnlistmentConsistencyToken + || clockService.before(primaryReplicaMeta.getExpirationTime(), now) + || !isLocalPeer(primaryReplicaMeta.getLeaseholderId()) + ) { + throw new PrimaryReplicaMissException( + localNode.name(), + primaryReplicaMeta.getLeaseholder(), + localNode.id(), + primaryReplicaMeta.getLeaseholderId(), + enlistmentConsistencyToken, + currentEnlistmentConsistencyToken, + null); + } + + return ReplicaPrimacy.forPrimaryReplicaRequest(primaryReplicaMeta.getStartTime().longValue()); + } + + private CompletableFuture<ReplicaPrimacy> isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) { + return placementDriver.getPrimaryReplica(replicationGroupId, timestamp) + .thenApply(primaryReplica -> ReplicaPrimacy.forIsPrimary( + // TODO: https://issues.apache.org/jira/browse/IGNITE-24714 - should we also check lease expiration? + primaryReplica != null && isLocalPeer(primaryReplica.getLeaseholderId()) + )); + } + + private boolean isLocalPeer(UUID nodeId) { + return localNode.id().equals(nodeId); + } + + /** + * Checks whether the token still matches the current primary replica. + * + * @param suspectedEnlistmentConsistencyToken Enlistment consistency token that is going to be checked. 
+ */ + public boolean tokenStillMatchesPrimary(long suspectedEnlistmentConsistencyToken) { + HybridTimestamp currentTime = clockService.current(); + + ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(replicationGroupId, currentTime); + + return meta != null + && isLocalPeer(meta.getLeaseholderId()) + && clockService.before(currentTime, meta.getExpirationTime()) + && suspectedEnlistmentConsistencyToken == meta.getStartTime().longValue(); + } +} diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java new file mode 100644 index 00000000000..3d9f8044010 --- /dev/null +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.replicator.ReplicaResult; +import org.apache.ignite.internal.replicator.message.ReplicaRequest; + +/** + * Processor of replica requests targeted at a particular table. + */ +public interface ReplicaTableProcessor { + /** + * Processes replica request. + * + * @param request Replica request. + * @param replicaPrimacy Replica primacy info. + * @param senderId ID of the node that sent the request. + * @return Future completed with the result of processing. + */ + CompletableFuture<ReplicaResult> process(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId); + + /** Callback on replica shutdown. */ + void onShutdown(); +} diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java index c04ebf70910..b598cdb5f20 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.hlc.ClockService; -import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.ClusterNodeResolver; @@ -36,7 +35,6 @@ import org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaR import 
org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.WriteIntentSwitchRequestHandler; -import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest; import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; import org.apache.ignite.internal.placementdriver.LeasePlacementDriver; @@ -44,7 +42,6 @@ import org.apache.ignite.internal.raft.service.RaftCommandRunner; import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.replicator.listener.ReplicaListener; -import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; import org.apache.ignite.internal.replicator.message.TableAware; @@ -57,7 +54,6 @@ import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest; import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; import org.apache.ignite.network.ClusterNode; -import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.VisibleForTesting; /** @@ -68,15 +64,17 @@ public class ZonePartitionReplicaListener implements ReplicaListener { // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the table replica listener if needed. // tableId -> tableProcessor. 
- private final Map<Integer, ReplicaListener> replicas = new ConcurrentHashMap<>(); + private final Map<Integer, ReplicaTableProcessor> replicas = new ConcurrentHashMap<>(); /** Raft client. */ private final RaftCommandRunner raftClient; - private final ReplicationRaftCommandApplicator raftCommandApplicator; - private final ZonePartitionId replicationGroupId; + private final ReplicaPrimacyEngine replicaPrimacyEngine; + + private final ReplicationRaftCommandApplicator raftCommandApplicator; + // Replica request handlers. private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler; private final WriteIntentSwitchRequestHandler writeIntentSwitchRequestHandler; @@ -107,10 +105,17 @@ public class ZonePartitionReplicaListener implements ReplicaListener { ) { this.raftClient = raftClient; - this.raftCommandApplicator = new ReplicationRaftCommandApplicator(raftClient, replicationGroupId); - this.replicationGroupId = replicationGroupId; + this.replicaPrimacyEngine = new ReplicaPrimacyEngine( + placementDriver, + clockService, + replicationGroupId, + localNode + ); + + this.raftCommandApplicator = new ReplicationRaftCommandApplicator(raftClient, replicationGroupId); + // Request handlers initialization. 
txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler( @@ -142,11 +147,8 @@ public class ZonePartitionReplicaListener implements ReplicaListener { txStateCommitPartitionReplicaRequestHandler = new TxStateCommitPartitionReplicaRequestHandler( txStatePartitionStorage, - placementDriver, txManager, - clockService, clusterNodeResolver, - replicationGroupId, localNode, new TxRecoveryEngine( txManager, @@ -168,8 +170,8 @@ public class ZonePartitionReplicaListener implements ReplicaListener { @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { - return ensureReplicaIsPrimary(request) - .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + return replicaPrimacyEngine.validatePrimacy(request) + .thenCompose(replicaPrimacy -> processRequest(request, replicaPrimacy, senderId)) .thenApply(res -> { if (res instanceof ReplicaResult) { return (ReplicaResult) res; @@ -181,13 +183,12 @@ public class ZonePartitionReplicaListener implements ReplicaListener { private CompletableFuture<?> processRequest( ReplicaRequest request, - @Nullable Boolean isPrimary, - UUID senderId, - @Nullable Long leaseStartTime + ReplicaPrimacy replicaPrimacy, + UUID senderId ) { if (request instanceof TableAware) { // This type of request propagates to the table processor directly. - return processTableAwareRequest(request, senderId); + return processTableAwareRequest(request, replicaPrimacy, senderId); } if (request instanceof TxFinishReplicaRequest) { @@ -199,51 +200,40 @@ public class ZonePartitionReplicaListener implements ReplicaListener { return txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest) request); } - return processZoneReplicaRequest(request, isPrimary, senderId, leaseStartTime); - } - - /** - * Ensure that the primary replica was not changed. - * - * @param request Replica request. 
- * @return Future with {@link IgniteBiTuple} containing {@code boolean} (whether the replica is primary) and the start time of current - * lease. The boolean is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The - * lease start time is not {@code null} in case of {@link PrimaryReplicaRequest}. - */ - private CompletableFuture<IgniteBiTuple<Boolean, Long>> ensureReplicaIsPrimary(ReplicaRequest request) { - // TODO https://issues.apache.org/jira/browse/IGNITE-24380 - // Move PartitionReplicaListener#ensureReplicaIsPrimary to ZonePartitionReplicaListener. - return completedFuture(new IgniteBiTuple<>(null, null)); + return processZoneReplicaRequest(request, replicaPrimacy, senderId); } /** * Processes {@link TableAware} request. * * @param request Request to be processed. + * @param replicaPrimacy Replica primacy information. * @param senderId Node sender id. * @return Future with the result of the request. */ - private CompletableFuture<ReplicaResult> processTableAwareRequest(ReplicaRequest request, UUID senderId) { + private CompletableFuture<ReplicaResult> processTableAwareRequest( + ReplicaRequest request, + ReplicaPrimacy replicaPrimacy, + UUID senderId + ) { assert request instanceof TableAware : "Request should be TableAware [request=" + request.getClass().getSimpleName() + ']'; return replicas.get(((TableAware) request).tableId()) - .invoke(request, senderId); + .process(request, replicaPrimacy, senderId); } /** * Processes zone replica request. * * @param request Request to be processed. - * @param isPrimary {@code true} if the current node is the primary for the partition, {@code false} otherwise. + * @param replicaPrimacy Replica primacy information. * @param senderId Node sender id. - * @param leaseStartTime Lease start time. * @return Future with the result of the processing. 
*/ private CompletableFuture<?> processZoneReplicaRequest( ReplicaRequest request, - @Nullable Boolean isPrimary, - UUID senderId, - @Nullable Long leaseStartTime + ReplicaPrimacy replicaPrimacy, + UUID senderId ) { // TODO https://issues.apache.org/jira/browse/IGNITE-24526 // Need to move the necessary part of PartitionReplicaListener#processRequest request processing here @@ -266,21 +256,21 @@ public class ZonePartitionReplicaListener implements ReplicaListener { } /** - * Add table partition listener to the current zone replica listener. + * Add table partition replica processor to the current zone replica listener. * * @param tableId Table id. * @param replicaListener Table replica listener. */ - public void addTableReplicaListener(int tableId, Function<RaftCommandRunner, ReplicaListener> replicaListener) { + public void addTableReplicaProcessor(int tableId, Function<RaftCommandRunner, ReplicaTableProcessor> replicaListener) { replicas.put(tableId, replicaListener.apply(raftClient)); } /** - * Removes table partition listener by table replication identifier from the current zone replica listener. + * Removes table partition replica processor by table replication identifier from the current zone replica listener. * * @param tableId Table's identifier. */ - public void removeTableReplicaListener(int tableId) { + public void removeTableReplicaProcessor(int tableId) { replicas.remove(tableId); } @@ -290,7 +280,7 @@ public class ZonePartitionReplicaListener implements ReplicaListener { * @return Table replicas listeners. 
*/ @VisibleForTesting - public Map<Integer, ReplicaListener> tableReplicaListeners() { + public Map<Integer, ReplicaTableProcessor> tableReplicaProcessors() { return replicas; } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java index 336f93f6a7c..77c32bf82b4 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java @@ -175,7 +175,7 @@ class ZoneResourcesManager implements ManuallyCloseable { ZonePartitionResources resources = getZonePartitionResources(zonePartitionId); resources.replicaListenerFuture() - .thenAccept(zoneReplicaListener -> zoneReplicaListener.removeTableReplicaListener(tableId)); + .thenAccept(zoneReplicaListener -> zoneReplicaListener.removeTableReplicaProcessor(tableId)); resources.raftListener().removeTableProcessor(tableId); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java index 9ea80c736c2..f4178d49164 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.partition.replicator.handlers; import static java.util.concurrent.CompletableFuture.completedFuture; -import static 
java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.tx.TxState.ABANDONED; import static org.apache.ignite.internal.tx.TxState.FINISHING; import static org.apache.ignite.internal.tx.TxState.PENDING; @@ -26,12 +25,8 @@ import static org.apache.ignite.internal.tx.TxState.isFinalState; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.network.ClusterNodeResolver; import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine; -import org.apache.ignite.internal.placementdriver.LeasePlacementDriver; -import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; import org.apache.ignite.internal.tx.TransactionMeta; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxMeta; @@ -47,12 +42,9 @@ import org.jetbrains.annotations.Nullable; */ public class TxStateCommitPartitionReplicaRequestHandler { private final TxStatePartitionStorage txStatePartitionStorage; - private final LeasePlacementDriver placementDriver; private final TxManager txManager; - private final ClockService clockService; private final ClusterNodeResolver clusterNodeResolver; - private final ReplicationGroupId replicationGroupId; private final ClusterNode localNode; private final TxRecoveryEngine txRecoveryEngine; @@ -60,20 +52,14 @@ public class TxStateCommitPartitionReplicaRequestHandler { /** Constructor. 
*/ public TxStateCommitPartitionReplicaRequestHandler( TxStatePartitionStorage txStatePartitionStorage, - LeasePlacementDriver placementDriver, TxManager txManager, - ClockService clockService, ClusterNodeResolver clusterNodeResolver, - ReplicationGroupId replicationGroupId, ClusterNode localNode, TxRecoveryEngine txRecoveryEngine ) { this.txStatePartitionStorage = txStatePartitionStorage; - this.placementDriver = placementDriver; this.txManager = txManager; - this.clockService = clockService; this.clusterNodeResolver = clusterNodeResolver; - this.replicationGroupId = replicationGroupId; this.localNode = localNode; this.txRecoveryEngine = txRecoveryEngine; } @@ -85,56 +71,21 @@ public class TxStateCommitPartitionReplicaRequestHandler { * @return Result future. */ public CompletableFuture<TransactionMeta> handle(TxStateCommitPartitionRequest request) { - return placementDriver.getPrimaryReplica(replicationGroupId, clockService.now()) - .thenCompose(replicaMeta -> { - if (replicaMeta == null || replicaMeta.getLeaseholder() == null) { - return failedFuture( - new PrimaryReplicaMissException( - localNode.name(), - null, - localNode.id(), - null, - null, - null, - null - ) - ); - } - - if (!isLocalPeer(replicaMeta.getLeaseholderId())) { - return failedFuture( - new PrimaryReplicaMissException( - localNode.name(), - replicaMeta.getLeaseholder(), - localNode.id(), - replicaMeta.getLeaseholderId(), - null, - null, - null - ) - ); - } - - UUID txId = request.txId(); - - TxStateMeta txMeta = txManager.stateMeta(txId); - - if (txMeta != null && txMeta.txState() == FINISHING) { - assert txMeta instanceof TxStateMetaFinishing : txMeta; - - return ((TxStateMetaFinishing) txMeta).txFinishFuture(); - } else if (txMeta == null || !isFinalState(txMeta.txState())) { - // Try to trigger recovery, if needed. If the transaction will be aborted, the proper ABORTED state will be sent - // in response. 
- return triggerTxRecoveryOnTxStateResolutionIfNeeded(txId, txMeta); - } else { - return completedFuture(txMeta); - } - }); - } + UUID txId = request.txId(); + + TxStateMeta txMeta = txManager.stateMeta(txId); + + if (txMeta != null && txMeta.txState() == FINISHING) { + assert txMeta instanceof TxStateMetaFinishing : txMeta; - private boolean isLocalPeer(UUID nodeId) { - return localNode.id().equals(nodeId); + return ((TxStateMetaFinishing) txMeta).txFinishFuture(); + } else if (txMeta == null || !isFinalState(txMeta.txState())) { + // Try to trigger recovery, if needed. If the transaction will be aborted, the proper ABORTED state will be sent + // in response. + return triggerTxRecoveryOnTxStateResolutionIfNeeded(txId, txMeta); + } else { + return completedFuture(txMeta); + } } /** diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java index e649d88ab49..477fcaec2c9 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java @@ -35,6 +35,8 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult; import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions; +import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy; +import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor; import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker; import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; @@ -43,7 +45,6 @@ import org.apache.ignite.internal.raft.service.RaftCommandRunner; import org.apache.ignite.internal.replicator.CommandApplicationResult; import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.ZonePartitionId; -import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.schema.SchemaSyncService; @@ -68,7 +69,7 @@ public class WriteIntentSwitchRequestHandler { private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); - private final IntFunction<ReplicaListener> replicaListenerByTableId; + private final IntFunction<ReplicaTableProcessor> replicaListenerByTableId; private final ClockService clockService; @@ -80,7 +81,7 @@ public class WriteIntentSwitchRequestHandler { /** Constructor. */ public WriteIntentSwitchRequestHandler( - IntFunction<ReplicaListener> replicaListenerByTableId, + IntFunction<ReplicaTableProcessor> replicaListenerByTableId, ClockService clockService, SchemaSyncService schemaSyncService, CatalogService catalogService, @@ -159,7 +160,8 @@ public class WriteIntentSwitchRequestHandler { .tableId(tableId) .build(); - return replicaListener(tableId).invoke(tableSpecificRequest, senderId); + // Using empty primacy because the request is not a PrimaryReplicaRequest. 
+ return replicaTableProcessor(tableId).process(tableSpecificRequest, ReplicaPrimacy.empty(), senderId); } private CompletableFuture<Object> applyCommandToGroup(WriteIntentSwitchReplicaRequest request, Integer catalogVersion) { @@ -185,11 +187,11 @@ public class WriteIntentSwitchRequestHandler { return new WriteIntentSwitchReplicatedInfo(request.txId(), replicationGroupId); } - private ReplicaListener replicaListener(Integer tableId) { - ReplicaListener replicaListener = replicaListenerByTableId.apply(tableId); + private ReplicaTableProcessor replicaTableProcessor(int tableId) { + ReplicaTableProcessor replicaTableProcessor = replicaListenerByTableId.apply(tableId); - assert replicaListener != null : "No replica listener for table ID " + tableId; + assert replicaTableProcessor != null : "No replica table processor for table ID " + tableId; - return replicaListener; + return replicaTableProcessor; } } diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java new file mode 100644 index 00000000000..f94cff87cf3 --- /dev/null +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.hlc.TestClockService; +import org.apache.ignite.internal.network.ClusterNodeImpl; +import org.apache.ignite.internal.network.ClusterNodeResolver; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.placementdriver.TestPlacementDriver; +import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; +import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; +import 
org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ZonePartitionReplicaListenerTest extends BaseIgniteAbstractTest { + private final ClusterNode localNode = new ClusterNodeImpl(nodeId(1), "node1", NetworkAddress.from("127.0.0.1:127")); + + private final ClusterNode anotherNode = new ClusterNodeImpl(nodeId(2), "node2", NetworkAddress.from("127.0.0.2:127")); + + @Spy + private final TxStatePartitionStorage txStateStorage = new TestTxStatePartitionStorage(); + + private final HybridClock clock = new HybridClockImpl(); + + private final ClockService clockService = new TestClockService(clock); + + @Mock + private TxManager txManager; + + @Mock + private ValidationSchemasSource validationSchemasSource; + + @Mock + private SchemaSyncService schemaSyncService; + + @Mock + private CatalogService catalogService; + + @Spy + private final TestPlacementDriver placementDriver = new TestPlacementDriver(localNode); + + @Mock + private ClusterNodeResolver clusterNodeResolver; + + @Mock + private RaftCommandRunner raftClient; + + private final ZonePartitionId groupId = new ZonePartitionId(1, 2); + + private ZonePartitionReplicaListener partitionReplicaListener; + + private static UUID 
nodeId(int id) { + return new UUID(0, id); + } + + @BeforeEach + void setUp() { + partitionReplicaListener = new ZonePartitionReplicaListener( + txStateStorage, + clockService, + txManager, + validationSchemasSource, + schemaSyncService, + catalogService, + placementDriver, + clusterNodeResolver, + raftClient, + localNode, + groupId + ); + } + + @ParameterizedTest + @ValueSource(classes = {PrimaryReplicaRequest.class, TxStateCommitPartitionRequest.class}) + void primaryReplicaRequestsAreRejectedWhenPrimaryIsNotKnown(Class<? extends PrimaryReplicaRequest> requestClass) { + doReturn(null).when(placementDriver).getCurrentPrimaryReplica(any(), any()); + doReturn(nullCompletedFuture()).when(placementDriver).getPrimaryReplica(any(), any()); + + PrimaryReplicaRequest request = mock(requestClass); + + assertThat(partitionReplicaListener.invoke(request, localNode.id()), willThrow(PrimaryReplicaMissException.class)); + } + + @ParameterizedTest + @ValueSource(classes = {PrimaryReplicaRequest.class, TxStateCommitPartitionRequest.class}) + void primaryReplicaRequestsAreRejectedWhenPrimaryDoesNotMatchLeaseStartTime(Class<? extends PrimaryReplicaRequest> requestClass) { + long leaseStartTime = clock.nowLong(); + placementDriver.setPrimaryReplicaSupplier( + () -> new TestReplicaMetaImpl(localNode, hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE) + ); + + PrimaryReplicaRequest request = mock(requestClass); + when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime - 1000); + + assertThat(partitionReplicaListener.invoke(request, localNode.id()), willThrow(PrimaryReplicaMissException.class)); + } + + @ParameterizedTest + @ValueSource(classes = {PrimaryReplicaRequest.class, TxStateCommitPartitionRequest.class}) + void primaryReplicaRequestsAreRejectedWhenLeaseIsExpired(Class<? 
extends PrimaryReplicaRequest> requestClass) { + long leaseStartTime = clock.nowLong(); + placementDriver.setPrimaryReplicaSupplier( + () -> new TestReplicaMetaImpl(localNode, hybridTimestamp(leaseStartTime), HybridTimestamp.MIN_VALUE) + ); + + PrimaryReplicaRequest request = mock(requestClass); + when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime); + + assertThat(partitionReplicaListener.invoke(request, localNode.id()), willThrow(PrimaryReplicaMissException.class)); + } + + @ParameterizedTest + @ValueSource(classes = {PrimaryReplicaRequest.class, TxStateCommitPartitionRequest.class}) + void primaryReplicaRequestsAreRejectedWhenLeaseholderIsDifferent(Class<? extends PrimaryReplicaRequest> requestClass) { + long leaseStartTime = clock.nowLong(); + placementDriver.setPrimaryReplicaSupplier( + () -> new TestReplicaMetaImpl(anotherNode, hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE) + ); + + PrimaryReplicaRequest request = mock(requestClass); + when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime); + + assertThat(partitionReplicaListener.invoke(request, localNode.id()), willThrow(PrimaryReplicaMissException.class)); + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 586d72a5e07..3bd881e36cd 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -150,6 +150,7 @@ import org.apache.ignite.internal.network.serialization.MessageSerializationRegi import org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent; import org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters; import org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager; +import 
org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; import org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; @@ -912,7 +913,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { minTimeCollectorService.addPartition(new TablePartitionId(tableId, partId)); - Function<RaftCommandRunner, ReplicaListener> createListener = raftClient -> createReplicaListener( + Function<RaftCommandRunner, ReplicaTableProcessor> createListener = raftClient -> createReplicaListener( zonePartitionId, table, safeTimeTracker, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 4ae5a0e4d0e..455ca521444 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.table.distributed.replicator; -import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; @@ -82,7 +81,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import 
org.apache.ignite.internal.binarytuple.BinaryTupleCommon; @@ -106,6 +104,9 @@ import org.apache.ignite.internal.lowwatermark.LowWatermark; import org.apache.ignite.internal.network.ClusterNodeResolver; import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult; import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions; +import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy; +import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine; +import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor; import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker; import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator; import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine; @@ -146,8 +147,7 @@ import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasS import org.apache.ignite.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException; import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator; import org.apache.ignite.internal.partitiondistribution.Assignments; -import org.apache.ignite.internal.placementdriver.PlacementDriver; -import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.placementdriver.LeasePlacementDriver; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner; import org.apache.ignite.internal.raft.PeersAndLearners; @@ -164,7 +164,6 @@ import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissExcepti import org.apache.ignite.internal.replicator.exception.ReplicationException; import org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException; import org.apache.ignite.internal.replicator.listener.ReplicaListener; -import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; import org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaRequest; @@ -237,7 +236,7 @@ import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; /** Partition replication listener. */ -public class PartitionReplicaListener implements ReplicaListener { +public class PartitionReplicaListener implements ReplicaListener, ReplicaTableProcessor { /** * NB: this listener makes writes to the underlying MV partition storage without taking the partition snapshots read lock. This causes * the RAFT snapshots transferred to a follower being slightly inconsistent for a limited amount of time. @@ -329,9 +328,6 @@ public class PartitionReplicaListener implements ReplicaListener { private final SchemaCompatibilityValidator schemaCompatValidator; - /** Instance of the local node. */ - private final ClusterNode localNode; - private final SchemaSyncService schemaSyncService; private final CatalogService catalogService; @@ -342,9 +338,6 @@ public class PartitionReplicaListener implements ReplicaListener { /** Prevents double stopping. */ private final AtomicBoolean stopGuard = new AtomicBoolean(); - /** Placement driver. */ - private final PlacementDriver placementDriver; - /** Read-write transaction operation tracker for building indexes. 
*/ private final IndexBuilderTxRwOperationTracker txRwOperationTracker = new IndexBuilderTxRwOperationTracker(); @@ -359,6 +352,7 @@ public class PartitionReplicaListener implements ReplicaListener { private static final boolean SKIP_UPDATES = getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK); + private final ReplicaPrimacyEngine replicaPrimacyEngine; private final ReliableCatalogVersions reliableCatalogVersions; private final ReplicationRaftCommandApplicator raftCommandApplicator; private final ReplicaTxFinishMarker replicaTxFinishMarker; @@ -415,7 +409,7 @@ public class PartitionReplicaListener implements ReplicaListener { ClusterNode localNode, SchemaSyncService schemaSyncService, CatalogService catalogService, - PlacementDriver placementDriver, + LeasePlacementDriver placementDriver, ClusterNodeResolver clusterNodeResolver, RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, SchemaRegistry schemaRegistry, @@ -435,10 +429,8 @@ public class PartitionReplicaListener implements ReplicaListener { this.txStatePartitionStorage = txStatePartitionStorage; this.transactionStateResolver = transactionStateResolver; this.storageUpdateHandler = storageUpdateHandler; - this.localNode = localNode; this.schemaSyncService = schemaSyncService; this.catalogService = catalogService; - this.placementDriver = placementDriver; this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry; this.schemaRegistry = schemaRegistry; this.indexMetaStorage = indexMetaStorage; @@ -449,6 +441,7 @@ public class PartitionReplicaListener implements ReplicaListener { this.schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); + replicaPrimacyEngine = new ReplicaPrimacyEngine(placementDriver, clockService, replicationGroupId, localNode); reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId); replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager); @@ -478,11 +471,8 @@ public class PartitionReplicaListener implements ReplicaListener { txStateCommitPartitionReplicaRequestHandler = new TxStateCommitPartitionReplicaRequestHandler( txStatePartitionStorage, - placementDriver, txManager, - clockService, clusterNodeResolver, - replicationGroupId, localNode, txRecoveryEngine); @@ -548,15 +538,30 @@ public class PartitionReplicaListener implements ReplicaListener { @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { - return ensureReplicaIsPrimary(request) - .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) - .thenApply(res -> { - if (res instanceof ReplicaResult) { - return (ReplicaResult) res; - } else { - return new ReplicaResult(res, null); - } - }); + return replicaPrimacyEngine.validatePrimacy(request) + .thenCompose(replicaPrimacy -> processRequestInContext(request, replicaPrimacy, senderId)); + } + + @Override + public CompletableFuture<ReplicaResult> process(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId) { + return processRequestInContext(request, replicaPrimacy, senderId); + } + + private CompletableFuture<ReplicaResult> processRequestInContext( + ReplicaRequest request, + ReplicaPrimacy replicaPrimacy, + UUID senderId + ) { + return processRequest(request, replicaPrimacy, senderId) + .thenApply(PartitionReplicaListener::wrapInReplicaResultIfNeeded); + } + + private static ReplicaResult wrapInReplicaResultIfNeeded(Object res) { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } } /** Returns Raft-client. 
*/ @@ -568,8 +573,7 @@ public class PartitionReplicaListener implements ReplicaListener { return raftCommandRunner; } - private CompletableFuture<?> processRequest(ReplicaRequest request, @Nullable Boolean isPrimary, UUID senderId, - @Nullable Long leaseStartTime) { + private CompletableFuture<?> processRequest(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId) { // TODO https://issues.apache.org/jira/browse/IGNITE-24526 // Need to move the necessary part of request processing to ZonePartitionReplicaListener @@ -623,7 +627,7 @@ public class PartitionReplicaListener implements ReplicaListener { // Don't need to validate schema. if (opTs == null) { assert opTsIfDirectRo == null; - return processOperationRequestWithTxOperationManagementLogic(senderId, request, isPrimary, null, leaseStartTime); + return processOperationRequestWithTxOperationManagementLogic(senderId, request, replicaPrimacy, null); } assert txTs != null && opTs.compareTo(txTs) >= 0 : "Invalid request timestamps"; @@ -644,7 +648,7 @@ public class PartitionReplicaListener implements ReplicaListener { }; return schemaSyncService.waitForMetadataCompleteness(opTs).thenRun(validateClo).thenCompose(ignored -> - processOperationRequestWithTxOperationManagementLogic(senderId, request, isPrimary, opTsIfDirectRo, leaseStartTime)); + processOperationRequestWithTxOperationManagementLogic(senderId, request, replicaPrimacy, opTsIfDirectRo)); } private CompletableFuture<Long> processGetEstimatedSizeRequest() { @@ -706,7 +710,7 @@ public class PartitionReplicaListener implements ReplicaListener { ); }) .thenCompose(leaderWithTerm -> { - if (leaderWithTerm.isEmpty() || !isTokenStillValidPrimary(request.enlistmentConsistencyToken())) { + if (leaderWithTerm.isEmpty() || !replicaPrimacyEngine.tokenStillMatchesPrimary(request.enlistmentConsistencyToken())) { return nullCompletedFuture(); } @@ -723,17 +727,6 @@ public class PartitionReplicaListener implements ReplicaListener { }); } - private boolean 
isTokenStillValidPrimary(long suspectedEnlistmentConsistencyToken) { - HybridTimestamp currentTime = clockService.current(); - - ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(replicationGroupId, currentTime); - - return meta != null - && isLocalPeer(meta.getLeaseholderId()) - && clockService.before(currentTime, meta.getExpirationTime()) - && suspectedEnlistmentConsistencyToken == meta.getStartTime().longValue(); - } - private static PeersAndLearners peersConfigurationFromMessage(ChangePeersAndLearnersAsyncReplicaRequest request) { Assignments pendingAssignments = fromBytes(request.pendingAssignments()); @@ -793,49 +786,76 @@ public class PartitionReplicaListener implements ReplicaListener { * * @param senderId Sender id. * @param request Request. - * @param isPrimary Whether the current replica is primary. + * @param replicaPrimacy Replica primacy information. * @param opStartTsIfDirectRo Start timestamp in case of direct RO tx. - * @param lst Lease start time of the current lease that was active at the moment of the start of request processing, in the - * case of {@link PrimaryReplicaRequest}, otherwise should be {@code null}. * @return Future. 
*/ private CompletableFuture<?> processOperationRequest( UUID senderId, ReplicaRequest request, - @Nullable Boolean isPrimary, - @Nullable HybridTimestamp opStartTsIfDirectRo, - @Nullable Long lst + ReplicaPrimacy replicaPrimacy, + @Nullable HybridTimestamp opStartTsIfDirectRo ) { if (request instanceof ReadWriteSingleRowReplicaRequest) { var req = (ReadWriteSingleRowReplicaRequest) request; var opId = new OperationId(senderId, req.timestamp().longValue()); - return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processSingleEntryAction(req, lst)); + return appendTxCommand( + req.transactionId(), + opId, + req.requestType(), + req.full(), + () -> processSingleEntryAction(req, replicaPrimacy.leaseStartTime()) + ); } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) { var req = (ReadWriteSingleRowPkReplicaRequest) request; var opId = new OperationId(senderId, req.timestamp().longValue()); - return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processSingleEntryAction(req, lst)); + return appendTxCommand( + req.transactionId(), + opId, + req.requestType(), + req.full(), + () -> processSingleEntryAction(req, replicaPrimacy.leaseStartTime()) + ); } else if (request instanceof ReadWriteMultiRowReplicaRequest) { var req = (ReadWriteMultiRowReplicaRequest) request; var opId = new OperationId(senderId, req.timestamp().longValue()); - return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processMultiEntryAction(req, lst)); + return appendTxCommand( + req.transactionId(), + opId, + req.requestType(), + req.full(), + () -> processMultiEntryAction(req, replicaPrimacy.leaseStartTime()) + ); } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) { var req = (ReadWriteMultiRowPkReplicaRequest) request; var opId = new OperationId(senderId, req.timestamp().longValue()); - return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> 
processMultiEntryAction(req, lst)); + return appendTxCommand( + req.transactionId(), + opId, + req.requestType(), + req.full(), + () -> processMultiEntryAction(req, replicaPrimacy.leaseStartTime()) + ); } else if (request instanceof ReadWriteSwapRowReplicaRequest) { var req = (ReadWriteSwapRowReplicaRequest) request; var opId = new OperationId(senderId, req.timestamp().longValue()); - return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processTwoEntriesAction(req, lst)); + return appendTxCommand( + req.transactionId(), + opId, + req.requestType(), + req.full(), + () -> processTwoEntriesAction(req, replicaPrimacy.leaseStartTime()) + ); } else if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) { var req = (ReadWriteScanRetrieveBatchReplicaRequest) request; @@ -882,13 +902,13 @@ public class PartitionReplicaListener implements ReplicaListener { } else if (request instanceof TableWriteIntentSwitchReplicaRequest) { return processTableWriteIntentSwitchAction((TableWriteIntentSwitchReplicaRequest) request); } else if (request instanceof ReadOnlySingleRowPkReplicaRequest) { - return processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) request, isPrimary); + return processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) request, replicaPrimacy.isPrimary()); } else if (request instanceof ReadOnlyMultiRowPkReplicaRequest) { - return processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) request, isPrimary); + return processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) request, replicaPrimacy.isPrimary()); } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) { - return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary); + return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, replicaPrimacy.isPrimary()); } else if (request instanceof ReplicaSafeTimeSyncRequest) { - 
return processReplicaSafeTimeSyncRequest(isPrimary); + return processReplicaSafeTimeSyncRequest(replicaPrimacy.isPrimary()); } else if (request instanceof BuildIndexReplicaRequest) { return buildIndexReplicaRequestHandler.handle((BuildIndexReplicaRequest) request); } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) { @@ -922,10 +942,8 @@ public class PartitionReplicaListener implements ReplicaListener { */ private CompletableFuture<List<BinaryRow>> processReadOnlyScanRetrieveBatchAction( ReadOnlyScanRetrieveBatchReplicaRequest request, - Boolean isPrimary + boolean isPrimary ) { - requireNonNull(isPrimary); - UUID txId = request.transactionId(); int batchCount = request.batchSize(); HybridTimestamp readTimestamp = request.readTimestamp(); @@ -1078,7 +1096,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param isPrimary Whether the given replica is primary. * @return Result future. */ - private CompletableFuture<BinaryRow> processReadOnlySingleEntryAction(ReadOnlySingleRowPkReplicaRequest request, Boolean isPrimary) { + private CompletableFuture<BinaryRow> processReadOnlySingleEntryAction(ReadOnlySingleRowPkReplicaRequest request, boolean isPrimary) { BinaryTuple primaryKey = resolvePk(request.primaryKey()); HybridTimestamp readTimestamp = request.readTimestamp(); @@ -1100,7 +1118,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param timestamp Timestamp to check. * @return True if the timestamp is already passed in the reference system of the current node and node is primary, false otherwise. 
*/ - private boolean isPrimaryInTimestamp(Boolean isPrimary, HybridTimestamp timestamp) { + private boolean isPrimaryInTimestamp(boolean isPrimary, HybridTimestamp timestamp) { return isPrimary && clockService.now().compareTo(timestamp) > 0; } @@ -1113,7 +1131,7 @@ public class PartitionReplicaListener implements ReplicaListener { */ private CompletableFuture<List<BinaryRow>> processReadOnlyMultiEntryAction( ReadOnlyMultiRowPkReplicaRequest request, - Boolean isPrimary + boolean isPrimary ) { List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys()); HybridTimestamp readTimestamp = request.readTimestamp(); @@ -1143,9 +1161,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param isPrimary Whether is primary replica. * @return Future. */ - private CompletableFuture<?> processReplicaSafeTimeSyncRequest(Boolean isPrimary) { - requireNonNull(isPrimary); - + private CompletableFuture<?> processReplicaSafeTimeSyncRequest(boolean isPrimary) { // Disable safe-time sync if the Colocation feature is enabled, safe-time is managed on a different level there. if (!isPrimary || enabledColocation()) { return nullCompletedFuture(); @@ -2053,7 +2069,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param leaseStartTime Lease start time. * @return Listener response. */ - private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, Long leaseStartTime) { + private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, long leaseStartTime) { UUID txId = request.transactionId(); ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId(); List<BinaryRow> searchRows = request.binaryRows(); @@ -2325,7 +2341,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param leaseStartTime Lease start time. * @return Listener response. 
*/ - private CompletableFuture<?> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, Long leaseStartTime) { + private CompletableFuture<?> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, long leaseStartTime) { UUID txId = request.transactionId(); ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId(); List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys()); @@ -2469,10 +2485,8 @@ public class PartitionReplicaListener implements ReplicaListener { boolean full, UUID txCoordinatorId, int catalogVersion, - Long leaseStartTime + long leaseStartTime ) { - assert leaseStartTime != null : format("Lease start time is null for UpdateCommand [txId={}].", txId); - UpdateCommand cmd = updateCommand( commitPartitionId, rowUuid, @@ -2558,7 +2572,7 @@ public class PartitionReplicaListener implements ReplicaListener { @Nullable BinaryRow row, @Nullable HybridTimestamp lastCommitTimestamp, int catalogVersion, - Long leaseStartTime + long leaseStartTime ) { return applyUpdateCommand( request.commitPartitionId().asReplicationGroupId(), @@ -2593,10 +2607,8 @@ public class PartitionReplicaListener implements ReplicaListener { UUID txCoordinatorId, int catalogVersion, boolean skipDelayedAck, - Long leaseStartTime + long leaseStartTime ) { - assert leaseStartTime != null : format("Lease start time is null for UpdateAllCommand [txId={}].", txId); - UpdateAllCommand cmd = updateAllCommand( rowsToUpdate, commitPartitionId, @@ -2686,7 +2698,7 @@ public class PartitionReplicaListener implements ReplicaListener { ReadWriteMultiRowReplicaRequest request, Map<UUID, TimedBinaryRowMessage> rowsToUpdate, int catalogVersion, - Long leaseStartTime + long leaseStartTime ) { return applyUpdateAllCommand( rowsToUpdate, @@ -2729,7 +2741,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param leaseStartTime Lease start time. * @return Listener response. 
*/ - private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, Long leaseStartTime) { + private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, long leaseStartTime) { UUID txId = request.transactionId(); BinaryRow searchRow = request.binaryRow(); ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId(); @@ -2928,7 +2940,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param leaseStartTime Lease start time. * @return Listener response. */ - private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowPkReplicaRequest request, Long leaseStartTime) { + private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowPkReplicaRequest request, long leaseStartTime) { UUID txId = request.transactionId(); BinaryTuple primaryKey = resolvePk(request.primaryKey()); ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId(); @@ -3198,7 +3210,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param leaseStartTime Lease start time. * @return Listener response. */ - private CompletableFuture<ReplicaResult> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request, Long leaseStartTime) { + private CompletableFuture<ReplicaResult> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request, long leaseStartTime) { BinaryRow newRow = request.newBinaryRow(); BinaryRow expectedRow = request.oldBinaryRow(); ReplicationGroupIdMessage commitPartitionId = request.commitPartitionId(); @@ -3271,80 +3283,6 @@ public class PartitionReplicaListener implements ReplicaListener { }); } - /** - * Ensure that the primary replica was not changed. - * - * @param request Replica request. - * @return Future with {@link IgniteBiTuple} containing {@code boolean} (whether the replica is primary) and the start time of current - * lease. 
The boolean is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The - * lease start time is not {@code null} in case of {@link PrimaryReplicaRequest}. - */ - private CompletableFuture<IgniteBiTuple<Boolean, Long>> ensureReplicaIsPrimary(ReplicaRequest request) { - HybridTimestamp current = clockService.current(); - - if (request instanceof PrimaryReplicaRequest) { - Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) request).enlistmentConsistencyToken(); - - Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = primaryReplicaMeta -> { - if (primaryReplicaMeta == null) { - throw new PrimaryReplicaMissException( - localNode.name(), - null, - localNode.id(), - null, - enlistmentConsistencyToken, - null, - null - ); - } - - long currentEnlistmentConsistencyToken = primaryReplicaMeta.getStartTime().longValue(); - - if (enlistmentConsistencyToken != currentEnlistmentConsistencyToken - || clockService.before(primaryReplicaMeta.getExpirationTime(), current) - || !isLocalPeer(primaryReplicaMeta.getLeaseholderId()) - ) { - throw new PrimaryReplicaMissException( - localNode.name(), - primaryReplicaMeta.getLeaseholder(), - localNode.id(), - primaryReplicaMeta.getLeaseholderId(), - enlistmentConsistencyToken, - currentEnlistmentConsistencyToken, - null); - } - - return new IgniteBiTuple<>(null, primaryReplicaMeta.getStartTime().longValue()); - }; - - ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(replicationGroupId, current); - - if (meta != null) { - try { - return completedFuture(validateClo.apply(meta)); - } catch (Exception e) { - return failedFuture(e); - } - } - - return placementDriver.getPrimaryReplica(replicationGroupId, current).thenApply(validateClo); - } else if (request instanceof ReadOnlyReplicaRequest) { - return isLocalNodePrimaryReplicaAt(current); - } else if (request instanceof ReplicaSafeTimeSyncRequest) { - return isLocalNodePrimaryReplicaAt(current); - } else { - return 
completedFuture(new IgniteBiTuple<>(null, null)); - } - } - - private CompletableFuture<IgniteBiTuple<Boolean, Long>> isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) { - return placementDriver.getPrimaryReplica(replicationGroupId, timestamp) - .thenApply(primaryReplica -> new IgniteBiTuple<>( - primaryReplica != null && isLocalPeer(primaryReplica.getLeaseholderId()), - null - )); - } - /** * Resolves read result to the corresponding binary row. Following rules are used for read result resolution: * <ol> @@ -3678,23 +3616,18 @@ public class PartitionReplicaListener implements ReplicaListener { return tableId; } - private boolean isLocalPeer(UUID nodeId) { - return localNode.id().equals(nodeId); - } - private CompletableFuture<?> processOperationRequestWithTxOperationManagementLogic( UUID senderId, ReplicaRequest request, - @Nullable Boolean isPrimary, - @Nullable HybridTimestamp opStartTsIfDirectRo, - @Nullable Long leaseStartTime + ReplicaPrimacy replicaPrimacy, + @Nullable HybridTimestamp opStartTsIfDirectRo ) { incrementRwOperationCountIfNeeded(request); UUID txIdLockingLwm = tryToLockLwmIfNeeded(request, opStartTsIfDirectRo); try { - return processOperationRequest(senderId, request, isPrimary, opStartTsIfDirectRo, leaseStartTime) + return processOperationRequest(senderId, request, replicaPrimacy, opStartTsIfDirectRo) .whenComplete((unused, throwable) -> { unlockLwmIfNeeded(txIdLockingLwm, request); decrementRwOperationCountIfNeeded(request); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index f5350c11670..49c1ecfc688 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -82,6 +82,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.lenient; @@ -174,6 +175,7 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; +import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; import org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaRequest; @@ -243,6 +245,7 @@ import org.apache.ignite.internal.tx.message.PartitionEnlistmentMessage; import org.apache.ignite.internal.tx.message.TransactionMetaMessage; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; import org.apache.ignite.internal.tx.message.TxMessagesFactory; +import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest; import org.apache.ignite.internal.tx.message.TxStateResponse; import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; @@ -452,7 +455,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { ); /** Placement driver. */ - private PlacementDriver placementDriver; + private TestPlacementDriver placementDriver; /** Partition replication listener to test. 
*/ private PartitionReplicaListener partitionReplicaListener; @@ -652,7 +655,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { transactionStateResolver.start(); - placementDriver = new TestPlacementDriver(localNode); + placementDriver = spy(new TestPlacementDriver(localNode)); partitionReplicaListener = new PartitionReplicaListener( testMvPartitionStorage, @@ -824,7 +827,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { public void testEnsureReplicaIsPrimaryThrowsPrimaryReplicaMissIfNodeIdDoesNotMatchTheLeaseholder() { localLeader = false; - ((TestPlacementDriver) placementDriver).setPrimaryReplicaSupplier(() -> new TestReplicaMetaImpl("node3", nodeId(3))); + placementDriver.setPrimaryReplicaSupplier(() -> new TestReplicaMetaImpl("node3", nodeId(3))); CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() .groupId(tablePartitionIdMessage(grpId)) @@ -3397,6 +3400,59 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { verify(lowWatermark).unlock(any()); } + @ParameterizedTest + @ValueSource(classes = {PrimaryReplicaRequest.class, TxStateCommitPartitionRequest.class}) + void primaryReplicaRequestsAreRejectedWhenPrimaryIsNotKnown(Class<? extends PrimaryReplicaRequest> requestClass) { + doReturn(null).when(placementDriver).getCurrentPrimaryReplica(any(), any()); + doReturn(nullCompletedFuture()).when(placementDriver).getPrimaryReplica(any(), any()); + + PrimaryReplicaRequest request = mock(requestClass); + + assertThat(partitionReplicaListener.invoke(request, localNode.id()), willThrow(PrimaryReplicaMissException.class)); + } + + @ParameterizedTest + @ValueSource(classes = {PrimaryReplicaRequest.class, TxStateCommitPartitionRequest.class}) + void primaryReplicaRequestsAreRejectedWhenPrimaryDoesNotMatchLeaseStartTime(Class<? 
extends PrimaryReplicaRequest> requestClass) { + long leaseStartTime = clock.nowLong(); + placementDriver.setPrimaryReplicaSupplier( + () -> new TestReplicaMetaImpl(localNode, hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE) + ); + + PrimaryReplicaRequest request = mock(requestClass); + when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime - 1000); + + assertThat(partitionReplicaListener.invoke(request, localNode.id()), willThrow(PrimaryReplicaMissException.class)); + } + + @ParameterizedTest + @ValueSource(classes = {PrimaryReplicaRequest.class, TxStateCommitPartitionRequest.class}) + void primaryReplicaRequestsAreRejectedWhenLeaseIsExpired(Class<? extends PrimaryReplicaRequest> requestClass) { + long leaseStartTime = clock.nowLong(); + placementDriver.setPrimaryReplicaSupplier( + () -> new TestReplicaMetaImpl(localNode, hybridTimestamp(leaseStartTime), HybridTimestamp.MIN_VALUE) + ); + + PrimaryReplicaRequest request = mock(requestClass); + when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime); + + assertThat(partitionReplicaListener.invoke(request, localNode.id()), willThrow(PrimaryReplicaMissException.class)); + } + + @ParameterizedTest + @ValueSource(classes = {PrimaryReplicaRequest.class, TxStateCommitPartitionRequest.class}) + void primaryReplicaRequestsAreRejectedWhenLeaseholderIsDifferent(Class<? extends PrimaryReplicaRequest> requestClass) { + long leaseStartTime = clock.nowLong(); + placementDriver.setPrimaryReplicaSupplier( + () -> new TestReplicaMetaImpl(anotherNode, hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE) + ); + + PrimaryReplicaRequest request = mock(requestClass); + when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime); + + assertThat(partitionReplicaListener.invoke(request, localNode.id()), willThrow(PrimaryReplicaMissException.class)); + } + private static class RequestContext { private final TablePartitionId groupId; private final UUID txId;