This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 47c9e2776e IGNITE-21196 Optimize primary replica events processing (#3007) 47c9e2776e is described below commit 47c9e2776ef76c7bfd7ffe013370a58fe434605b Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Thu Jan 4 12:29:47 2024 +0300 IGNITE-21196 Optimize primary replica events processing (#3007) --- .../apache/ignite/internal/replicator/Replica.java | 7 +++ .../ignite/internal/replicator/ReplicaManager.java | 51 ++++++++++++++++++++++ .../replicator/listener/ReplicaListener.java | 18 ++++++++ .../replicator/PartitionReplicaListener.java | 26 +++++++---- .../PartitionReplicaListenerDurableUnlockTest.java | 5 +-- 5 files changed, 96 insertions(+), 11 deletions(-) diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java index 3a4c53b2b8..5945cd8672 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java @@ -123,6 +123,13 @@ public class Replica { raftClient.subscribeLeader(this::onLeaderElected); } + /** + * Returns an instance of replica listener, associated with current replica. + */ + ReplicaListener replicaListener() { + return listener; + } + /** * Processes a replication request on the replica. 
* diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 84bcd9da3d..2b045b162f 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED; +import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; @@ -50,6 +51,8 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup; import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory; import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage; @@ -227,6 +230,9 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc new LinkedBlockingQueue<>(), NamedThreadFactory.create(nodeName, "replica", LOG) ); + + 
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this::onPrimaryReplicaElected); + placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this::onPrimaryReplicaExpired); } private void onReplicaMessageReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId) { @@ -744,6 +750,51 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc } } + + /** + * Event handler for {@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}. Propagates execution to the + * {@link ReplicaListener#onPrimaryElected(PrimaryReplicaEventParameters, Throwable)} of the replica, that corresponds + * to a given {@link PrimaryReplicaEventParameters#groupId()}. + */ + private CompletableFuture<Boolean> onPrimaryReplicaElected( + PrimaryReplicaEventParameters primaryReplicaEventParameters, + Throwable throwable + ) { + CompletableFuture<Replica> replica = replicas.get(primaryReplicaEventParameters.groupId()); + + if (replica == null) { + return falseCompletedFuture(); + } + + if (replica.isDone() && !replica.isCompletedExceptionally()) { + return replica.join().replicaListener().onPrimaryElected(primaryReplicaEventParameters, throwable); + } else { + return replica.thenCompose(r -> r.replicaListener().onPrimaryElected(primaryReplicaEventParameters, throwable)); + } + } + + /** + * Event handler for {@link PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED}. Propagates execution to the + * {@link ReplicaListener#onPrimaryExpired(PrimaryReplicaEventParameters, Throwable)} of the replica, that corresponds + * to a given {@link PrimaryReplicaEventParameters#groupId()}. 
+ */ + private CompletableFuture<Boolean> onPrimaryReplicaExpired( + PrimaryReplicaEventParameters primaryReplicaEventParameters, + Throwable throwable + ) { + CompletableFuture<Replica> replica = replicas.get(primaryReplicaEventParameters.groupId()); + + if (replica == null) { + return falseCompletedFuture(); + } + + if (replica.isDone() && !replica.isCompletedExceptionally()) { + return replica.join().replicaListener().onPrimaryExpired(primaryReplicaEventParameters, throwable); + } else { + return replica.thenCompose(r -> r.replicaListener().onPrimaryExpired(primaryReplicaEventParameters, throwable)); + } + } + /** * Idle safe time sync for replicas. */ diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java index 88a1937e97..eeee59b449 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java @@ -18,8 +18,12 @@ package org.apache.ignite.internal.replicator.listener; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; +import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.util.CompletableFutures; +import org.jetbrains.annotations.Nullable; /** Replica listener. */ @FunctionalInterface @@ -33,6 +37,20 @@ public interface ReplicaListener { */ CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId); + /** + * Invoked by {@link ReplicaManager} when current replica is elected as primary. 
+ */ + default CompletableFuture<Boolean> onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { + return CompletableFutures.falseCompletedFuture(); + } + + /** + * Invoked by {@link ReplicaManager} when current replica stops being a primary replica. + */ + default CompletableFuture<Boolean> onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { + return CompletableFutures.falseCompletedFuture(); + } + /** Callback on replica shutdown. */ default void onShutdown() { // No-op. diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 85547ca54e..fb765d00c9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -79,7 +79,6 @@ import org.apache.ignite.internal.lang.SafeTimeReorderException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.placementdriver.PlacementDriver; -import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.service.RaftCommandRunner; @@ -344,13 +343,17 @@ public class PartitionReplicaListener implements ReplicaListener { cursors = new ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator()); schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); - - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this::onPrimaryElected); -
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this::onPrimaryExpired); } - private CompletableFuture<Boolean> onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { - if (!localNode.name().equals(evt.leaseholder()) || !replicationGroupId.equals(evt.groupId())) { + @Override + public CompletableFuture<Boolean> onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { + assert replicationGroupId.equals(evt.groupId()) : format( + "The replication group listener does not match the event [grp={}, eventGrp={}]", + replicationGroupId, + evt.groupId() + ); + + if (!localNode.name().equals(evt.leaseholder())) { return falseCompletedFuture(); } @@ -428,8 +431,15 @@ public class PartitionReplicaListener implements ReplicaListener { }); } - private CompletableFuture<Boolean> onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { - if (!localNode.name().equals(evt.leaseholder()) || !replicationGroupId.equals(evt.groupId())) { + @Override + public CompletableFuture<Boolean> onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) { + assert replicationGroupId.equals(evt.groupId()) : format( + "The replication group listener does not match the event [grp={}, eventGrp={}]", + replicationGroupId, + evt.groupId() + ); + + if (!localNode.name().equals(evt.leaseholder())) { return falseCompletedFuture(); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java index d0b6afe400..5ea1c8a6d0 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java +++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java @@ -48,7 +48,6 @@ import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.placementdriver.TestPlacementDriver; -import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -198,7 +197,7 @@ public class PartitionReplicaListenerDurableUnlockTest extends IgniteAbstractTes PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name(), clock.now()); - assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters), willSucceedIn(1, SECONDS)); + assertThat(partitionReplicaListener.onPrimaryElected(parameters, null), willSucceedIn(1, SECONDS)); for (IgniteBiTuple<UUID, TxMeta> tx : txStateStorage.scan()) { if (isFinalState(tx.getValue().txState())) { @@ -228,7 +227,7 @@ public class PartitionReplicaListenerDurableUnlockTest extends IgniteAbstractTes PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name(), clock.now()); - assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters), willSucceedIn(1, SECONDS)); + assertThat(partitionReplicaListener.onPrimaryElected(parameters, null), willSucceedIn(1, SECONDS)); assertTrue(txStateStorage.get(tx0).locksReleased());