This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0013c317d04 [feature](cloud) Support multi follower fe in cloud 
(#38388)
0013c317d04 is described below

commit 0013c317d049cae7d56e6c1aec226fade6a011b6
Author: deardeng <[email protected]>
AuthorDate: Fri Sep 6 00:29:37 2024 +0800

    [feature](cloud) Support multi follower fe in cloud (#38388)
---
 cloud/src/resource-manager/resource_manager.cpp    |  65 +++++++-
 docker/runtime/doris-compose/cluster.py            |  29 +++-
 docker/runtime/doris-compose/command.py            |  21 ++-
 docker/runtime/doris-compose/resource/init_fe.sh   |  17 +-
 .../doris/cloud/catalog/CloudClusterChecker.java   |  34 +++-
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |  26 ++-
 .../doris/cloud/catalog/CloudTabletRebalancer.java |   2 +-
 .../doris/cloud/system/CloudSystemInfoService.java |  23 ++-
 gensrc/proto/cloud.proto                           |   2 +
 .../org/apache/doris/regression/suite/Suite.groovy |  80 ++++++++++
 .../doris/regression/suite/SuiteCluster.groovy     |  23 ++-
 .../multi_cluster/test_multi_follower.groovy       | 176 +++++++++++++++++++++
 12 files changed, 446 insertions(+), 52 deletions(-)

diff --git a/cloud/src/resource-manager/resource_manager.cpp 
b/cloud/src/resource-manager/resource_manager.cpp
index 25848294aa7..58122ac3f7c 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -157,15 +157,18 @@ bool ResourceManager::check_cluster_params_valid(const 
ClusterPB& cluster, std::
     std::stringstream ss;
     bool no_err = true;
     int master_num = 0;
+    int follower_num = 0;
     for (auto& n : cluster.nodes()) {
         if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() && 
n.edit_log_port() &&
             n.has_node_type() &&
             (n.node_type() == NodeInfoPB_NodeType_FE_MASTER ||
-             n.node_type() == NodeInfoPB_NodeType_FE_OBSERVER)) {
+             n.node_type() == NodeInfoPB_NodeType_FE_OBSERVER ||
+             n.node_type() == NodeInfoPB_NodeType_FE_FOLLOWER)) {
             master_num += n.node_type() == NodeInfoPB_NodeType_FE_MASTER ? 1 : 
0;
+            follower_num += n.node_type() == NodeInfoPB_NodeType_FE_FOLLOWER ? 
1 : 0;
             continue;
-        }
-        if (ClusterPB::COMPUTE == cluster.type() && n.has_heartbeat_port() && 
n.heartbeat_port()) {
+        } else if (ClusterPB::COMPUTE == cluster.type() && 
n.has_heartbeat_port() &&
+                   n.heartbeat_port()) {
             continue;
         }
         ss << "check cluster params failed, node : " << proto_to_json(n);
@@ -173,12 +176,22 @@ bool ResourceManager::check_cluster_params_valid(const 
ClusterPB& cluster, std::
         no_err = false;
         break;
     }
-    // ATTN: add_cluster check must have only a master node
-    // add_node doesn't check it
-    if (check_master_num && ClusterPB::SQL == cluster.type() && master_num != 
1) {
+
+    if (check_master_num && ClusterPB::SQL == cluster.type()) {
         no_err = false;
-        ss << "cluster is SQL type, must have only one master node, now master 
count: "
-           << master_num;
+        if (master_num > 0 && follower_num > 0) {
+            ss << "cluster is SQL type, and use multi follower mode, cant set 
master node, master "
+                  "count: "
+               << master_num << " follower count: " << follower_num;
+        } else if (!follower_num && master_num != 1) {
+            ss << "cluster is SQL type, must have only one master node, now 
master count: "
+               << master_num;
+        } else {
+            // followers mode
+            // 1. followers 2. observers + followers
+            no_err = true;
+            ss << "";
+        }
         *err = ss.str();
     }
     return no_err;
@@ -618,6 +631,37 @@ std::pair<TxnErrorCode, std::string> 
ResourceManager::get_instance(std::shared_p
     return ec;
 }
 
+// check instance pb is valid
+bool is_instance_valid(const InstanceInfoPB& instance) {
+    // check has fe node
+    for (auto& c : instance.clusters()) {
+        if (c.has_type() && c.type() == ClusterPB::SQL) {
+            int master = 0;
+            int follower = 0;
+            std::string mode = "multi-followers";
+            for (auto& n : c.nodes()) {
+                if (n.node_type() == NodeInfoPB::FE_MASTER) {
+                    mode = "master-observers";
+                    master++;
+                } else if (n.node_type() == NodeInfoPB::FE_FOLLOWER) {
+                    follower++;
+                }
+            }
+            // in master/observers mode, having no master or more than one master is invalid
+            if (mode == "master-observers" && master != 1) {
+                return false;
+            }
+            // in multi-followers mode, having no follower is invalid
+            if (mode == "multi-followers" && !follower) {
+                return false;
+            }
+            return true;
+        }
+    }
+    // check others ...
+    return true;
+}
+
 std::string ResourceManager::modify_nodes(const std::string& instance_id,
                                           const std::vector<NodeInfo>& to_add,
                                           const std::vector<NodeInfo>& to_del) 
{
@@ -910,6 +954,11 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
     }
 
     LOG(INFO) << "instance " << instance_id << " info: " << 
instance.DebugString();
+    if (!to_del.empty() && !is_instance_valid(instance)) {
+        msg = "instance invalid, cant modify, plz check";
+        LOG(WARNING) << msg;
+        return msg;
+    }
 
     InstanceKeyInfo key_info {instance_id};
     std::string key;
diff --git a/docker/runtime/doris-compose/cluster.py 
b/docker/runtime/doris-compose/cluster.py
index f2e46b798cb..fcddd7f4c93 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -380,6 +380,10 @@ class Node(object):
 
 class FE(Node):
 
+    def init(self):
+        super().init()
+        self.init_is_follower()
+
     def get_add_init_config(self):
         cfg = []
         if self.cluster.fe_config:
@@ -397,10 +401,20 @@ class FE(Node):
 
         return cfg
 
+    def init_is_follower(self):
+        if self.cluster.is_cloud and self.cluster.fe_follower:
+            with open(self._is_follower_path(), "w") as f:
+                f.write("true")
+
+    def _is_follower_path(self):
+        return "{}/conf/is_follower".format(self.get_path())
+
     def docker_env(self):
         envs = super().docker_env()
         if self.cluster.is_cloud:
             envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
+        if os.path.exists(self._is_follower_path()):
+            envs["IS_FE_FOLLOWER"] = 1
         return envs
 
     def cloud_unique_id(self):
@@ -613,8 +627,8 @@ class FDB(Node):
 class Cluster(object):
 
     def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
-                 ms_config, recycle_config, be_disks, be_cluster, reg_be,
-                 coverage_dir, cloud_store_config):
+                 ms_config, recycle_config, fe_follower, be_disks, be_cluster,
+                 reg_be, coverage_dir, cloud_store_config):
         self.name = name
         self.subnet = subnet
         self.image = image
@@ -623,6 +637,7 @@ class Cluster(object):
         self.be_config = be_config
         self.ms_config = ms_config
         self.recycle_config = recycle_config
+        self.fe_follower = fe_follower
         self.be_disks = be_disks
         self.be_cluster = be_cluster
         self.reg_be = reg_be
@@ -635,8 +650,8 @@ class Cluster(object):
 
     @staticmethod
     def new(name, image, is_cloud, fe_config, be_config, ms_config,
-            recycle_config, be_disks, be_cluster, reg_be, coverage_dir,
-            cloud_store_config):
+            recycle_config, fe_follower, be_disks, be_cluster, reg_be,
+            coverage_dir, cloud_store_config):
         if not os.path.exists(LOCAL_DORIS_PATH):
             os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
             os.chmod(LOCAL_DORIS_PATH, 0o777)
