sashapolo commented on code in PR #1490:
URL: https://github.com/apache/ignite-3/pull/1490#discussion_r1082313013


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> 
initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = 
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> 
svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when 
they leave Logical Topology
+                                //  see 
https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in 
case we missed some updates.
+                                
addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void 
onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, 
PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new 
MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> 
isCurrentNodeLeader(service.raftGroupService())

Review Comment:
   yes, this implementation is buggy, I will fix it when we are going to move 
to reactive cursors



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to