kgusakov commented on code in PR #3633: URL: https://github.com/apache/ignite-3/pull/3633#discussion_r1591309855
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ########## @@ -465,29 +461,101 @@ private void stopLeaseProlongation(ReplicationGroupId groupId, @Nullable String }); } + public CompletableFuture<Replica> getReplica(ReplicationGroupId replicationGroupId) { Review Comment: No usages? ########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ########## @@ -465,29 +461,101 @@ private void stopLeaseProlongation(ReplicationGroupId groupId, @Nullable String }); } + public CompletableFuture<Replica> getReplica(ReplicationGroupId replicationGroupId) { + return replicas.get(replicationGroupId); + } + + public boolean isRaftClientStarted(RaftNodeId raftNodeId) { + return ((Loza) raftManager).isStarted(raftNodeId); + } + + public void resetPeers(RaftNodeId raftNodeId, PeersAndLearners peersAndLearners) { + ((Loza) raftManager).resetPeers(raftNodeId, peersAndLearners); + } + + public LogSyncer getLogSyncer() { + return raftManager.getLogSyncer(); + } + + /** + * TODO. + * + * @param isVolatileStorage TODO + * @param volatileLogStorageFactoryCreator TODO + * @return TODO + */ + public RaftGroupOptions createRaftGroupOptions(boolean isVolatileStorage, LogStorageFactoryCreator volatileLogStorageFactoryCreator) { + if (isVolatileStorage) { + LogStorageBudgetView view = ((Loza) raftManager).volatileRaft().logStorage().value(); + return RaftGroupOptions.forVolatileStores() + // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273 + .setLogStorageFactory(volatileLogStorageFactoryCreator.factory(view)) + .raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage()); + } else { + return RaftGroupOptions.forPersistentStores(); + } + } + + /** + * TODO. 
+ * + * @param groupOptions TODO + * @param raftGrpLsnr TODO + * @param raftGrpEvtsLsnr TODO + * @param raftNodeId TODO + * @param stableConfiguration TODO + * @throws NodeStoppingException TODO + */ + public void startPartitionRaftGroupNode( Review Comment: Why do we need this direct method? Can we do it internally in startReplica instead? That was the main idea - to encapsulate as much Raft logic as we can in the Replica(Manager) layer. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -949,9 +934,9 @@ private CompletableFuture<Void> startPartitionAndStartClient( return false; } - if (((Loza) raftMgr).isStarted(raftNodeId)) { + if (replicaMgr.isRaftClientStarted(raftNodeId)) { Review Comment: I think the whole `if` can be replaced by a method like replicaMgr.forcePeersUpdateIfNeeded(...) - again, it will encapsulate some Raft logic in the Replica layer. ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java: ########## @@ -46,4 +46,8 @@ public <R> CompletableFuture<R> run(Command cmd) { return future.thenApplyAsync(identity(), completionExecutor); } + + public RaftCommandRunner decoratedCommandRunner() { Review Comment: What's the purpose of this decorator? ########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java: ########## @@ -128,6 +126,11 @@ public Replica( raftClient.subscribeLeader(this::onLeaderElected); } + public final TopologyAwareRaftGroupService raftClient() { Review Comment: No usages, yet it is public? ########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java: ########## @@ -119,7 +117,7 @@ public Replica( this.replicaGrpId = replicaGrpId; this.listener = listener; this.storageIndexTracker = storageIndexTracker; - this.raftClient = raftClient; + this.raftClient = raftClient(); Review Comment: Why not simply listener.raftClient()? 
########## modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java: ########## @@ -611,6 +613,7 @@ public class IgniteImpl implements Ignite { LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig); + Marshaller raftMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()); Review Comment: I think it will be better to encapsulate this logic in ReplicaManager itself (as it was earlier in TableManager) ########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ########## @@ -497,41 +565,76 @@ public CompletableFuture<Replica> startReplica( * Internal method for starting a replica. * * @param replicaGrpId Replication group id. - * @param listener Replica listener. - * @param raftClient Topology aware Raft client. + * @param newConfiguration TODO + * @param createListener TODO * @param storageIndexTracker Storage index tracker. 
*/ private CompletableFuture<Replica> startReplicaInternal( + // TODO: nonsense name + boolean shouldSkipReplicaStarting, ReplicationGroupId replicaGrpId, - ReplicaListener listener, - TopologyAwareRaftGroupService raftClient, + PeersAndLearners newConfiguration, + Supplier<RaftGroupService> raftClientCache, + Function<RaftGroupService, ReplicaListener> createListener, PendingComparableValuesTracker<Long, Void> storageIndexTracker - ) { + ) throws NodeStoppingException { LOG.info("Replica is about to start [replicationGroupId={}].", replicaGrpId); + var raftClient = raftClientCache.get(); + CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut; + if (raftClient == null) { + newRaftClientFut = createRaftClientAsync(replicaGrpId, newConfiguration); + } else { + newRaftClientFut = CompletableFuture.completedFuture((TopologyAwareRaftGroupService) raftClient); + } + + // TODO: should be there for now because in TableManager:L978-990 there passing TableRaftService's updating + CompletableFuture<ReplicaListener> newReplicaListenerFut = newRaftClientFut.thenApply(createListener); + + if (shouldSkipReplicaStarting) { + return nullCompletedFuture(); + } + + + return temporalInternalCreateReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut); + } + + /** + * TODO. 
+ * + * @param replicaGrpId TODO + * @param storageIndexTracker TODO + * @param newReplicaListenerFut TODO + * @return TODO + */ + public CompletableFuture<Replica> temporalInternalCreateReplica( + ReplicationGroupId replicaGrpId, + PendingComparableValuesTracker<Long, Void> storageIndexTracker, + CompletableFuture<ReplicaListener> newReplicaListenerFut + ) { + ClusterNode localNode = clusterNetSvc.topologyService().localMember(); - Replica newReplica = new Replica( + CompletableFuture<Replica> newReplicaFut = newReplicaListenerFut.thenApply(listener -> new Replica( replicaGrpId, listener, storageIndexTracker, - raftClient, localNode, executor, placementDriver, - clockService - ); + clockService)); CompletableFuture<Replica> replicaFuture = replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> { if (existingReplicaFuture == null || existingReplicaFuture.isDone()) { assert existingReplicaFuture == null || isCompletedSuccessfully(existingReplicaFuture); LOG.info("Replica is started [replicationGroupId={}].", replicaGrpId); - return completedFuture(newReplica); + return newReplicaFut; } else { - existingReplicaFuture.complete(newReplica); LOG.info("Replica is started, existing replica waiter was completed [replicationGroupId={}].", replicaGrpId); + existingReplicaFuture.complete(newReplicaFut.join()); Review Comment: The join() call must be replaced by future chaining. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org