@@ -646,9 +661,9 @@ class Cluster(object):
                 os.chmod(lock_file, 0o666)
             subnet = gen_subnet_prefix16()
             cluster = Cluster(name, subnet, image, is_cloud, fe_config,
-                              be_config, ms_config, recycle_config, be_disks,
-                              be_cluster, reg_be, coverage_dir,
-                              cloud_store_config)
+                              be_config, ms_config, recycle_config,
+                              fe_follower, be_disks, be_cluster, reg_be,
+                              coverage_dir, cloud_store_config)
             os.makedirs(cluster.get_path(), exist_ok=True)
             os.makedirs(get_status_path(name), exist_ok=True)
             cluster._save_meta()
diff --git a/docker/runtime/doris-compose/command.py 
b/docker/runtime/doris-compose/command.py
index 8611689c9a5..ed88dd03f4d 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -245,6 +245,13 @@ class UpCommand(Command):
                             type=str,
                             help="Specify recycle configs for 
doris_cloud.conf. "\
                                     "Example: --recycle-config \"log_level = 
warn\".")
+        group1.add_argument(
+            "--fe-follower",
+            default=False,
+            action=self._get_parser_bool_action(True),
+            help=
+            "The new added fe is follower but not observer. Only support in 
cloud mode."
+        )
         group1.add_argument("--be-disks",
                             nargs="*",
                             default=["HDD=1"],
@@ -383,18 +390,20 @@ class UpCommand(Command):
                 args.add_ms_num = 0
                 args.add_recycle_num = 0
 
-            cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud,
-                                          args.fe_config, args.be_config,
-                                          args.ms_config, args.recycle_config,
-                                          args.be_disks, args.be_cluster,
-                                          args.reg_be, args.coverage_dir,
-                                          cloud_store_config)
+            cluster = CLUSTER.Cluster.new(
+                args.NAME, args.IMAGE, args.cloud, args.fe_config,
+                args.be_config, args.ms_config, args.recycle_config,
+                args.fe_follower, args.be_disks, args.be_cluster, args.reg_be,
+                args.coverage_dir, cloud_store_config)
             LOG.info("Create new cluster {} succ, cluster path is {}".format(
                 args.NAME, cluster.get_path()))
 
         if args.be_cluster and cluster.is_cloud:
             cluster.be_cluster = args.be_cluster
 
