kgusakov commented on code in PR #3633:
URL: https://github.com/apache/ignite-3/pull/3633#discussion_r1591309855


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -465,29 +461,101 @@ private void stopLeaseProlongation(ReplicationGroupId groupId, @Nullable String
         });
     }
 
+    public CompletableFuture<Replica> getReplica(ReplicationGroupId replicationGroupId) {

Review Comment:
   No usages?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -465,29 +461,101 @@ private void stopLeaseProlongation(ReplicationGroupId groupId, @Nullable String
         });
     }
 
+    public CompletableFuture<Replica> getReplica(ReplicationGroupId replicationGroupId) {
+        return replicas.get(replicationGroupId);
+    }
+
+    public boolean isRaftClientStarted(RaftNodeId raftNodeId) {
+        return ((Loza) raftManager).isStarted(raftNodeId);
+    }
+
+    public void resetPeers(RaftNodeId raftNodeId, PeersAndLearners peersAndLearners) {
+        ((Loza) raftManager).resetPeers(raftNodeId, peersAndLearners);
+    }
+
+    public LogSyncer getLogSyncer() {
+        return raftManager.getLogSyncer();
+    }
+
+    /**
+     * TODO.
+     *
+     * @param isVolatileStorage TODO
+     * @param volatileLogStorageFactoryCreator TODO
+     * @return TODO
+     */
+    public RaftGroupOptions createRaftGroupOptions(boolean isVolatileStorage, LogStorageFactoryCreator volatileLogStorageFactoryCreator) {
+        if (isVolatileStorage) {
+            LogStorageBudgetView view = ((Loza) raftManager).volatileRaft().logStorage().value();
+            return RaftGroupOptions.forVolatileStores()
+                    // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273
+                    .setLogStorageFactory(volatileLogStorageFactoryCreator.factory(view))
+                    .raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage());
+        } else {
+            return RaftGroupOptions.forPersistentStores();
+        }
+    }
+
+    /**
+     * TODO.
+     *
+     * @param groupOptions TODO
+     * @param raftGrpLsnr TODO
+     * @param raftGrpEvtsLsnr TODO
+     * @param raftNodeId TODO
+     * @param stableConfiguration TODO
+     * @throws NodeStoppingException TODO
+     */
+    public void startPartitionRaftGroupNode(

Review Comment:
   Why do we need this direct method? Can we do it internally in startReplica 
   instead? That was the main idea: encapsulate as much Raft logic as we can in 
   the Replica(Manager) layer.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -949,9 +934,9 @@ private CompletableFuture<Void> startPartitionAndStartClient(
                     return false;
                 }
 
-                if (((Loza) raftMgr).isStarted(raftNodeId)) {
+                if (replicaMgr.isRaftClientStarted(raftNodeId)) {

Review Comment:
   I think the whole `if` can be replaced by a method like 
   replicaMgr.forcePeersUpdateIfNeeded(...). Again, this would encapsulate some 
   Raft logic in the Replica layer.
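
A minimal sketch of the shape such a method could take, assuming the guarded block only resets peers when the Raft client is already started (the body of the `if` is not shown in this hunk, so that is an assumption). `RaftFacade`, `ReplicaManagerSketch`, `FakeRaft`, and the `String`/`List<String>` parameter types are hypothetical stand-ins for illustration, not Ignite APIs; only the method name `forcePeersUpdateIfNeeded` comes from the comment above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ForcePeersUpdateSketch {
    /** Hypothetical stand-in for the Raft-facing calls; NOT the Ignite API. */
    interface RaftFacade {
        boolean isStarted(String raftNodeId);

        void resetPeers(String raftNodeId, List<String> peers);
    }

    /** Hypothetical stand-in for ReplicaManager that hides the Raft check from callers. */
    static final class ReplicaManagerSketch {
        private final RaftFacade raft;

        ReplicaManagerSketch(RaftFacade raft) {
            this.raft = raft;
        }

        /**
         * Resets peers only if the Raft node is already started.
         *
         * @return true if the peers were reset, false if nothing was done.
         */
        boolean forcePeersUpdateIfNeeded(String raftNodeId, List<String> peers) {
            if (!raft.isStarted(raftNodeId)) {
                return false;
            }

            raft.resetPeers(raftNodeId, peers);

            return true;
        }
    }

    /** In-memory fake used only to exercise the sketch. */
    static final class FakeRaft implements RaftFacade {
        final Set<String> started = new HashSet<>();
        final List<String> resetCalls = new ArrayList<>();

        @Override
        public boolean isStarted(String raftNodeId) {
            return started.contains(raftNodeId);
        }

        @Override
        public void resetPeers(String raftNodeId, List<String> peers) {
            resetCalls.add(raftNodeId + "->" + peers);
        }
    }

    public static void main(String[] args) {
        FakeRaft raft = new FakeRaft();
        raft.started.add("node-1");

        ReplicaManagerSketch replicaMgr = new ReplicaManagerSketch(raft);

        // The caller no longer asks "is the Raft client started?" itself.
        System.out.println(replicaMgr.forcePeersUpdateIfNeeded("node-1", List.of("a", "b")));
        System.out.println(replicaMgr.forcePeersUpdateIfNeeded("node-2", List.of("a")));
    }
}
```

With this shape the caller in TableManager would make a single call, and both the started check and the peer reset would stay behind the ReplicaManager boundary.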



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java:
##########
@@ -46,4 +46,8 @@ public <R> CompletableFuture<R> run(Command cmd) {
 
         return future.thenApplyAsync(identity(), completionExecutor);
     }
+
+    public RaftCommandRunner decoratedCommandRunner() {

Review Comment:
   What's the purpose of this decorator? 



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -128,6 +126,11 @@ public Replica(
         raftClient.subscribeLeader(this::onLeaderElected);
     }
 
+    public final TopologyAwareRaftGroupService raftClient() {

Review Comment:
   No usages and public?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -119,7 +117,7 @@ public Replica(
         this.replicaGrpId = replicaGrpId;
         this.listener = listener;
         this.storageIndexTracker = storageIndexTracker;
-        this.raftClient = raftClient;
+        this.raftClient = raftClient();

Review Comment:
   Why not simply listener.raftClient()?



##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -611,6 +613,7 @@ public class IgniteImpl implements Ignite {
 
         LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig);
 
+        Marshaller raftMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry());

Review Comment:
   I think it would be better to encapsulate this logic in ReplicaManager 
   itself (as it was earlier in TableManager).



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -497,41 +565,76 @@ public CompletableFuture<Replica> startReplica(
      * Internal method for starting a replica.
      *
      * @param replicaGrpId Replication group id.
-     * @param listener Replica listener.
-     * @param raftClient Topology aware Raft client.
+     * @param newConfiguration TODO
+     * @param createListener TODO
      * @param storageIndexTracker Storage index tracker.
      */
     private CompletableFuture<Replica> startReplicaInternal(
+            // TODO: nonsense name
+            boolean shouldSkipReplicaStarting,
             ReplicationGroupId replicaGrpId,
-            ReplicaListener listener,
-            TopologyAwareRaftGroupService raftClient,
+            PeersAndLearners newConfiguration,
+            Supplier<RaftGroupService> raftClientCache,
+            Function<RaftGroupService, ReplicaListener> createListener,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker
-    ) {
+    ) throws NodeStoppingException {
         LOG.info("Replica is about to start [replicationGroupId={}].", replicaGrpId);
 
+        var raftClient = raftClientCache.get();
+        CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut;
+        if (raftClient == null) {
+            newRaftClientFut = createRaftClientAsync(replicaGrpId, newConfiguration);
+        } else {
+            newRaftClientFut = CompletableFuture.completedFuture((TopologyAwareRaftGroupService) raftClient);
+        }
+
+        // TODO: should be there for now because in TableManager:L978-990 there passing TableRaftService's updating
+        CompletableFuture<ReplicaListener> newReplicaListenerFut = newRaftClientFut.thenApply(createListener);
+
+        if (shouldSkipReplicaStarting) {
+            return nullCompletedFuture();
+        }
+
+
+        return temporalInternalCreateReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut);
+    }
+
+    /**
+     * TODO.
+     *
+     * @param replicaGrpId TODO
+     * @param storageIndexTracker TODO
+     * @param newReplicaListenerFut TODO
+     * @return TODO
+     */
+    public CompletableFuture<Replica> temporalInternalCreateReplica(
+            ReplicationGroupId replicaGrpId,
+            PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+            CompletableFuture<ReplicaListener> newReplicaListenerFut
+    ) {
+
         ClusterNode localNode = clusterNetSvc.topologyService().localMember();
 
-        Replica newReplica = new Replica(
+        CompletableFuture<Replica> newReplicaFut = newReplicaListenerFut.thenApply(listener -> new Replica(
                 replicaGrpId,
                 listener,
                 storageIndexTracker,
-                raftClient,
                 localNode,
                 executor,
                 placementDriver,
-                clockService
-        );
+                clockService));
 
         CompletableFuture<Replica> replicaFuture = replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
             if (existingReplicaFuture == null || existingReplicaFuture.isDone()) {
                 assert existingReplicaFuture == null || isCompletedSuccessfully(existingReplicaFuture);
                 LOG.info("Replica is started [replicationGroupId={}].", replicaGrpId);
 
-                return completedFuture(newReplica);
+                return newReplicaFut;
             } else {
-                existingReplicaFuture.complete(newReplica);
                 LOG.info("Replica is started, existing replica waiter was completed [replicationGroupId={}].", replicaGrpId);
 
+                existingReplicaFuture.complete(newReplicaFut.join());

Review Comment:
   The join() must be replaced by future chaining.
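
One way to chain instead of blocking, sketched under assumptions: the outcome of the new future is forwarded to the existing waiter through `whenComplete`, so the calling thread never waits inside the `compute` lambda. The class name `FutureChainingSketch` and the helper `propagate` are hypothetical and only illustrate the pattern with plain `CompletableFuture`; they are not part of the PR.

```java
import java.util.concurrent.CompletableFuture;

final class FutureChainingSketch {
    /**
     * Completes {@code waiter} with the outcome (value or error) of {@code source}
     * once {@code source} finishes, without blocking the calling thread.
     */
    static <T> void propagate(CompletableFuture<T> source, CompletableFuture<T> waiter) {
        source.whenComplete((value, error) -> {
            if (error != null) {
                waiter.completeExceptionally(error);
            } else {
                waiter.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        // Stand-ins for newReplicaFut and existingReplicaFuture from the diff above.
        CompletableFuture<String> newReplicaFut = new CompletableFuture<>();
        CompletableFuture<String> existingReplicaFuture = new CompletableFuture<>();

        // Instead of existingReplicaFuture.complete(newReplicaFut.join()):
        propagate(newReplicaFut, existingReplicaFuture);

        System.out.println("waiter done before source: " + existingReplicaFuture.isDone());

        newReplicaFut.complete("replica");

        System.out.println("waiter value: " + existingReplicaFuture.join());
    }
}
```

Unlike `complete(newReplicaFut.join())`, this also forwards a failure of the new future to the waiter instead of throwing from inside the `compute` callback.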


