This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 8f9b321fee IGNITE-21805 Refactor TableManager and move all RAFT related pieces to Replica (#3633) 8f9b321fee is described below commit 8f9b321fee2848757d8174b1d0c330326d21d485 Author: Mikhail Efremov <jakuten...@gmail.com> AuthorDate: Thu Jun 6 15:47:36 2024 +0600 IGNITE-21805 Refactor TableManager and move all RAFT related pieces to Replica (#3633) --- .../rebalance/PartitionMover.java | 9 +- .../raft/ExecutorInclinedRaftCommandRunner.java | 5 + .../apache/ignite/raft/jraft/core/NodeImpl.java | 3 +- .../ItPlacementDriverReplicaSideTest.java | 60 +++- .../apache/ignite/internal/replicator/Replica.java | 9 +- .../ignite/internal/replicator/ReplicaManager.java | 353 +++++++++++++++----- .../replicator/listener/ReplicaListener.java | 5 +- .../replicator/PlacementDriverReplicaSideTest.java | 6 +- .../internal/replicator/ReplicaManagerTest.java | 37 ++- .../runner/app/ItIgniteNodeRestartTest.java | 18 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 26 +- .../ignite/distributed/ReplicaUnavailableTest.java | 74 ++++- .../ItDisasterRecoveryReconfigurationTest.java | 9 +- .../rebalance/ItRebalanceDistributedTest.java | 24 +- .../internal/table/distributed/TableManager.java | 370 ++++++++------------- .../replicator/PartitionReplicaListener.java | 10 + .../table/distributed/PartitionMoverTest.java | 9 +- .../distributed/TableManagerRecoveryTest.java | 14 +- .../table/distributed/TableManagerTest.java | 27 +- .../apache/ignite/distributed/ItTxTestCluster.java | 48 ++- 20 files changed, 686 insertions(+), 430 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java index 76275091eb..694e8ed7fe 100644 --- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java @@ -41,12 +41,12 @@ public class PartitionMover { private final IgniteSpinBusyLock busyLock; - private final Supplier<RaftGroupService> raftGroupServiceSupplier; + private final Supplier<CompletableFuture<RaftGroupService>> raftGroupServiceSupplier; /** * Constructor. */ - public PartitionMover(IgniteSpinBusyLock busyLock, Supplier<RaftGroupService> raftGroupServiceSupplier) { + public PartitionMover(IgniteSpinBusyLock busyLock, Supplier<CompletableFuture<RaftGroupService>> raftGroupServiceSupplier) { this.busyLock = busyLock; this.raftGroupServiceSupplier = raftGroupServiceSupplier; } @@ -64,8 +64,9 @@ public class PartitionMover { } try { - return raftGroupServiceSupplier.get() - .changePeersAsync(peersAndLearners, term) + return raftGroupServiceSupplier + .get() + .thenCompose(raftGroupService -> raftGroupService.changePeersAsync(peersAndLearners, term)) .handle((resp, err) -> { if (!busyLock.enterBusy()) { throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java index 4b16d1131b..da7fb672d3 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java @@ -46,4 +46,9 @@ public class ExecutorInclinedRaftCommandRunner implements RaftCommandRunner { return future.thenApplyAsync(identity(), completionExecutor); } + + /** Returns decorated Raft-client. 
*/ + public RaftCommandRunner decoratedCommandRunner() { + return commandRunner; + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 9d836a27b2..007e9387f4 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -717,7 +717,7 @@ public class NodeImpl implements Node, RaftServerService { electionRound = 0; if (electionAdjusted) { - LOG.info("Election timeout was reset to initial value due to successful leader election."); + LOG.info("Election timeout was reset to initial value."); resetElectionTimeoutMs(initialElectionTimeout); electionAdjusted = false; } @@ -3445,6 +3445,7 @@ public class NodeImpl implements Node, RaftServerService { this.conf.setConf(newConf); this.conf.getOldConf().reset(); stepDown(this.currTerm + 1, false, new Status(RaftError.ESETPEER, "Raft node set peer normally")); + resetElectionTimeoutToInitial(); return Status.OK(); } finally { diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java index 5b09cd0e0a..f82741b11d 100644 --- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java +++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds; +import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -75,6 +76,7 @@ import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessage import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.TestLozaFactory; @@ -83,7 +85,10 @@ import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator; import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; +import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.replicator.message.ReplicaMessageTestGroup; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaRequest; @@ -197,7 +202,13 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest { Set.of(ReplicaMessageTestGroup.class), new TestPlacementDriver(primaryReplicaSupplier), partitionOperationsExecutor, - new 
NoOpFailureProcessor() + () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, + new NoOpFailureProcessor(), + // TODO: IGNITE-22222 can't pass ThreadLocalPartitionCommandsMarshaller there due to dependency loop + null, + topologyAwareRaftGroupServiceFactory, + raftManager, + new VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("volatile-log-spillout")) ); replicaManagers.put(nodeName, replicaManager); @@ -220,7 +231,9 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest { } }); - servicesToClose.add(() -> IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10, TimeUnit.SECONDS)); + servicesToClose.addAll(List.of( + () -> IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10, TimeUnit.SECONDS) + )); } } @@ -477,9 +490,11 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest { var rftNodeId = new RaftNodeId(groupId, peer); + PeersAndLearners newConfiguration = fromConsistentIds(nodes); + CompletableFuture<TopologyAwareRaftGroupService> raftClientFut = raftManager.startRaftGroupNode( rftNodeId, - fromConsistentIds(nodes), + newConfiguration, new TestRaftGroupListener(), RaftGroupEventsListener.noopLsnr, RaftGroupOptions.defaults(), @@ -487,24 +502,33 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest { ); serviceFutures.add(raftClientFut); - CompletableFuture<Replica> replicaFuture = raftClientFut.thenCompose(raftClient -> { + CompletableFuture<Boolean> replicaFuture = raftClientFut.thenCompose(raftClient -> { try { + ReplicaListener listener = new ReplicaListener() { + @Override + public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) { + log.info("Handle request [type={}]", request.getClass().getSimpleName()); + + return raftClient + .run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build()) + .thenCompose(ignored -> replicaListener == null + ? 
completedFuture(new ReplicaResult(null, null)) + : replicaListener.apply(request, senderId)); + } + + @Override + public RaftCommandRunner raftClient() { + return raftClient; + } + }; + return replicaManager.startReplica( groupId, - (request, senderId) -> { - log.info("Handle request [type={}]", request.getClass().getSimpleName()); - - return raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build()) - .thenCompose(ignored -> { - if (replicaListener == null) { - return completedFuture(new ReplicaResult(null, null)); - } else { - return replicaListener.apply(request, senderId); - } - }); - }, - raftClient, - new PendingComparableValuesTracker<>(Long.MAX_VALUE)); + newConfiguration, + (unused) -> { }, + (unused) -> listener, + new PendingComparableValuesTracker<>(Long.MAX_VALUE), + completedFuture(raftClient)); } catch (NodeStoppingException e) { throw new RuntimeException(e); } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java index 380e5c7b22..41883da2d6 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java @@ -100,7 +100,6 @@ public class Replica { * @param replicaGrpId Replication group id. * @param listener Replica listener. * @param storageIndexTracker Storage index tracker. - * @param raftClient Topology aware Raft client. * @param localNode Instance of the local node. * @param executor External executor. * @param placementDriver Placement driver. 
@@ -110,7 +109,6 @@ public class Replica { ReplicationGroupId replicaGrpId, ReplicaListener listener, PendingComparableValuesTracker<Long, Void> storageIndexTracker, - TopologyAwareRaftGroupService raftClient, ClusterNode localNode, ExecutorService executor, PlacementDriver placementDriver, @@ -119,7 +117,7 @@ public class Replica { this.replicaGrpId = replicaGrpId; this.listener = listener; this.storageIndexTracker = storageIndexTracker; - this.raftClient = raftClient; + this.raftClient = raftClient(); this.localNode = localNode; this.executor = executor; this.placementDriver = placementDriver; @@ -128,6 +126,11 @@ public class Replica { raftClient.subscribeLeader(this::onLeaderElected); } + /** Returns Raft-client. */ + public final TopologyAwareRaftGroupService raftClient() { + return (TopologyAwareRaftGroupService) listener.raftClient(); + } + /** * Processes a replication request on the replica. * diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 64c29a7b81..2c410caffc 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -17,11 +17,9 @@ package org.apache.ignite.internal.replicator; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED; -import static org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; import static 
org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; import static org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS; @@ -46,8 +44,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.function.Supplier; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.components.LogSyncer; import org.apache.ignite.internal.event.AbstractEventProducer; import org.apache.ignite.internal.failure.FailureContext; import org.apache.ignite.internal.failure.FailureProcessor; @@ -67,7 +69,21 @@ import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup; import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory; import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Marshaller; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView; +import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import org.apache.ignite.internal.raft.service.RaftGroupListener; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import 
org.apache.ignite.internal.raft.storage.SnapshotStorageFactory; +import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator; import org.apache.ignite.internal.replicator.exception.ExpectedReplicationException; import org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException; import org.apache.ignite.internal.replicator.exception.ReplicaStoppingException; @@ -88,8 +104,10 @@ import org.apache.ignite.internal.thread.ThreadAttributes; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; +import org.jetbrains.annotations.VisibleForTesting; /** * Replica manager maintains {@link Replica} instances on an Ignite node. @@ -125,6 +143,19 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc /** Replica message handler. */ private final NetworkMessageHandler handler; + /** Raft manager for RAFT-clients creation. */ + // TODO: move into {@method Replica#shutdown} https://issues.apache.org/jira/browse/IGNITE-22372 + private final RaftManager raftManager; + + /** Raft clients factory for raft server endpoints starting. */ + private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory; + + /** Creator for {@link org.apache.ignite.internal.raft.storage.LogStorageFactory} for volatile tables. */ + private final LogStorageFactoryCreator volatileLogStorageFactoryCreator; + + /** Raft command marshaller for raft server endpoints starting. */ + private final Marshaller raftCommandsMarshaller; + /** Message handler for placement driver messages. 
*/ private final NetworkMessageHandler placementDriverMessageHandler; @@ -141,8 +172,10 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc /** Scheduled executor for idle safe time sync. */ private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor; + /** Executor that will be used to execute requests by replicas. */ private final Executor requestsExecutor; + /** Failure processor. */ private final FailureProcessor failureProcessor; /** Set of message groups to handler as replica requests. */ @@ -154,39 +187,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc private String localNodeId; - /** - * Constructor for a replica service. - * - * @param nodeName Node name. - * @param clusterNetSvc Cluster network service. - * @param cmgMgr Cluster group manager. - * @param clockService Clock service. - * @param messageGroupsToHandle Message handlers. - * @param placementDriver A placement driver. - */ - @TestOnly - public ReplicaManager( - String nodeName, - ClusterService clusterNetSvc, - ClusterManagementGroupManager cmgMgr, - ClockService clockService, - Set<Class<?>> messageGroupsToHandle, - PlacementDriver placementDriver, - Executor requestsExecutor, - FailureProcessor failureProcessor - ) { - this( - nodeName, - clusterNetSvc, - cmgMgr, - clockService, - messageGroupsToHandle, - placementDriver, - requestsExecutor, - () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, - failureProcessor - ); - } + private String localNodeConsistentId; /** * Constructor for a replica service. @@ -199,6 +200,12 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc * @param placementDriver A placement driver. * @param requestsExecutor Executor that will be used to execute requests by replicas. * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms. + * @param failureProcessor Failure processor. 
+ * @param raftCommandsMarshaller Command marshaller for raft groups creation. + * @param raftGroupServiceFactory A factory for raft-clients creation. + * @param raftManager Raft manager used to start and stop raft nodes and to create raft clients. + * @param volatileLogStorageFactoryCreator Creator for {@link org.apache.ignite.internal.raft.storage.LogStorageFactory} for + * volatile tables. */ public ReplicaManager( String nodeName, @@ -209,18 +216,26 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc PlacementDriver placementDriver, Executor requestsExecutor, LongSupplier idleSafeTimePropagationPeriodMsSupplier, - FailureProcessor failureProcessor + FailureProcessor failureProcessor, + Marshaller raftCommandsMarshaller, + TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory, + RaftManager raftManager, + LogStorageFactoryCreator volatileLogStorageFactoryCreator ) { this.clusterNetSvc = clusterNetSvc; this.cmgMgr = cmgMgr; this.clockService = clockService; this.messageGroupsToHandle = messageGroupsToHandle; + this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator; this.handler = this::onReplicaMessageReceived; this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived; this.placementDriver = placementDriver; this.requestsExecutor = requestsExecutor; this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; this.failureProcessor = failureProcessor; + this.raftCommandsMarshaller = raftCommandsMarshaller; + this.raftGroupServiceFactory = raftGroupServiceFactory; + this.raftManager = raftManager; scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool( 1, @@ -466,75 +481,170 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc }); } + private CompletableFuture<Boolean> startReplicaInternal( + RaftGroupEventsListener raftGroupEventsListener, + RaftGroupListener raftGroupListener, + boolean isVolatileStorage, + 
SnapshotStorageFactory snapshotStorageFactory, + Consumer<RaftGroupService> updateTableRaftService, + Function<RaftGroupService, ReplicaListener> createListener, + PendingComparableValuesTracker<Long, Void> storageIndexTracker, + TablePartitionId replicaGrpId, + PeersAndLearners newConfiguration + ) throws NodeStoppingException { + RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new Peer(localNodeConsistentId)); + + RaftGroupOptions groupOptions = groupOptionsForPartition( + isVolatileStorage, + snapshotStorageFactory); + + // TODO: move into {@method Replica#shutdown} https://issues.apache.org/jira/browse/IGNITE-22372 + // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273 + CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut = ((Loza) raftManager).startRaftGroupNode( + raftNodeId, + newConfiguration, + raftGroupListener, + raftGroupEventsListener, + groupOptions, + raftGroupServiceFactory + ); + + return startReplica( + replicaGrpId, + newConfiguration, + updateTableRaftService, + createListener, + storageIndexTracker, + newRaftClientFut); + } + /** - * Starts a replica. If a replica with the same partition id already exists, the method throws an exception. + * Creates and starts a new replica. * + * @param raftGroupEventsListener Raft group events listener for raft group starting. + * @param raftGroupListener Raft group listener for raft group starting. + * @param isVolatileStorage is table storage volatile? + * @param snapshotStorageFactory Snapshot storage factory for raft group option's parameterization. + * @param updateTableRaftService Temporal consumer while TableRaftService wouldn't be removed in + * TODO: https://issues.apache.org/jira/browse/IGNITE-22218. + * @param createListener Due to creation of ReplicaListener in TableManager, the function returns desired listener by created + * raft-client inside {@link #startReplica} method. * @param replicaGrpId Replication group id. 
- * @param listener Replica listener. - * @param raftClient Topology aware Raft client. * @param storageIndexTracker Storage index tracker. - * @throws NodeStoppingException If node is stopping. - * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been - * started. + * @param newConfiguration A configuration for new raft group. + * @return Future that promises ready new replica when done. */ - public CompletableFuture<Replica> startReplica( - ReplicationGroupId replicaGrpId, - ReplicaListener listener, - TopologyAwareRaftGroupService raftClient, - PendingComparableValuesTracker<Long, Void> storageIndexTracker + public CompletableFuture<Boolean> startReplica( + RaftGroupEventsListener raftGroupEventsListener, + RaftGroupListener raftGroupListener, + boolean isVolatileStorage, + SnapshotStorageFactory snapshotStorageFactory, + Consumer<RaftGroupService> updateTableRaftService, + Function<RaftGroupService, ReplicaListener> createListener, + PendingComparableValuesTracker<Long, Void> storageIndexTracker, + TablePartitionId replicaGrpId, + PeersAndLearners newConfiguration ) throws NodeStoppingException { if (!busyLock.enterBusy()) { throw new NodeStoppingException(); } try { - return startReplicaInternal(replicaGrpId, listener, raftClient, storageIndexTracker); + return startReplicaInternal( + raftGroupEventsListener, + raftGroupListener, + isVolatileStorage, + snapshotStorageFactory, + updateTableRaftService, + createListener, + storageIndexTracker, + replicaGrpId, + newConfiguration); } finally { busyLock.leaveBusy(); } } /** - * Internal method for starting a replica. + * Starts a raft-client and pass it to a replica creation if the replica should be started too. If a replica with the same partition id + * already exists, the method throws an exception. + * TODO: must be deleted or be private after https://issues.apache.org/jira/browse/IGNITE-22373 * * @param replicaGrpId Replication group id. 
- * @param listener Replica listener. - * @param raftClient Topology aware Raft client. + * @param newConfiguration Peers and Learners of the Raft group. + * @param updateTableRaftService A temporary closure that updates table raft service with new raft-client, but + * TODO: will be removed https://issues.apache.org/jira/browse/IGNITE-22218 + * @param createListener A closure that returns a ready {@link ReplicaListener} for the given raft-client {@link RaftGroupService}. * @param storageIndexTracker Storage index tracker. + * @param newRaftClientFut A future that returns created raft-client. + * @throws NodeStoppingException If node is stopping. + * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been started. */ - private CompletableFuture<Replica> startReplicaInternal( + @VisibleForTesting + @Deprecated + public CompletableFuture<Boolean> startReplica( ReplicationGroupId replicaGrpId, - ReplicaListener listener, - TopologyAwareRaftGroupService raftClient, - PendingComparableValuesTracker<Long, Void> storageIndexTracker - ) { + PeersAndLearners newConfiguration, + Consumer<RaftGroupService> updateTableRaftService, + Function<RaftGroupService, ReplicaListener> createListener, + PendingComparableValuesTracker<Long, Void> storageIndexTracker, + CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut + ) throws NodeStoppingException { LOG.info("Replica is about to start [replicationGroupId={}].", replicaGrpId); - ClusterNode localNode = clusterNetSvc.topologyService().localMember(); + CompletableFuture<Boolean> resultFuture = newRaftClientFut.thenAccept(updateTableRaftService) + .thenApply((v) -> true); - Replica newReplica = new Replica( - replicaGrpId, - listener, - storageIndexTracker, - raftClient, - localNode, - executor, - placementDriver, - clockService - ); + CompletableFuture<ReplicaListener> newReplicaListenerFut = newRaftClientFut.thenApply(createListener); - CompletableFuture<Replica> 
replicaFuture = replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> { - if (existingReplicaFuture == null || existingReplicaFuture.isDone()) { - assert existingReplicaFuture == null || isCompletedSuccessfully(existingReplicaFuture); - LOG.info("Replica is started [replicationGroupId={}].", replicaGrpId); + startReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut); - return completedFuture(newReplica); - } else { - existingReplicaFuture.complete(newReplica); - LOG.info("Replica is started, existing replica waiter was completed [replicationGroupId={}].", replicaGrpId); + return resultFuture; + } - return existingReplicaFuture; - } + /** + * Creates and start new replica. + * TODO: must be deleted or be private after https://issues.apache.org/jira/browse/IGNITE-22373 + * + * @param replicaGrpId Replication group id. + * @param storageIndexTracker Storage index tracker. + * @param newReplicaListenerFut Future that returns ready ReplicaListener for replica creation. + * @return Future that promises ready new replica when done. 
+ */ + @VisibleForTesting + @Deprecated + public CompletableFuture<Replica> startReplica( + ReplicationGroupId replicaGrpId, + PendingComparableValuesTracker<Long, Void> storageIndexTracker, + CompletableFuture<ReplicaListener> newReplicaListenerFut + ) throws NodeStoppingException { + + ClusterNode localNode = clusterNetSvc.topologyService().localMember(); + + CompletableFuture<Replica> replicaFuture = newReplicaListenerFut.thenCompose(listener -> { + Replica newReplica = new Replica( + replicaGrpId, + listener, + storageIndexTracker, + localNode, + executor, + placementDriver, + clockService); + + return replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> { + if (existingReplicaFuture == null || existingReplicaFuture.isDone()) { + assert existingReplicaFuture == null || isCompletedSuccessfully(existingReplicaFuture); + LOG.info("Replica is started [replicationGroupId={}].", replicaGrpId); + + return CompletableFuture.completedFuture(newReplica); + } else { + LOG.info("Replica is started, existing replica waiter was completed [replicationGroupId={}].", replicaGrpId); + + existingReplicaFuture.complete(newReplica); + + return existingReplicaFuture; + } + }); }); var eventParams = new LocalReplicaEventParameters(replicaGrpId); @@ -548,6 +658,76 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc .thenCompose(v -> replicaFuture); } + /** + * Temporary public method for RAFT-client starting. + * TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 + * + * @param replicaGrpId Replication Group ID. + * @param newConfiguration Peers and learners nodes for a raft group. + * @param raftClientCache Temporal supplier that returns RAFT-client from TableRaftService if it's already exists and was put into the + * service's map. + * @return Future that returns started RAFT-client. + * @throws NodeStoppingException In case if node was stopping. 
+ */ + @Deprecated + public CompletableFuture<TopologyAwareRaftGroupService> startRaftClient( + ReplicationGroupId replicaGrpId, + PeersAndLearners newConfiguration, + Supplier<RaftGroupService> raftClientCache) + throws NodeStoppingException { + RaftGroupService cachedRaftClient = raftClientCache.get(); + return cachedRaftClient != null + ? CompletableFuture.completedFuture((TopologyAwareRaftGroupService) cachedRaftClient) + // TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online. + : raftManager.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller); + } + + /** + * Returns future with a replica if it was created or null if there no any replicas starting with given identifier. + * + * @param replicationGroupId Table-Partition identifier. + * @return replica if it was created or null otherwise. + */ + public CompletableFuture<Replica> replica(ReplicationGroupId replicationGroupId) { + return replicas.get(replicationGroupId); + } + + /** + * Performs a {@code resetPeers} operation on raft node. + * + * @param replicaGrpId Replication group ID. + * @param peersAndLearners New node configuration. + */ + public void resetPeers(ReplicationGroupId replicaGrpId, PeersAndLearners peersAndLearners) { + RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new Peer(localNodeConsistentId)); + ((Loza) raftManager).resetPeers(raftNodeId, peersAndLearners); + } + + /** Getter for wrapped write-ahead log syncer. 
*/ + // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22292 + public LogSyncer getLogSyncer() { + return raftManager.getLogSyncer(); + } + + private RaftGroupOptions groupOptionsForPartition(boolean isVolatileStorage, SnapshotStorageFactory snapshotFactory) { + RaftGroupOptions raftGroupOptions; + + if (isVolatileStorage) { + LogStorageBudgetView view = ((Loza) raftManager).volatileRaft().logStorage().value(); + raftGroupOptions = RaftGroupOptions.forVolatileStores() + .setLogStorageFactory(volatileLogStorageFactoryCreator.factory(view)) + .raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage()); + } else { + raftGroupOptions = RaftGroupOptions.forPersistentStores(); + } + + raftGroupOptions.snapshotStorageFactory(snapshotFactory); + + raftGroupOptions.commandsMarshaller(raftCommandsMarshaller); + + return raftGroupOptions; + } + /** * Stops a replica by the partition group id. * @@ -620,7 +800,16 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc } }); - return isRemovedFuture; + return isRemovedFuture + .thenApply(v -> { + try { + // TODO: move into {@method Replica#shutdown} https://issues.apache.org/jira/browse/IGNITE-22372 + raftManager.stopRaftNodes(replicaGrpId); + } catch (NodeStoppingException ignored) { + // No-op. 
+ } + return v; + }); } /** {@inheritDoc} */ @@ -650,6 +839,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc localNodeId = clusterNetSvc.topologyService().localMember().id(); + localNodeConsistentId = clusterNetSvc.topologyService().localMember().name(); + return nullCompletedFuture(); } @@ -662,8 +853,10 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc busyLock.block(); - shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, 10, TimeUnit.SECONDS); - shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); + int shutdownTimeoutSeconds = 10; + + shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS); + shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds, TimeUnit.SECONDS); assert replicas.values().stream().noneMatch(CompletableFuture::isDone) : "There are replicas alive [replicas=" diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java index 88a1937e97..3bb7b86822 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java @@ -18,11 +18,11 @@ package org.apache.ignite.internal.replicator.listener; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.message.ReplicaRequest; /** Replica listener. */ -@FunctionalInterface public interface ReplicaListener { /** * Invokes a replica listener to process request. 
@@ -33,6 +33,9 @@ public interface ReplicaListener { */ CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId); + /** Returns Raft-client. */ + RaftCommandRunner raftClient(); + /** Callback on replica shutdown. */ default void onShutdown() { // No-op. diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java index 676eb5ec36..57bc8420d8 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java @@ -115,11 +115,13 @@ public class PlacementDriverReplicaSideTest extends BaseIgniteAbstractTest { when(raftClient.run(any())).thenAnswer(invocationOnMock -> completedFuture(null)); + var listener = mock(ReplicaListener.class); + when(listener.raftClient()).thenReturn(raftClient); + return new Replica( GRP_ID, - mock(ReplicaListener.class), + listener, storageIndexTracker, - raftClient, LOCAL_NODE, executor, new TestPlacementDriver(LOCAL_NODE), diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java index f6e661b2a8..4a7d1462a1 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.replicator; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED; import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED; +import static org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -50,7 +52,12 @@ import org.apache.ignite.internal.network.ClusterNodeImpl; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.raft.Marshaller; +import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator; import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.thread.NamedThreadFactory; @@ -75,6 +82,9 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest { private ReplicaManager replicaManager; + @Mock + private RaftManager raftManager; + @BeforeEach void startReplicaManager( TestInfo testInfo, @@ -82,7 +92,10 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest { @Mock ClusterManagementGroupManager cmgManager, @Mock PlacementDriver placementDriver, @Mock MessagingService messagingService, - @Mock TopologyService topologyService + @Mock TopologyService topologyService, + @Mock Marshaller marshaller, + @Mock TopologyAwareRaftGroupServiceFactory 
raftGroupServiceFactory, + @Mock VolatileLogStorageFactoryCreator volatileLogStorageFactoryCreator ) { String nodeName = testNodeName(testInfo, 0); @@ -110,7 +123,12 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest { Set.of(), placementDriver, requestsExecutor, - new NoOpFailureProcessor() + () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, + new NoOpFailureProcessor(), + marshaller, + raftGroupServiceFactory, + raftManager, + volatileLogStorageFactoryCreator ); assertThat(replicaManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); @@ -140,6 +158,7 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest { */ @Test void testReplicaEvents( + TestInfo testInfo, @Mock EventListener<LocalReplicaEventParameters> createReplicaListener, @Mock EventListener<LocalReplicaEventParameters> removeReplicaListener, @Mock ReplicaListener replicaListener, @@ -154,12 +173,18 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest { replicaManager.listen(BEFORE_REPLICA_STOPPED, removeReplicaListener); var groupId = new TablePartitionId(0, 0); + when(replicaListener.raftClient()).thenReturn(raftGroupService); + + String nodeName = testNodeName(testInfo, 0); + PeersAndLearners newConfiguration = PeersAndLearners.fromConsistentIds(Set.of(nodeName)); - CompletableFuture<Replica> startReplicaFuture = replicaManager.startReplica( + CompletableFuture<Boolean> startReplicaFuture = replicaManager.startReplica( groupId, - replicaListener, - raftGroupService, - new PendingComparableValuesTracker<>(0L) + newConfiguration, + (unused) -> { }, + (unused) -> replicaListener, + new PendingComparableValuesTracker<>(0L), + completedFuture(raftGroupService) ); assertThat(startReplicaFuture, willCompleteSuccessfully()); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index f419d3b45d..80051692c6 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -176,6 +176,7 @@ import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; +import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.test.WatchListenerInhibitor; import org.apache.ignite.internal.testframework.TestIgnitionManager; @@ -481,6 +482,9 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { clockService ); + ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, + NamedThreadFactory.create(name, "test-rebalance-scheduler", logger())); + ReplicaManager replicaMgr = new ReplicaManager( name, clusterSvc, @@ -490,7 +494,11 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { placementDriverManager.placementDriver(), threadPoolsManager.partitionOperationsExecutor(), partitionIdleSafeTimePropagationPeriodMsSupplier, - failureProcessor + failureProcessor, + new ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()), + topologyAwareRaftGroupServiceFactory, + raftMgr, + view -> new LocalLogStorageFactory() ); var resourcesRegistry = new RemotelyTriggeredResourceRegistry(); @@ -570,9 +578,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { var dataNodesMock = 
dataNodesMockByNode.get(idx); - ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, - NamedThreadFactory.create(name, "test-rebalance-scheduler", logger())); - DistributionZoneManager distributionZoneManager = new DistributionZoneManager( name, registry, @@ -604,7 +609,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { messagingServiceReturningToStorageOperationsPool, clusterSvc.topologyService(), clusterSvc.serializationRegistry(), - raftMgr, replicaMgr, lockManager, replicaService, @@ -613,13 +617,12 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { storagePath, metaStorageMgr, schemaManager, - view -> new LocalLogStorageFactory(), threadPoolsManager.tableIoExecutor(), threadPoolsManager.partitionOperationsExecutor(), + rebalanceScheduler, hybridClock, clockService, new OutgoingSnapshotsManager(clusterSvc.messagingService()), - topologyAwareRaftGroupServiceFactory, distributionZoneManager, schemaSyncService, catalogManager, @@ -627,7 +630,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { placementDriverManager.placementDriver(), sqlRef::get, resourcesRegistry, - rebalanceScheduler, lowWatermark, transactionInflights ); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 3bc9aa2a6d..e966a158e3 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -156,6 +156,7 @@ import org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAf import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.PlacementDriverManager; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Marshaller; import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.storage.LogStorageFactory; @@ -206,6 +207,7 @@ import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOn import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; +import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.tx.HybridTimestampTracker; @@ -630,6 +632,14 @@ public class IgniteImpl implements Ignite { LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig); + ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, + NamedThreadFactory.create(name, "rebalance-scheduler", LOG)); + + // TODO: IGNITE-22222 this instantiation should be moved inside ReplicaManager's constructor + Marshaller raftMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()); + + volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(name, workDir.resolve("volatile-log-spillout")); + replicaMgr = new ReplicaManager( name, clusterSvc, @@ -639,7 +649,11 @@ public class IgniteImpl implements Ignite { placementDriverMgr.placementDriver(), threadPoolsManager.partitionOperationsExecutor(), partitionIdleSafeTimePropagationPeriodMsSupplier, - failureProcessor + failureProcessor, + raftMarshaller, + topologyAwareRaftGroupServiceFactory, + raftMgr, + volatileLogStorageFactoryCreator ); 
metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY)); @@ -670,8 +684,6 @@ public class IgniteImpl implements Ignite { nodeConfigRegistry.getConfiguration(StorageConfiguration.KEY) ); - volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(name, workDir.resolve("volatile-log-spillout")); - outgoingSnapshotsManager = new OutgoingSnapshotsManager(name, clusterSvc.messagingService()); LongSupplier delayDurationMsSupplier = delayDurationMsSupplier(schemaSyncConfig); @@ -697,9 +709,6 @@ public class IgniteImpl implements Ignite { schemaManager = new SchemaManager(registry, catalogManager); - ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, - NamedThreadFactory.create(name, "rebalance-scheduler", LOG)); - distributionZoneManager = new DistributionZoneManager( name, registry, @@ -771,7 +780,6 @@ public class IgniteImpl implements Ignite { messagingServiceReturningToStorageOperationsPool, clusterSvc.topologyService(), clusterSvc.serializationRegistry(), - raftMgr, replicaMgr, lockMgr, replicaSvc, @@ -780,13 +788,12 @@ public class IgniteImpl implements Ignite { storagePath, metaStorageMgr, schemaManager, - volatileLogStorageFactoryCreator, threadPoolsManager.tableIoExecutor(), threadPoolsManager.partitionOperationsExecutor(), + rebalanceScheduler, clock, clockService, outgoingSnapshotsManager, - topologyAwareRaftGroupServiceFactory, distributionZoneManager, schemaSyncService, catalogManager, @@ -794,7 +801,6 @@ public class IgniteImpl implements Ignite { placementDriverMgr.placementDriver(), this::bareSql, resourcesRegistry, - rebalanceScheduler, lowWatermark, transactionInflights ); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java index 92f1284085..fd14fd283b 100644 --- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.distributed; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE; +import static org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; import static org.apache.ignite.internal.table.TxAbstractTest.startNode; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; @@ -35,6 +36,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -45,6 +47,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; @@ -57,7 +61,12 @@ import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.placementdriver.TestPlacementDriver; +import org.apache.ignite.internal.raft.Loza; 
+import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory; import org.apache.ignite.internal.replicator.Replica; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaResult; @@ -67,8 +76,10 @@ import org.apache.ignite.internal.replicator.configuration.ReplicationConfigurat import org.apache.ignite.internal.replicator.exception.ReplicaStoppingException; import org.apache.ignite.internal.replicator.exception.ReplicationException; import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException; +import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaResponse; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; @@ -79,6 +90,7 @@ import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; +import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.thread.NamedThreadFactory; import 
org.apache.ignite.internal.tx.message.TxMessageGroup; @@ -126,8 +138,25 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { private ExecutorService requestsExecutor; + private Loza raftManager; + + private TopologyAwareRaftGroupService raftClient; + + private final Function<BiFunction<ReplicaRequest, String, CompletableFuture<ReplicaResult>>, ReplicaListener> replicaListenerCreator = + (invokeImpl) -> new ReplicaListener() { + @Override + public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) { + return invokeImpl.apply(request, senderId); + } + + @Override + public RaftCommandRunner raftClient() { + return raftClient; + } + }; + @BeforeEach - public void setup() { + public void setup() throws NodeStoppingException { var networkAddress = new NetworkAddress(getLocalAddress(), NODE_PORT_BASE + 1); var nodeFinder = new StaticNodeFinder(List.of(networkAddress)); @@ -139,6 +168,10 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { // This test is run without Meta storage. 
when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture()); + raftManager = mock(Loza.class); + raftClient = mock(TopologyAwareRaftGroupService.class); + when(raftManager.startRaftGroupService(any(), any(), any(), any())).thenReturn(completedFuture(raftClient)); + requestsExecutor = new ThreadPoolExecutor( 0, 5, 0, TimeUnit.SECONDS, @@ -151,6 +184,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { clock, replicationConfiguration ); + replicaManager = new ReplicaManager( NODE_NAME, clusterService, @@ -159,7 +193,12 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { Set.of(TableMessageGroup.class, TxMessageGroup.class), new TestPlacementDriver(clusterService.topologyService().localMember()), requestsExecutor, - new NoOpFailureProcessor() + () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, + new NoOpFailureProcessor(), + mock(ThreadLocalPartitionCommandsMarshaller.class), + mock(TopologyAwareRaftGroupServiceFactory.class), + raftManager, + view -> new LocalLogStorageFactory() ); assertThat(replicaManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); @@ -184,18 +223,27 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { ReadWriteSingleRowReplicaRequest request = getRequest(tablePartitionId); + PeersAndLearners newConfiguration = PeersAndLearners.fromConsistentIds(Set.of(clusterNode.name())); + clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, (message, sender, correlationId) -> { try { log.info("Replica msg " + message.getClass().getSimpleName()); + ReplicaListener listener = replicaListenerCreator.apply((req, senderId) -> { + ReplicaResponse response = replicaMessageFactory.replicaResponse() + .result(5) + .build(); + return completedFuture(new ReplicaResult(response, null)); + }); + replicaManager.startReplica( tablePartitionId, - (request0, senderId) -> completedFuture(new ReplicaResult(replicaMessageFactory.replicaResponse() - .result(5) - 
.build(), null)), - mock(TopologyAwareRaftGroupService.class), - new PendingComparableValuesTracker<>(0L) + newConfiguration, + (unused) -> { }, + (unused) -> listener, + new PendingComparableValuesTracker<>(0L), + completedFuture(mock(TopologyAwareRaftGroupService.class)) ); } catch (NodeStoppingException e) { throw new RuntimeException(e); @@ -297,16 +345,22 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { TablePartitionId tablePartitionId = new TablePartitionId(1, 1); + PeersAndLearners newConfiguration = PeersAndLearners.fromConsistentIds(Set.of(clusterNode.name())); + clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, (message, sender, correlationId) -> { runAsync(() -> { try { log.info("Replica msg " + message.getClass().getSimpleName()); + ReplicaListener listener = replicaListenerCreator.apply((r, id) -> new CompletableFuture<>()); + replicaManager.startReplica( tablePartitionId, - (request, senderId) -> new CompletableFuture<>(), - mock(TopologyAwareRaftGroupService.class), - new PendingComparableValuesTracker<>(0L) + newConfiguration, + (unused) -> { }, + (unused) -> listener, + new PendingComparableValuesTracker<>(0L), + completedFuture(mock(TopologyAwareRaftGroupService.class)) ); } catch (NodeStoppingException e) { throw new RuntimeException(e); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java index 2ea1aaeb48..15655475a0 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java @@ -30,6 +30,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThr import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; @@ -203,7 +204,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra Set.of() ); - assertThat(updateFuture, willCompleteSuccessfully()); + assertThat(updateFuture, willSucceedIn(60, SECONDS)); awaitPrimaryReplica(node0, partId); @@ -249,7 +250,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra Set.of(anotherPartId) ); - assertThat(updateFuture, willCompleteSuccessfully()); + assertThat(updateFuture, willSucceedIn(60, SECONDS)); awaitPrimaryReplica(node0, anotherPartId); @@ -302,7 +303,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<ReplicaMeta> awaitPrimaryReplicaFuture = node0.placementDriver() .awaitPrimaryReplica(new TablePartitionId(tableId, partId), node0.clock().now(), 60, SECONDS); - assertThat(awaitPrimaryReplicaFuture, willCompleteSuccessfully()); + assertThat(awaitPrimaryReplicaFuture, willSucceedIn(60, SECONDS)); } private void assertRealAssignments(IgniteImpl node0, int partId, Integer... 
expected) throws InterruptedException { @@ -330,7 +331,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<Void> insertFuture = keyValueView.putAsync(null, key, Tuple.create(of("val", i + offset))); try { - insertFuture.get(1000, MILLISECONDS); + insertFuture.get(10, SECONDS); Tuple value = keyValueView.get(null, key); assertNotNull(value); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 39453a5142..9306da8ddd 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -155,7 +156,6 @@ import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; -import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory; @@ -188,6 +188,7 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup; import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; +import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.TestIgnitionManager; import org.apache.ignite.internal.testframework.WorkDirectory; @@ -820,9 +821,9 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { private void verifyThatRaftNodesAndReplicasWereStartedOnlyOnce() throws Exception { for (int i = 0; i < NODE_COUNT; i++) { verify(getNode(i).raftManager, timeout(AWAIT_TIMEOUT_MILLIS).times(1)) - .startRaftGroupNodeWithoutService(any(), any(), any(), any(), any(RaftGroupOptions.class)); + .startRaftGroupNode(any(), any(), any(), any(), any(), notNull(TopologyAwareRaftGroupServiceFactory.class)); verify(getNode(i).replicaManager, timeout(AWAIT_TIMEOUT_MILLIS).times(1)) - .startReplica(any(), any(), any(), any()); + .startReplica(any(), any(), any()); } } @@ -1194,6 +1195,9 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { lowWatermark ); + rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, + NamedThreadFactory.create(name, "test-rebalance-scheduler", logger())); + replicaManager = spy(new ReplicaManager( name, clusterService, @@ -1203,7 +1207,11 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { placementDriver, threadPoolsManager.partitionOperationsExecutor(), partitionIdleSafeTimePropagationPeriodMsSupplier, - new NoOpFailureProcessor() + new NoOpFailureProcessor(), + new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()), + topologyAwareRaftGroupServiceFactory, + raftManager, + view -> new LocalLogStorageFactory() 
)); LongSupplier delayDurationMsSupplier = () -> 10L; @@ -1219,9 +1227,6 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), delayDurationMsSupplier); - rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, - NamedThreadFactory.create(name, "test-rebalance-scheduler", logger())); - distributionZoneManager = new DistributionZoneManager( name, registry, @@ -1244,7 +1249,6 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { clusterService.messagingService(), clusterService.topologyService(), clusterService.serializationRegistry(), - raftManager, replicaManager, mock(LockManager.class), replicaSvc, @@ -1253,13 +1257,12 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { storagePath, metaStorageManager, schemaManager, - view -> new LocalLogStorageFactory(), threadPoolsManager.tableIoExecutor(), threadPoolsManager.partitionOperationsExecutor(), + rebalanceScheduler, clock, clockService, new OutgoingSnapshotsManager(clusterService.messagingService()), - topologyAwareRaftGroupServiceFactory, distributionZoneManager, schemaSyncService, catalogManager, @@ -1267,7 +1270,6 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { placementDriver, () -> mock(IgniteSql.class), resourcesRegistry, - rebalanceScheduler, lowWatermark, transactionInflights ) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index a239a046f4..ce5246a8f6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -92,6 +92,7 @@ import java.util.concurrent.TimeoutException; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.LongFunction; import java.util.function.Supplier; @@ -141,23 +142,19 @@ import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner; -import org.apache.ignite.internal.raft.Loza; -import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; -import org.apache.ignite.internal.raft.RaftManager; -import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; -import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.service.LeaderWithTerm; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupService; -import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator; +import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory; +import org.apache.ignite.internal.replicator.Replica; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.SchemaRegistry; import 
org.apache.ignite.internal.schema.configuration.GcConfiguration; @@ -171,7 +168,6 @@ import org.apache.ignite.internal.table.IgniteTablesInternal; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.LongPriorityQueue; import org.apache.ignite.internal.table.TableImpl; -import org.apache.ignite.internal.table.TableRaftService; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.gc.MvGc; @@ -191,7 +187,6 @@ import org.apache.ignite.internal.table.distributed.schema.ExecutorInclinedSchem import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl; -import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.distributed.storage.PartitionStorages; import org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl; @@ -223,7 +218,6 @@ import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.util.IgniteNameUtils; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.TopologyService; -import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.Table; import org.jetbrains.annotations.Nullable; @@ -246,9 +240,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final TopologyService topologyService; - /** Raft manager. */ - private final RaftManager raftMgr; - /** Replica manager. 
*/ private final ReplicaManager replicaMgr; @@ -318,11 +309,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** Schema manager. */ private final SchemaManager schemaManager; - private final LogStorageFactoryCreator volatileLogStorageFactoryCreator; - - /** Executor for scheduling rebalance routine. */ - private final ScheduledExecutorService rebalanceScheduler; - /** Transaction state storage scheduled pool. */ private final ScheduledExecutorService txStateStorageScheduledPool; @@ -345,8 +331,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final OutgoingSnapshotsManager outgoingSnapshotsManager; - private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory; - private final DistributionZoneManager distributionZoneManager; private final SchemaSyncService executorInclinedSchemaSyncService; @@ -369,8 +353,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final LowWatermark lowWatermark; - private final Marshaller raftCommandsMarshaller; - private final HybridTimestampTracker observableTimestampTracker; /** Placement driver. */ @@ -397,6 +379,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { */ private final Executor partitionOperationsExecutor; + /** Executor for scheduling rebalance routine. */ + private final ScheduledExecutorService rebalanceScheduler; + /** Marshallers provider. */ private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider(); @@ -426,23 +411,19 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * @param gcConfig Garbage collector configuration. * @param txCfg Transaction configuration. * @param storageUpdateConfig Storage update handler configuration. - * @param raftMgr Raft manager. * @param replicaMgr Replica manager. * @param lockMgr Lock manager. * @param replicaSvc Replica service. * @param txManager Transaction manager. 
* @param dataStorageMgr Data storage manager. * @param schemaManager Schema manager. - * @param volatileLogStorageFactoryCreator Creator for {@link org.apache.ignite.internal.raft.storage.LogStorageFactory} for - * volatile tables. * @param ioExecutor Separate executor for IO operations like partition storage initialization or partition raft group meta data * persisting. * @param partitionOperationsExecutor Striped executor on which partition operations (potentially requiring I/O with storages) * will be executed. - * @param raftGroupServiceFactory Factory that is used for creation of raft group services for replication groups. + * @param rebalanceScheduler Executor for scheduling rebalance routine. * @param placementDriver Placement driver. * @param sql A supplier function that returns {@link IgniteSql}. - * @param rebalanceScheduler Executor for scheduling rebalance routine. * @param lowWatermark Low watermark. * @param transactionInflights Transaction inflights. */ @@ -455,7 +436,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { MessagingService messagingService, TopologyService topologyService, MessageSerializationRegistry messageSerializationRegistry, - RaftManager raftMgr, ReplicaManager replicaMgr, LockManager lockMgr, ReplicaService replicaSvc, @@ -464,13 +444,12 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { Path storagePath, MetaStorageManager metaStorageMgr, SchemaManager schemaManager, - LogStorageFactoryCreator volatileLogStorageFactoryCreator, ExecutorService ioExecutor, Executor partitionOperationsExecutor, + ScheduledExecutorService rebalanceScheduler, HybridClock clock, ClockService clockService, OutgoingSnapshotsManager outgoingSnapshotsManager, - TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory, DistributionZoneManager distributionZoneManager, SchemaSyncService schemaSyncService, CatalogService catalogService, @@ -478,12 +457,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent { PlacementDriver placementDriver, Supplier<IgniteSql> sql, RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, - ScheduledExecutorService rebalanceScheduler, LowWatermark lowWatermark, TransactionInflights transactionInflights ) { this.topologyService = topologyService; - this.raftMgr = raftMgr; this.replicaMgr = replicaMgr; this.lockMgr = lockMgr; this.replicaSvc = replicaSvc; @@ -491,20 +468,18 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { this.dataStorageMgr = dataStorageMgr; this.metaStorageMgr = metaStorageMgr; this.schemaManager = schemaManager; - this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator; this.ioExecutor = ioExecutor; this.partitionOperationsExecutor = partitionOperationsExecutor; + this.rebalanceScheduler = rebalanceScheduler; this.clock = clock; this.clockService = clockService; this.outgoingSnapshotsManager = outgoingSnapshotsManager; - this.raftGroupServiceFactory = raftGroupServiceFactory; this.distributionZoneManager = distributionZoneManager; this.catalogService = catalogService; this.observableTimestampTracker = observableTimestampTracker; this.sql = sql; this.storageUpdateConfig = storageUpdateConfig; this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry; - this.rebalanceScheduler = rebalanceScheduler; this.lowWatermark = lowWatermark; this.transactionInflights = transactionInflights; this.txCfg = txCfg; @@ -565,8 +540,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { mvGc = new MvGc(nodeName, gcConfig, lowWatermark); - raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(messageSerializationRegistry); - partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery( metaStorageMgr, messagingService, @@ -581,7 +554,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { storagePath.resolve(TX_STATE_DIR), 
txStateStorageScheduledPool, txStateStoragePool, - raftMgr.getLogSyncer(), + replicaMgr.getLogSyncer(), TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER ); @@ -922,11 +895,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { storageUpdateConfig ); - Peer serverPeer = realConfiguration.peer(localNode().name()); - - var raftNodeId = localMemberAssignment == null ? null : new RaftNodeId(replicaGrpId, serverPeer); - - boolean shouldStartRaftListeners = localMemberAssignment != null && !((Loza) raftMgr).isStarted(raftNodeId); + boolean shouldStartRaftListeners = shouldStartRaftListeners(assignments, nonStableNodeAssignments); if (shouldStartRaftListeners) { ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); @@ -934,6 +903,24 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler); } + // TODO: will be removed in https://issues.apache.org/jira/browse/IGNITE-22315 + Supplier<RaftGroupService> getCachedRaftClient = () -> { + try { + // Return existing service if it's already started. + return internalTbl + .tableRaftService() + .partitionRaftGroupService(replicaGrpId.partitionId()); + } catch (IgniteInternalException e) { + // We use "IgniteInternalException" in accordance with the javadoc of "partitionRaftGroupService" method. 
+ return null; + } + }; + + // TODO: will be removed in https://issues.apache.org/jira/browse/IGNITE-22218 + Consumer<RaftGroupService> updateTableRaftService = (raftClient) -> ((InternalTableImpl) internalTbl) + .tableRaftService() + .updateInternalTableRaftGroupService(partId, raftClient); + CompletableFuture<Boolean> startGroupFut; if (localMemberAssignment != null) { @@ -946,85 +933,88 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { ) : trueCompletedFuture(); - startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> { + startGroupFut = shouldStartGroupFut.thenComposeAsync(startGroup -> inBusyLock(busyLock, () -> { + // (1) if partitionReplicatorNodeRecovery#shouldStartGroup fails -> do start nothing if (!startGroup) { - return false; + return falseCompletedFuture(); } - if (((Loza) raftMgr).isStarted(raftNodeId)) { + // (2) if replica already started => check force reset and finish the process + if (replicaMgr.isReplicaStarted(replicaGrpId)) { if (nonStableNodeAssignments != null && nonStableNodeAssignments.force()) { - ((Loza) raftMgr).resetPeers(raftNodeId, configurationFromAssignments(nonStableNodeAssignments.nodes())); + replicaMgr.resetPeers(replicaGrpId, configurationFromAssignments(nonStableNodeAssignments.nodes())); } - - return true; + return trueCompletedFuture(); } + // (3) Otherwise let's start replica manually + InternalTable internalTable = table.internalTable(); + + RaftGroupListener raftGroupListener = new PartitionListener( + txManager, + partitionDataStorage, + partitionUpdateHandlers.storageUpdateHandler, + partitionStorages.getTxStateStorage(), + safeTimeTracker, + storageIndexTracker, + catalogService, + table.schemaView(), + clockService + ); + + SnapshotStorageFactory snapshotStorageFactory = createSnapshotStorageFactory(replicaGrpId, + partitionUpdateHandlers, internalTable); + + Function<RaftGroupService, ReplicaListener> createListener = (raftClient) -> 
createReplicaListener( + replicaGrpId, + table, + safeTimeTracker, + partitionStorages.getMvPartitionStorage(), + partitionStorages.getTxStateStorage(), + partitionUpdateHandlers, + raftClient); + + RaftGroupEventsListener raftGroupEventsListener = createRaftGroupEventsListener(zoneId, replicaGrpId); + + MvTableStorage mvTableStorage = internalTable.storage(); + try { - startPartitionRaftGroupNode( - replicaGrpId, - raftNodeId, - newConfiguration, - safeTimeTracker, + var ret = replicaMgr.startReplica( + raftGroupEventsListener, + raftGroupListener, + mvTableStorage.isVolatile(), + snapshotStorageFactory, + updateTableRaftService, + createListener, storageIndexTracker, - table, - partitionStorages.getTxStateStorage(), - partitionDataStorage, - partitionUpdateHandlers, - zoneId - ); - - return true; - } catch (NodeStoppingException ex) { - throw new CompletionException(ex); + replicaGrpId, + newConfiguration); + return ret; + } catch (NodeStoppingException e) { + throw new AssertionError("Loza was stopped before Table manager", e); } }), ioExecutor); } else { + // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 + // (4) in case if node not in the assignments startGroupFut = falseCompletedFuture(); } startGroupFut - .thenComposeAsync(v -> inBusyLock(busyLock, () -> { - TableRaftService tableRaftService = table.internalTable().tableRaftService(); - - try { - // Return existing service if it's already started. - return completedFuture( - (TopologyAwareRaftGroupService) tableRaftService.partitionRaftGroupService(replicaGrpId.partitionId()) - ); - } catch (IgniteInternalException e) { - // We use "IgniteInternalException" in accordance with the javadoc of "partitionRaftGroupService" method. - try { - // TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online. 
- return raftMgr - .startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller); - } catch (NodeStoppingException ex) { - return failedFuture(ex); - } - } - }), ioExecutor) - .thenAcceptAsync(updatedRaftGroupService -> inBusyLock(busyLock, () -> { - ((InternalTableImpl) internalTbl).tableRaftService() - .updateInternalTableRaftGroupService(partId, updatedRaftGroupService); - - boolean startedRaftNode = startGroupFut.join(); - if (localMemberAssignment == null || !startedRaftNode || replicaMgr.isReplicaStarted(replicaGrpId)) { - return; + // TODO: the stage will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 + .thenComposeAsync(isReplicaStarted -> inBusyLock(busyLock, () -> { + if (isReplicaStarted) { + return nullCompletedFuture(); } + CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut; try { - startReplicaWithNewListener( - replicaGrpId, - table, - safeTimeTracker, - storageIndexTracker, - partitionStorages.getMvPartitionStorage(), - partitionStorages.getTxStateStorage(), - partitionUpdateHandlers, - updatedRaftGroupService - ); - } catch (NodeStoppingException ex) { - throw new AssertionError("Loza was stopped before Table manager", ex); + newRaftClientFut = replicaMgr.startRaftClient( + replicaGrpId, newConfiguration, getCachedRaftClient); + } catch (NodeStoppingException e) { + throw new CompletionException(e); } + return newRaftClientFut.thenAccept(updateTableRaftService); }), ioExecutor) .whenComplete((res, ex) -> { if (ex != null) { @@ -1039,31 +1029,36 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return resultFuture; } - private void startReplicaWithNewListener( - TablePartitionId replicaGrpId, - TableImpl table, - PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, - PendingComparableValuesTracker<Long, Void> storageIndexTracker, - MvPartitionStorage mvPartitionStorage, - TxStateStorage txStatePartitionStorage, - 
PartitionUpdateHandlers partitionUpdateHandlers, - TopologyAwareRaftGroupService raftGroupService - ) throws NodeStoppingException { - PartitionReplicaListener listener = createReplicaListener( - replicaGrpId, - table, - safeTimeTracker, - mvPartitionStorage, - txStatePartitionStorage, - partitionUpdateHandlers, - raftGroupService - ); + private boolean shouldStartRaftListeners(Assignments assignments, @Nullable Assignments nonStableNodeAssignments) { + Set<Assignment> nodesForStarting = nonStableNodeAssignments == null + ? assignments.nodes() + : RebalanceUtil.subtract(nonStableNodeAssignments.nodes(), assignments.nodes()); + return nodesForStarting + .stream() + .anyMatch(assignment -> assignment.consistentId().equals(localNode().name())); + } + + private PartitionMover createPartitionMover(TablePartitionId replicaGrpId) { + return new PartitionMover(busyLock, () -> { + CompletableFuture<Replica> replicaFut = replicaMgr.replica(replicaGrpId); + if (replicaFut == null) { + return failedFuture(new IgniteInternalException("No such replica for partition " + replicaGrpId.partitionId() + + " in table " + replicaGrpId.tableId())); + } + return replicaFut.thenApply(Replica::raftClient); + }); + } + + private RaftGroupEventsListener createRaftGroupEventsListener(int zoneId, TablePartitionId replicaGrpId) { + PartitionMover partitionMover = createPartitionMover(replicaGrpId); - replicaMgr.startReplica( + return new RebalanceRaftGroupEventsListener( + metaStorageMgr, replicaGrpId, - listener, - raftGroupService, - storageIndexTracker + busyLock, + partitionMover, + rebalanceScheduler, + zoneId ); } @@ -1122,46 +1117,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return new PartitionKey(internalTbl.tableId(), partId); } - private RaftGroupOptions groupOptionsForPartition( - MvTableStorage mvTableStorage, - TxStateTableStorage txStateTableStorage, - PartitionKey partitionKey, - PartitionUpdateHandlers partitionUpdateHandlers - ) { - 
RaftGroupOptions raftGroupOptions; - - if (mvTableStorage.isVolatile()) { - raftGroupOptions = RaftGroupOptions.forVolatileStores() - // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273 - .setLogStorageFactory(volatileLogStorageFactoryCreator.factory(((Loza) raftMgr).volatileRaft().logStorage().value())) - .raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage()); - } else { - raftGroupOptions = RaftGroupOptions.forPersistentStores(); - } - - raftGroupOptions.snapshotStorageFactory(new PartitionSnapshotStorageFactory( - topologyService, - outgoingSnapshotsManager, - new PartitionAccessImpl( - partitionKey, - mvTableStorage, - txStateTableStorage, - mvGc, - partitionUpdateHandlers.indexUpdateHandler, - partitionUpdateHandlers.gcUpdateHandler, - fullStateTransferIndexChooser, - schemaManager.schemaRegistry(partitionKey.tableId()), - lowWatermark - ), - catalogService, - incomingSnapshotsExecutor - )); - - raftGroupOptions.commandsMarshaller(raftCommandsMarshaller); - - return raftGroupOptions; - } - @Override public void beforeNodeStop() { if (!beforeStopGuard.compareAndSet(false, true)) { @@ -1194,11 +1149,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { mvGc, fullStateTransferIndexChooser, sharedTxStateStorage, - () -> shutdownAndAwaitTermination(rebalanceScheduler, shutdownTimeoutSeconds, TimeUnit.SECONDS), () -> shutdownAndAwaitTermination(txStateStoragePool, shutdownTimeoutSeconds, TimeUnit.SECONDS), () -> shutdownAndAwaitTermination(txStateStorageScheduledPool, shutdownTimeoutSeconds, TimeUnit.SECONDS), () -> shutdownAndAwaitTermination(scanRequestExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS), () -> shutdownAndAwaitTermination(incomingSnapshotsExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS), + () -> shutdownAndAwaitTermination(rebalanceScheduler, shutdownTimeoutSeconds, TimeUnit.SECONDS), () -> shutdownAndAwaitTermination(streamerFlushExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS) ); } catch (Exception e) { @@ -1857,7 +1812,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { boolean isRecovery ) { ClusterNode localMember = localNode(); - RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new Peer(localNode().name())); boolean pendingAssignmentsAreForced = pendingAssignments.force(); Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes(); @@ -1923,8 +1877,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }), ioExecutor); } else { localServicesStartFuture = runAsync(() -> { - if (pendingAssignmentsAreForced && ((Loza) raftMgr).isStarted(raftNodeId)) { - ((Loza) raftMgr).resetPeers(raftNodeId, configurationFromAssignments(nonStableNodeAssignmentsFinal.nodes())); + if (pendingAssignmentsAreForced && replicaMgr.isReplicaStarted(replicaGrpId)) { + replicaMgr.resetPeers(replicaGrpId, configurationFromAssignments(nonStableNodeAssignmentsFinal.nodes())); } }, ioExecutor); } @@ -2047,55 +2001,29 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }); } - private void startPartitionRaftGroupNode( + private SnapshotStorageFactory createSnapshotStorageFactory( TablePartitionId replicaGrpId, - RaftNodeId raftNodeId, - PeersAndLearners stableConfiguration, - PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, - PendingComparableValuesTracker<Long, Void> storageIndexTracker, - TableImpl table, - TxStateStorage txStatePartitionStorage, - PartitionDataStorage partitionDataStorage, PartitionUpdateHandlers partitionUpdateHandlers, - int zoneId - ) throws NodeStoppingException { - InternalTable internalTable = table.internalTable(); - - RaftGroupOptions groupOptions = groupOptionsForPartition( - internalTable.storage(), - internalTable.txStateStorage(), - partitionKey(internalTable, replicaGrpId.partitionId()), - partitionUpdateHandlers - ); + InternalTable internalTable + ) { + PartitionKey 
partitionKey = partitionKey(internalTable, replicaGrpId.partitionId()); - RaftGroupListener raftGrpLsnr = new PartitionListener( - txManager, - partitionDataStorage, - partitionUpdateHandlers.storageUpdateHandler, - txStatePartitionStorage, - safeTimeTracker, - storageIndexTracker, + return new PartitionSnapshotStorageFactory( + topologyService, + outgoingSnapshotsManager, + new PartitionAccessImpl( + partitionKey, + internalTable.storage(), + internalTable.txStateStorage(), + mvGc, + partitionUpdateHandlers.indexUpdateHandler, + partitionUpdateHandlers.gcUpdateHandler, + fullStateTransferIndexChooser, + schemaManager.schemaRegistry(partitionKey.tableId()), + lowWatermark + ), catalogService, - table.schemaView(), - clockService - ); - - RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener( - metaStorageMgr, - replicaGrpId, - busyLock, - createPartitionMover(internalTable, replicaGrpId.partitionId()), - rebalanceScheduler, - zoneId - ); - - // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273 - ((Loza) raftMgr).startRaftGroupNodeWithoutService( - raftNodeId, - stableConfiguration, - raftGrpLsnr, - raftGrpEvtsLsnr, - groupOptions + incomingSnapshotsExecutor ); } @@ -2169,10 +2097,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }; } - private PartitionMover createPartitionMover(InternalTable internalTable, int partId) { - return new PartitionMover(busyLock, () -> internalTable.tableRaftService().partitionRaftGroupService(partId)); - } - private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) { var peers = new HashSet<String>(); var learners = new HashSet<String>(); @@ -2382,15 +2306,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } return stopReplicaFuture - .thenCompose(v -> { - try { - raftMgr.stopRaftNodes(tablePartitionId); - } catch (NodeStoppingException ignored) { - // No-op. 
- } - - return mvGc.removeStorage(tablePartitionId); - }); + .thenCompose(v -> mvGc.removeStorage(tablePartitionId)); } private CompletableFuture<Void> destroyPartitionStorages(TablePartitionId tablePartitionId, TableImpl table) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index b38ddbc09c..406954e831 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -91,6 +91,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.raft.Command; +import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner; import org.apache.ignite.internal.raft.service.RaftCommandRunner; import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -452,6 +453,15 @@ public class PartitionReplicaListener implements ReplicaListener { }); } + /** Returns Raft-client. 
*/ + @Override + public RaftCommandRunner raftClient() { + if (raftClient instanceof ExecutorInclinedRaftCommandRunner) { + return ((ExecutorInclinedRaftCommandRunner) raftClient).decoratedCommandRunner(); + } + return raftClient; + } + private CompletableFuture<?> processRequest(ReplicaRequest request, @Nullable Boolean isPrimary, String senderId, @Nullable Long leaseStartTime) { if (request instanceof SchemaVersionAwareReplicaRequest) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java index 261eb1461a..ce75a0e26a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed; @@ -66,7 +67,7 @@ class PartitionMoverTest extends BaseIgniteAbstractTest { .thenReturn(failedFuture(new IOException())) .thenReturn(nullCompletedFuture()); - var partitionMover = new PartitionMover(new IgniteSpinBusyLock(), () -> raftService); + var partitionMover = new PartitionMover(new IgniteSpinBusyLock(), () -> completedFuture(raftService)); assertThat(partitionMover.movePartition(PEERS_AND_LEARNERS, TERM), willCompleteSuccessfully()); @@ -77,7 +78,9 @@ class PartitionMoverTest extends BaseIgniteAbstractTest { public void testComponentStop() { var lock = new IgniteSpinBusyLock(); - var partitionMover = new PartitionMover(lock, () -> 
mock(RaftGroupService.class)); + RaftGroupService raftService = mock(RaftGroupService.class); + + var partitionMover = new PartitionMover(lock, () -> completedFuture(raftService)); lock.block(); @@ -93,7 +96,7 @@ class PartitionMoverTest extends BaseIgniteAbstractTest { when(raftService.changePeersAsync(any(), anyLong())) .then(invocation -> CompletableFuture.runAsync(lock::block)); - var partitionMover = new PartitionMover(lock, () -> raftService); + var partitionMover = new PartitionMover(lock, () -> completedFuture(raftService)); assertThat(partitionMover.movePartition(PEERS_AND_LEARNERS, TERM), willThrowWithCauseOrSuppressed(NodeStoppingException.class)); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java index d8233857e3..0a857939f1 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java @@ -26,6 +26,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; import static org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.closeAll; import static org.apache.ignite.internal.util.IgniteUtils.startAsync; @@ -87,9 +88,7 @@ import org.apache.ignite.internal.placementdriver.TestPlacementDriver; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.Peer; import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.service.RaftGroupService; -import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaManager; @@ -277,6 +276,12 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest { when(topologyService.localMember()).thenReturn(node); when(distributionZoneManager.dataNodes(anyLong(), anyInt(), anyInt())).thenReturn(emptySetCompletedFuture()); + when(replicaMgr.getLogSyncer()).thenReturn(mock(LogSyncer.class)); + when(replicaMgr.startReplica(any(), any(), any(), any(), any(), any())) + .thenReturn(nullCompletedFuture()); + // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 + when(replicaMgr.startRaftClient(any(), any(), any())) + .thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class))); when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture()); try (MockedStatic<SchemaUtils> schemaServiceMock = mockStatic(SchemaUtils.class)) { @@ -314,7 +319,6 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest { clusterService.messagingService(), clusterService.topologyService(), clusterService.serializationRegistry(), - rm, replicaMgr, null, null, @@ -323,13 +327,12 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest { workDir, metaStorageManager, sm = new SchemaManager(revisionUpdater, catalogManager), - budgetView -> new LocalLogStorageFactory(), partitionOperationsExecutor, partitionOperationsExecutor, + mock(ScheduledExecutorService.class), clock, clockService, new OutgoingSnapshotsManager(clusterService.messagingService()), - mock(TopologyAwareRaftGroupServiceFactory.class), distributionZoneManager, new 
AlwaysSyncedSchemaSyncService(), catalogManager, @@ -337,7 +340,6 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest { placementDriver, () -> mock(IgniteSql.class), new RemotelyTriggeredResourceRegistry(), - mock(ScheduledExecutorService.class), lowWatermark, new TransactionInflights(placementDriver, clockService) ) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index dcfad44ebc..9c80784c5f 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -43,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.atMost; @@ -102,11 +103,7 @@ import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.placementdriver.TestPlacementDriver; import org.apache.ignite.internal.raft.Loza; -import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; -import org.apache.ignite.internal.raft.service.RaftGroupService; -import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaManager; 
@@ -281,6 +278,11 @@ public class TableManagerTest extends IgniteAbstractTest { when(distributionZoneManager.dataNodes(anyLong(), anyInt(), anyInt())).thenReturn(emptySetCompletedFuture()); + when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(), any(), any(), any(), any())) + .thenReturn(trueCompletedFuture()); + // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 + when(replicaMgr.startRaftClient(any(), any(), any())) + .thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class))); when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture()); tblManagerFut = new CompletableFuture<>(); @@ -556,8 +558,6 @@ public class TableManagerTest extends IgniteAbstractTest { private IgniteBiTuple<TableViewInternal, TableManager> startTableManagerStopTest() throws Exception { TableViewInternal table = mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, tblManagerFut); - verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), any(), any()); - TableManager tableManager = tblManagerFut.join(); return new IgniteBiTuple<>(table, tableManager); @@ -569,7 +569,6 @@ public class TableManagerTest extends IgniteAbstractTest { tableManager.beforeNodeStop(); assertThat(tableManager.stopAsync(new ComponentContext()), willCompleteSuccessfully()); - verify(rm, times(PARTITIONS)).stopRaftNodes(any()); verify(replicaMgr, times(PARTITIONS)).stopReplica(any()); verify(table.internalTable().storage()).close(); @@ -716,15 +715,6 @@ public class TableManagerTest extends IgniteAbstractTest { ) throws Exception { String consistentId = "node0"; - when(rm.startRaftGroupService(any(), any(), any(), any())).thenAnswer(mock -> { - RaftGroupService raftGrpSrvcMock = mock(TopologyAwareRaftGroupService.class); - - when(raftGrpSrvcMock.leader()).thenReturn(new Peer(consistentId)); - - return completedFuture(raftGrpSrvcMock); - }); - - // TODO: useless code https://issues.apache.org/jira/browse/IGNITE-22388 
when(ts.getByConsistentId(any())).thenReturn(new ClusterNodeImpl( UUID.randomUUID().toString(), consistentId, @@ -795,7 +785,6 @@ public class TableManagerTest extends IgniteAbstractTest { clusterService.messagingService(), clusterService.topologyService(), clusterService.serializationRegistry(), - rm, replicaMgr, null, null, @@ -804,13 +793,12 @@ public class TableManagerTest extends IgniteAbstractTest { workDir, msm, sm = new SchemaManager(revisionUpdater, catalogManager), - budgetView -> new LocalLogStorageFactory(), partitionOperationsExecutor, partitionOperationsExecutor, + mock(ScheduledExecutorService.class), clock, new TestClockService(clock), new OutgoingSnapshotsManager(clusterService.messagingService()), - mock(TopologyAwareRaftGroupServiceFactory.class), distributionZoneManager, new AlwaysSyncedSchemaSyncService(), catalogManager, @@ -818,7 +806,6 @@ public class TableManagerTest extends IgniteAbstractTest { new TestPlacementDriver(node), () -> mock(IgniteSql.class), new RemotelyTriggeredResourceRegistry(), - mock(ScheduledExecutorService.class), lowWatermark, mock(TransactionInflights.class) ) { diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 84030d69bf..d242bf8be0 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -100,6 +100,7 @@ import org.apache.ignite.internal.raft.TestLozaFactory; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator; import org.apache.ignite.internal.replicator.ReplicaManager; import 
org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -384,8 +385,10 @@ public class ItTxTestCluster { HybridClock clock = new HybridClockImpl(); TestClockService clockService = new TestClockService(clock); - clocks.put(node.name(), clock); - clockServices.put(node.name(), clockService); + String nodeName = node.name(); + + clocks.put(nodeName, clock); + clockServices.put(nodeName, clockService); var raftSrv = TestLozaFactory.create( clusterService, @@ -396,27 +399,41 @@ public class ItTxTestCluster { assertThat(raftSrv.startAsync(new ComponentContext()), willCompleteSuccessfully()); - raftServers.put(node.name(), raftSrv); + raftServers.put(nodeName, raftSrv); var cmgManager = mock(ClusterManagementGroupManager.class); // This test is run without Meta storage. when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture()); + var commandMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); + + var raftClientFactory = new TopologyAwareRaftGroupServiceFactory( + clusterService, + logicalTopologyService(clusterService), + Loza.FACTORY, + new RaftGroupEventsClientListener() + ); + ReplicaManager replicaMgr = new ReplicaManager( - node.name(), + nodeName, clusterService, cmgManager, clockService, Set.of(TableMessageGroup.class, TxMessageGroup.class), placementDriver, partitionOperationsExecutor, - new NoOpFailureProcessor() + () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, + new NoOpFailureProcessor(), + commandMarshaller, + raftClientFactory, + raftSrv, + new VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("volatile-log-spillout")) ); assertThat(replicaMgr.startAsync(new ComponentContext()), willCompleteSuccessfully()); - replicaManagers.put(node.name(), replicaMgr); + replicaManagers.put(nodeName, replicaMgr); LOG.info("Replica manager has been started, node=[" + node + ']'); @@ -428,15 +445,15 @@ public class ItTxTestCluster 
{ executor )); - replicaServices.put(node.name(), replicaSvc); + replicaServices.put(nodeName, replicaSvc); var resourcesRegistry = new RemotelyTriggeredResourceRegistry(); TransactionInflights transactionInflights = new TransactionInflights(placementDriver, clockService); - txInflights.put(node.name(), transactionInflights); + txInflights.put(nodeName, transactionInflights); - cursorRegistries.put(node.name(), resourcesRegistry); + cursorRegistries.put(nodeName, resourcesRegistry); TxManagerImpl txMgr = newTxManager( clusterService, @@ -451,7 +468,7 @@ public class ItTxTestCluster { ); ResourceVacuumManager resourceVacuumManager = new ResourceVacuumManager( - node.name(), + nodeName, resourcesRegistry, clusterService.topologyService(), clusterService.messagingService(), @@ -460,12 +477,12 @@ public class ItTxTestCluster { ); assertThat(txMgr.startAsync(new ComponentContext()), willCompleteSuccessfully()); - txManagers.put(node.name(), txMgr); + txManagers.put(nodeName, txMgr); assertThat(resourceVacuumManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); - resourceCleanupManagers.put(node.name(), resourceVacuumManager); + resourceCleanupManagers.put(nodeName, resourceVacuumManager); - txStateStorages.put(node.name(), new TestTxStateStorage()); + txStateStorages.put(nodeName, new TestTxStateStorage()); } LOG.info("Raft servers have been started"); @@ -689,9 +706,8 @@ public class ItTxTestCluster { replicaManagers.get(assignment).startReplica( new TablePartitionId(tableId, partId), - listener, - raftSvc, - storageIndexTracker + storageIndexTracker, + completedFuture(listener) ); } catch (NodeStoppingException e) { fail("Unexpected node stopping", e);