+        if cluster.is_cloud:
+            cluster.fe_follower = args.fe_follower
+
         _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
                                                     args.be_id, args.ms_id,
                                                     args.recycle_id,
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh 
b/docker/runtime/doris-compose/resource/init_fe.sh
index e532d0d56e1..e46a6cac9b2 100755
--- a/docker/runtime/doris-compose/resource/init_fe.sh
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -81,8 +81,10 @@ fe_daemon() {
     done
 }
 
-add_cloud_fe() {
+start_cloud_fe() {
     if [ -f "$REGISTER_FILE" ]; then
+        fe_daemon &
+        bash $DORIS_HOME/bin/start_fe.sh --daemon
         return
     fi
 
@@ -96,6 +98,10 @@ add_cloud_fe() {
         node_type=FE_OBSERVER
     fi
 
+    if [ "a$IS_FE_FOLLOWER" == "a1" ]; then
+        node_type=FE_FOLLOWER
+    fi
+
     nodes='{
         "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'",
         "ip": "'"${MY_IP}"'",
@@ -139,6 +145,10 @@ add_cloud_fe() {
     fi
 
     touch $REGISTER_FILE
+
+    fe_daemon &
+    bash $DORIS_HOME/bin/start_fe.sh --daemon
+
     if [ "$MY_ID" == "1" ]; then
         echo $MY_IP >$MASTER_FE_IP_FILE
     fi
@@ -182,11 +192,6 @@ start_local_fe() {
     fi
 }
 
-start_cloud_fe() {
-    add_cloud_fe
-    bash $DORIS_HOME/bin/start_fe.sh --daemon
-}
-
 main() {
     trap stop_frontend SIGTERM
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 72fd2ba353a..567dc4b3124 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -303,9 +303,9 @@ public class CloudClusterChecker extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        getCloudBackends();
+        checkCloudBackends();
         updateCloudMetrics();
-        getCloudObserverFes();
+        checkCloudFes();
     }
 
     private void checkFeNodesMapValid() {
@@ -348,7 +348,7 @@ public class CloudClusterChecker extends MasterDaemon {
         }
     }
 
-    private void getCloudObserverFes() {
+    private void checkCloudFes() {
         Cloud.GetClusterResponse response = 
cloudSystemInfoService.getCloudCluster(
                 Config.cloud_sql_server_cluster_name, 
Config.cloud_sql_server_cluster_id, "");
         if (!response.hasStatus() || !response.getStatus().hasCode()
@@ -371,11 +371,19 @@ public class CloudClusterChecker extends MasterDaemon {
             LOG.debug("get cloud cluster, clusterId={} nodes={}",
                     Config.cloud_sql_server_cluster_id, cpb.getNodesList());
         }
-        List<Frontend> currentFes = 
Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+        List<Frontend> currentFollowers = 
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER);
+        List<Frontend> currentObservers = 
Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+        currentFollowers.addAll(currentObservers);
+        List<Frontend> currentFes = new 
ArrayList<>(currentFollowers.stream().collect(Collectors.toMap(
+                fe -> fe.getHost() + ":" + fe.getEditLogPort(),
+                fe -> fe,
+                (existing, replacement) -> existing
+        )).values());
         List<Frontend> toAdd = new ArrayList<>();
         List<Frontend> toDel = new ArrayList<>();
         List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList();
         diffNodes(toAdd, toDel, () -> {
+            // memory
             Map<String, Frontend> currentMap = new HashMap<>();
             String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
             for (Frontend fe : currentFes) {
@@ -383,10 +391,13 @@ public class CloudClusterChecker extends MasterDaemon {
                 if (selfNode.equals(endpoint)) {
                     continue;
                 }
+                // add type to map key, for diff
+                endpoint = endpoint + "_" + fe.getRole();
                 currentMap.put(endpoint, fe);
             }
             return currentMap;
         }, () -> {
+            // meta service
             Map<String, Frontend> nodeMap = new HashMap<>();
             String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
             for (Cloud.NodeInfoPB node : expectedFes) {
@@ -399,9 +410,18 @@ public class CloudClusterChecker extends MasterDaemon {
                 if (selfNode.equals(endpoint)) {
                     continue;
                 }
-                Frontend fe = new Frontend(FrontendNodeType.OBSERVER,
+                Cloud.NodeInfoPB.NodeType type = node.getNodeType();
+                // ATTN: just allow to add follower or observer
+                if (Cloud.NodeInfoPB.NodeType.FE_MASTER.equals(type)) {
+                    LOG.warn("impossible !!!,  get fe node {} type equel 
master from ms", node);
+                }
+                FrontendNodeType role = type == 
Cloud.NodeInfoPB.NodeType.FE_FOLLOWER
+                        ? FrontendNodeType.FOLLOWER :  
FrontendNodeType.OBSERVER;
+                Frontend fe = new Frontend(role,
                         CloudEnv.genFeNodeNameFromMeta(host, 
node.getEditLogPort(),
                         node.getCtime() * 1000), host, node.getEditLogPort());
+                // add type to map key, for diff
+                endpoint = endpoint + "_" + fe.getRole();
                 nodeMap.put(endpoint, fe);
             }
             return nodeMap;
@@ -421,7 +441,7 @@ public class CloudClusterChecker extends MasterDaemon {
         }
     }
 
-    private void getCloudBackends() {
+    private void checkCloudBackends() {
         Map<String, List<Backend>> clusterIdToBackend = 
cloudSystemInfoService.getCloudClusterIdToBackend();
         //rpc to ms, to get mysql user can use cluster_id
         // NOTE: rpc args all empty, use cluster_unique_id to get a instance's 
all cluster info.
@@ -474,7 +494,7 @@ public class CloudClusterChecker extends MasterDaemon {
         for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
             int aliveNum = 0;
             List<Backend> bes = clusterIdToBackend.get(entry.getValue());
-            if (bes == null || bes.size() == 0) {
+            if (bes == null || bes.isEmpty()) {
                 LOG.info("cant get be nodes by cluster {}, bes {}", entry, 
bes);
                 continue;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 535b8ea582f..1ebda4dc8aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -48,6 +48,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -140,12 +141,24 @@ public class CloudEnv extends Env {
                 
.stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList());
 
         helperNodes.clear();
-        helperNodes.addAll(allNodes.stream()
-                .filter(nodeInfoPB -> nodeInfoPB.getNodeType() == 
NodeInfoPB.NodeType.FE_MASTER)
-                .map(nodeInfoPB -> new HostInfo(
-                Config.enable_fqdn_mode ? nodeInfoPB.getHost() : 
nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
-                .collect(Collectors.toList()));
-        // check only have one master node.
+        if (allNodes.stream().anyMatch(n -> n.getNodeType() == 
NodeInfoPB.NodeType.FE_FOLLOWER)) {
+            // multi followers mode: use the follower with the smallest host as the helper node
+            Optional<HostInfo> helperNode = allNodes.stream()
+                    .filter(nodeInfoPB -> nodeInfoPB.getNodeType() == 
NodeInfoPB.NodeType.FE_FOLLOWER)
+                    .map(nodeInfoPB -> new HostInfo(
+                    Config.enable_fqdn_mode ? nodeInfoPB.getHost() : 
nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
+                    .min(Comparator.comparing(HostInfo::getHost));
+            helperNode.ifPresent(hostInfo -> helperNodes.add(hostInfo));
+        } else {
+            // master observers mode
+            // the helper node is the FE_MASTER node; exactly one is expected
+            helperNodes.addAll(allNodes.stream()
+                    .filter(nodeInfoPB -> nodeInfoPB.getNodeType() == 
NodeInfoPB.NodeType.FE_MASTER)
+                    .map(nodeInfoPB -> new HostInfo(
+                    Config.enable_fqdn_mode ? nodeInfoPB.getHost() : 
nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
+                    .collect(Collectors.toList()));
+            // check only have one master node.
+        }
         Preconditions.checkState(helperNodes.size() == 1);
 
         Optional<NodeInfoPB> local = allNodes.stream().filter(n -> 
((Config.enable_fqdn_mode ? n.getHost() : n.getIp())
@@ -178,6 +191,7 @@ public class CloudEnv extends Env {
         }
 
         LOG.info("current fe's role is {}", type == 
NodeInfoPB.NodeType.FE_MASTER ? "MASTER" :
+                type == NodeInfoPB.NodeType.FE_FOLLOWER ? "FOLLOWER" :
                 type == NodeInfoPB.NodeType.FE_OBSERVER ? "OBSERVER" : 
"UNKNOWN");
         if (type == NodeInfoPB.NodeType.UNKNOWN) {
             LOG.warn("type current not support, please check it");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index fc580c4fc7e..6f4534c4b08 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -1068,7 +1068,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 newInfo.setPartitionId(partitionId);
                 newInfo.setIndexId(indexId);
                 newInfo.setClusterId(clusterId);
-                // APPR: in unprotectUpdateCloudReplica, use batch must set 
tabletId = -1
+                // ATTN: in unprotectUpdateCloudReplica, use batch must set 
tabletId = -1
                 newInfo.setTabletId(-1);
                 rets.add(newInfo);
             });
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 48728efb003..202d576e3bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -282,20 +282,29 @@ public class CloudSystemInfoService extends 
SystemInfoService {
             LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel);
         }
         String masterIp = Env.getCurrentEnv().getMasterHost();
-        for (Frontend fe : toAdd) {
+        for (Frontend fe : toDel) {
             if (masterIp.equals(fe.getHost())) {
                 continue;
             }
-            Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER,
-                    fe.getHost(), fe.getEditLogPort(), fe.getNodeName());
-            LOG.info("added cloud frontend={} ", fe);
+            try {
+                Env.getCurrentEnv().dropFrontend(fe.getRole(), fe.getHost(), 
fe.getEditLogPort());
+                LOG.info("dropped cloud frontend={} ", fe);
+            } catch (DdlException e) {
+                LOG.warn("failed to drop cloud frontend={} ", fe);
+            }
         }
-        for (Frontend fe : toDel) {
+
+        for (Frontend fe : toAdd) {
             if (masterIp.equals(fe.getHost())) {
                 continue;
             }
-            Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, 
fe.getHost(), fe.getEditLogPort());
-            LOG.info("dropped cloud frontend={} ", fe);
+            try {
+                Env.getCurrentEnv().addFrontend(fe.getRole(),
+                        fe.getHost(), fe.getEditLogPort(), fe.getNodeName());
+                LOG.info("added cloud frontend={} ", fe);
+            } catch (DdlException e) {
+                LOG.warn("failed to add cloud frontend={} ", fe);
+            }
         }
     }
 
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 4fee41650b7..b4c2d0d0968 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -140,8 +140,10 @@ message ClusterPB {
 message NodeInfoPB {
     enum NodeType {
         UNKNOWN = 0;
+         // legacy logic for one-master-multi-observer mode
         FE_MASTER = 1;
         FE_OBSERVER = 2;
+        FE_FOLLOWER = 3;
     }
     optional string cloud_unique_id = 1;
     optional string name = 2;
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 1587363dc0c..579afda48d9 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1789,6 +1789,86 @@ class Suite implements GroovyInterceptable {
         }
     }
 
+    def get_instance = { MetaService ms=null ->
+        def jsonOutput = new JsonOutput()
+
+        def get_instance_api = { check_func ->
+            httpTest {
+                op "get"
+                if (ms) {
+                    endpoint ms.host+':'+ms.httpPort
+                } else {
+                    endpoint context.config.metaServiceHttpAddress
+                }
+                uri 
"/MetaService/http/get_instance?token=${token}&instance_id=${instance_id}"
+                check check_func
+            }
+        }
+
+        def json
+        get_instance_api.call() {
+            respCode, body ->
+                log.info("get instance resp: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+        json.result
+    }
+
+
+    def drop_node = { unique_id, ip, bePort=0, fePort=0, nodeType,
+                        cluster_name, cluster_id, MetaService ms=null ->
+        def jsonOutput = new JsonOutput()
+        def type
+        if (fePort != 0) {
+            type = "SQL"
+        } else if (bePort != 0) {
+            type = "COMPUTE"
+        }
+        def clusterInfo = [
+                     type: type,
+                     cluster_name : cluster_name,
+                     cluster_id : cluster_id,
+                     nodes: [
+                         [
+                             cloud_unique_id: unique_id,
+                             ip: ip,
+                             node_type: nodeType
+                         ],
+                     ]
+                ]
+        if (bePort != 0) {
+            // drop be
+            clusterInfo['nodes'][0]['heartbeat_port'] = bePort
+        } else if (fePort != 0) {
+            // drop fe
+            clusterInfo['nodes'][0]['edit_log_port'] = fePort
+        }
+        def map = [instance_id: "${instance_id}", cluster: clusterInfo]
+        def js = jsonOutput.toJson(map)
+        log.info("drop node req: ${js} ".toString())
+
+        def drop_node_api = { request_body, check_func ->
+            httpTest {
+                if (ms) {
+                    endpoint ms.host+':'+ms.httpPort
+                } else {
+                    endpoint context.config.metaServiceHttpAddress
+                }
+                uri "/MetaService/http/drop_node?token=${token}"
+                body request_body
+                check check_func
+            }
+        }
+
+        drop_node_api.call(js) {
+            respCode, body ->
+                log.info("drop node resp: ${body} ${respCode}".toString())
+                def json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+    }
+
     def d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
         def jsonOutput = new JsonOutput()
         def clusterInfo = [
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 6a322039985..44220500d1b 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -51,6 +51,12 @@ class ClusterOptions {
     // 3. cloudMode = null, create both cloud and none-cloud cluster, depend 
on the running pipeline mode.
     Boolean cloudMode = false
 
+    // in cloud mode, deployment methods are divided into
+    // 1. master - multi observers
+    // 2. multi followers - multi observers
+    // default use 1
+    Boolean useFollowersMode = false
+
     // when cloudMode = true/false,  but the running pipeline is diff with 
cloudMode,
     // skip run this docker test or not.
     boolean skipRunWhenPipelineDiff = true
@@ -290,6 +296,11 @@ class SuiteCluster {
         if (isCloud) {
             cmd += ['--cloud']
         }
+
+        if (isCloud && options.useFollowersMode) {
+            cmd += ['--fe-follower']
+        }
+
         cmd += ['--wait-timeout', String.valueOf(180)]
 
         runCmd(cmd.join(' '), -1)
@@ -419,8 +430,8 @@ class SuiteCluster {
         return new Tuple4(frontends, backends, metaservices, recyclers)
     }
 
-    List<Integer> addFrontend(int num) throws Exception {
-        def result = add(num, 0, null)
+    List<Integer> addFrontend(int num, boolean followerMode=false) throws 
Exception {
+        def result = add(num, 0, null, followerMode)
         return result.first
     }
 
@@ -429,14 +440,18 @@ class SuiteCluster {
         return result.second
     }
 
-    // APPR: clusterName just used for cloud mode, 1 cluster has n bes
-    Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String 
clusterName) throws Exception {
+    // ATTN: clusterName just used for cloud mode, 1 cluster has n bes
+    // ATTN: followerMode just used for cloud mode
+    Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String 
clusterName, boolean followerMode=false) throws Exception {
         assert feNum > 0 || beNum > 0
 
         def sb = new StringBuilder()
         sb.append('up ' + name + ' ')
         if (feNum > 0) {
             sb.append('--add-fe-num ' + feNum + ' ')
+            if (followerMode) {
+                sb.append('--fe-follower' + ' ')
+            }
         }
         if (beNum > 0) {
             sb.append('--add-be-num ' + beNum + ' ')
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_multi_follower.groovy 
b/regression-test/suites/cloud_p0/multi_cluster/test_multi_follower.groovy
new file mode 100644
index 00000000000..864655a5344
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_multi_follower.groovy
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite('test_multi_followr_in_cloud', 'multi_cluster, docker') {
+    if (!isCloudMode()) {
+        return
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    options.setFeNum(3)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.connectToFollower = true
+    options.useFollowersMode = true
+
+    docker(options) {
+        def f = "FOLLOWER"
+        def o = "OBSERVER"
+        // Runs SHOW FRONTENDS, asserts it returns feNum rows, hands the rows to
+        // checkFunc, then asserts that at least one row reports IsMaster as true.
+        def check = { int feNum, Closure checkFunc ->
+            def frontends = sql_return_maparray """SHOW FRONTENDS"""
+            assertEquals(feNum, frontends.size())
+            checkFunc.call(frontends)
+            boolean masterSeen = frontends.any { fe -> Boolean.parseBoolean(fe.IsMaster) }
+            assertTrue(masterSeen)
+        }
+
+        // Maps a SHOW FRONTENDS Role value to the node-type string later passed to
+        // drop_node; yields null when the role mentions neither FOLLOWER nor OBSERVER.
+        def transferType = { String role ->
+            if (role.contains(f)) {
+                return "FE_FOLLOWER"
+            }
+            return role.contains(o) ? "FE_OBSERVER" : null
+        }
+
+        check(3) { ret ->
+            ret.each {
+                assertTrue(it.Role.contains(f))
+            }
+        }
+        // get fe clusterName
+        def result = sql_return_maparray """ADMIN SHOW FRONTEND CONFIG LIKE '%cloud_sql_server_cluster_name%'"""
+        def feClusterName = result.Value[0]
+        result = sql_return_maparray """ADMIN SHOW FRONTEND CONFIG LIKE '%cloud_sql_server_cluster_id%'"""
+        def feClusterId = result.Value[0]
+        log.info("fe clusterName: {}, clusterId: {} ", feClusterName, feClusterId)
+        def toDropIP
+        def toDropPort
+        def toDropType
+        def toDropUniqueId
+        // add new follower
+        cluster.addFrontend(1, true)
+        dockerAwaitUntil(5) {
+            def ret = sql """SHOW FRONTENDS"""
+            log.info("show frontends: {}", ret)
+            ret.size() == 4
+        }
+        check(4) { def ret ->
+            ret.each {
+                assertTrue(it.Role.contains(f))
+                if (!Boolean.parseBoolean(it.IsMaster)) {
+                    toDropIP = it.Host
+                    toDropPort = it.EditLogPort
+                    toDropType = transferType(it.Role)
+                }
+            }
+        }
+        log.info("ip: {}, port: {}, type: {}, uniqueId: {}", toDropIP, toDropPort, toDropType, toDropUniqueId)
+        def ms = cluster.getAllMetaservices().get(0)
+        logger.info("ms addr={}, port={}", ms.host, ms.httpPort)
+        // drop a follower
+        // Fetches the instance via get_instance(metaServices) and returns the
+        // cloud_unique_id of the node whose ip contains hostIP, inside the cluster
+        // whose cluster_id contains clusterId. Fails the test if either is missing.
+        def findToDropUniqueId = { clusterId, hostIP, metaServices ->
+            // declared with def so the value stays local to this closure
+            def instance = get_instance(metaServices)
+            def toDropCluster = instance.clusters.find {
+                it.cluster_id.contains(clusterId)
+            }
+            log.info("toDropCluster: {}", toDropCluster)
+            // assert before dereferencing so a missing cluster fails with a clear
+            // assertion instead of a NullPointerException on .nodes
+            assertNotNull(toDropCluster)
+            def toDropNode = toDropCluster.nodes.find {
+                it.ip.contains(hostIP)
+            }
+            log.info("toDropNode: {}", toDropNode)
+            assertNotNull(toDropNode)
+            toDropNode.cloud_unique_id
+        }
+
+        toDropUniqueId = findToDropUniqueId.call(feClusterId, toDropIP, ms)
+        log.info("to Drop1 ip: {}, port: {}, type: {}, uniqueId: {}", toDropIP, toDropPort, toDropType, toDropUniqueId)
+
+        drop_node(toDropUniqueId, toDropIP, 0,
+                    toDropPort, toDropType, feClusterName, feClusterId, ms)
+
+        dockerAwaitUntil(5) {
+            def ret = sql """SHOW FRONTENDS"""
+            log.info("show frontends: {}", ret)
+            ret.size() == 3
+        }
+
+        check(3) { def ret ->
+            ret.each {
+                assertTrue(it.Role.contains(f))
+            }
+        }
+
+        // add an observer
+        cluster.addFrontend(1)
+        check(4) { def ret ->
+            int observerNum = 0
+            int followerNum = 0
+            ret.each {
+                if (it.Role.contains(o)) {
+                    observerNum++;
+                    toDropIP = it.Host
+                    toDropPort = it.EditLogPort
+                    toDropType = transferType(it.Role)
+                } else if (it.Role.contains(f)) {
+                    followerNum++;
+                }
+            }
+            assertEquals(1, observerNum)
+            assertEquals(3, followerNum)
+        }
+        
+        toDropUniqueId = findToDropUniqueId.call(feClusterId, toDropIP, ms)
+        // drop observer
+        drop_node(toDropUniqueId, toDropIP, 0,
+                    toDropPort, toDropType, feClusterName, feClusterId, ms)
+
+        dockerAwaitUntil(5) {
+            def ret = sql """SHOW FRONTENDS"""
+            log.info("show frontends: {}", ret)
+            ret.size() == 3
+        }
+
+        check(3) { def ret ->
+            int observerNum = 0
+            int followerNum = 0
+            ret.each {
+                if (it.Role.contains(o)) {
+                    observerNum++;
+                } else if (it.Role.contains(f)){
+                    followerNum++;
+                }
+            }
+            assertEquals(0, observerNum)
+            assertEquals(3, followerNum)
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to