sashapolo commented on code in PR #1490:
URL: https://github.com/apache/ignite-3/pull/1490#discussion_r1082313704


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> 
initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = 
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> 
svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when 
they leave Logical Topology
+                                //  see 
https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in 
case we missed some updates.
+                                
addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void 
onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, 
PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new 
MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> 
isCurrentNodeLeader(service.raftGroupService())
+                        .thenAccept(isLeader -> {
+                            if (isLeader) {
+                                service.closeCursors(member.id());
+                            }
+                        }));
+            }
+        });
+    }
+
+    private void addLearners(Collection<ClusterNode> nodes) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the 
node is stopping");
+
+            return;
+        }
+
+        try {
+            metaStorageSvcFut
+                    .thenApply(MetaStorageServiceImpl::raftGroupService)
+                    .thenCompose(raftService -> 
isCurrentNodeLeader(raftService)
+                            .thenCompose(isLeader -> {
+                                if (!isLeader) {
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+
+                                if (!busyLock.enterBusy()) {
+                                    LOG.info("Skipping Meta Storage 
configuration update because the node is stopping");
+
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+
+                                try {
+                                    Set<String> peers = 
raftService.peers().stream()
+                                            .map(Peer::consistentId)
+                                            .collect(toUnmodifiableSet());
+
+                                    Set<String> learners = nodes.stream()
+                                            .map(ClusterNode::name)
+                                            .filter(name -> 
!peers.contains(name))
+                                            .collect(toUnmodifiableSet());
+
+                                    LOG.info("New Meta Storage learners 
detected: " + learners);
+
+                                    if (learners.isEmpty()) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+
+                                    PeersAndLearners newConfiguration = 
PeersAndLearners.fromConsistentIds(peers, learners);
+
+                                    return 
raftService.addLearners(newConfiguration.learners());
+                                } finally {
+                                    busyLock.leaveBusy();
+                                }
+                            })
+                    )
+                    .whenComplete((v, e) -> {
+                        if (e != null) {
+                            LOG.error("Unable to change peers on topology 
update", e);
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Boolean> isCurrentNodeLeader(RaftGroupService 
raftService) {
+        String name = clusterService.topologyService().localMember().name();
+
+        return raftService.refreshLeader()

Review Comment:
   Totally agree, but this would require more changes in unrelated places =(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to