sashapolo commented on code in PR #1490: URL: https://github.com/apache/ignite-3/pull/1490#discussion_r1082313704
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java: ########## @@ -159,54 +135,164 @@ public MetaStorageManagerImpl( this.storage = storage; } - private CompletableFuture<MetaStorageService> initializeMetaStorage(Set<String> metaStorageNodes) { + private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) { ClusterNode thisNode = clusterService.topologyService().localMember(); - PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes); - - Peer localPeer = configuration.peer(thisNode.name()); + String thisNodeName = thisNode.name(); CompletableFuture<RaftGroupService> raftServiceFuture; try { - if (localPeer == null) { - raftServiceFuture = raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration); - } else { - clusterService.topologyService().addEventHandler(new TopologyEventHandler() { - @Override - public void onDisappeared(ClusterNode member) { - metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id())); - } - }); + // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica. + if (metaStorageNodes.contains(thisNodeName)) { + PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes); + + Peer localPeer = configuration.peer(thisNodeName); - storage.start(); + assert localPeer != null; raftServiceFuture = raftMgr.startRaftGroupNode( new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), configuration, new MetaStorageListener(storage), + new RaftGroupEventsListener() { + @Override + public void onLeaderElected(long term) { + // TODO: add listener to remove learners when they leave Logical Topology + // see https://issues.apache.org/jira/browse/IGNITE-18554 + + registerTopologyEventListener(); + + // Update the configuration immediately in case we missed some updates. + addLearners(clusterService.topologyService().allMembers()); + } + + @Override + public void onNewPeersConfigurationApplied(PeersAndLearners configuration) { + } + + @Override + public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) { + } + } + ); + } else { + PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName)); + + Peer localPeer = configuration.learner(thisNodeName); + + assert localPeer != null; + + raftServiceFuture = raftMgr.startRaftGroupNode( + new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), + configuration, + new MetaStorageLearnerListener(storage), RaftGroupEventsListener.noopLsnr ); } } catch (NodeStoppingException e) { return CompletableFuture.failedFuture(e); } - return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name())); + return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, thisNode)); + } + + private void registerTopologyEventListener() { + clusterService.topologyService().addEventHandler(new TopologyEventHandler() { + @Override + public void onAppeared(ClusterNode member) { + addLearners(List.of(member)); + } + + @Override + public void onDisappeared(ClusterNode member) { + metaStorageSvcFut.thenAccept(service -> isCurrentNodeLeader(service.raftGroupService()) + .thenAccept(isLeader -> { + if (isLeader) { + service.closeCursors(member.id()); + } + })); + } + }); + } + + private void addLearners(Collection<ClusterNode> nodes) { + if (!busyLock.enterBusy()) { + LOG.info("Skipping Meta Storage configuration update because the node is stopping"); + + return; + } + + try { + metaStorageSvcFut + .thenApply(MetaStorageServiceImpl::raftGroupService) + .thenCompose(raftService -> isCurrentNodeLeader(raftService) + .thenCompose(isLeader -> { + if (!isLeader) { + return CompletableFuture.completedFuture(null); + } + + if (!busyLock.enterBusy()) { + LOG.info("Skipping Meta Storage configuration update because the node is stopping"); + + return CompletableFuture.completedFuture(null); + } + + try { + Set<String> peers = raftService.peers().stream() + .map(Peer::consistentId) + .collect(toUnmodifiableSet()); + + Set<String> learners = nodes.stream() + .map(ClusterNode::name) + .filter(name -> !peers.contains(name)) + .collect(toUnmodifiableSet()); + + LOG.info("New Meta Storage learners detected: " + learners); + + if (learners.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + PeersAndLearners newConfiguration = PeersAndLearners.fromConsistentIds(peers, learners); + + return raftService.addLearners(newConfiguration.learners()); + } finally { + busyLock.leaveBusy(); + } + }) + ) + .whenComplete((v, e) -> { + if (e != null) { + LOG.error("Unable to change peers on topology update", e); + } + }); + } finally { + busyLock.leaveBusy(); + } + } + + private CompletableFuture<Boolean> isCurrentNodeLeader(RaftGroupService raftService) { + String name = clusterService.topologyService().localMember().name(); + + return raftService.refreshLeader() Review Comment: Totally agree, but this will cause more changes in unrelated places =( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org