This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0013c317d04 [feature](cloud) Support multi follower fe in cloud (#38388)
0013c317d04 is described below
commit 0013c317d049cae7d56e6c1aec226fade6a011b6
Author: deardeng <[email protected]>
AuthorDate: Fri Sep 6 00:29:37 2024 +0800
[feature](cloud) Support multi follower fe in cloud (#38388)
---
cloud/src/resource-manager/resource_manager.cpp | 65 +++++++-
docker/runtime/doris-compose/cluster.py | 29 +++-
docker/runtime/doris-compose/command.py | 21 ++-
docker/runtime/doris-compose/resource/init_fe.sh | 17 +-
.../doris/cloud/catalog/CloudClusterChecker.java | 34 +++-
.../org/apache/doris/cloud/catalog/CloudEnv.java | 26 ++-
.../doris/cloud/catalog/CloudTabletRebalancer.java | 2 +-
.../doris/cloud/system/CloudSystemInfoService.java | 23 ++-
gensrc/proto/cloud.proto | 2 +
.../org/apache/doris/regression/suite/Suite.groovy | 80 ++++++++++
.../doris/regression/suite/SuiteCluster.groovy | 23 ++-
.../multi_cluster/test_multi_follower.groovy | 176 +++++++++++++++++++++
12 files changed, 446 insertions(+), 52 deletions(-)
diff --git a/cloud/src/resource-manager/resource_manager.cpp
b/cloud/src/resource-manager/resource_manager.cpp
index 25848294aa7..58122ac3f7c 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -157,15 +157,18 @@ bool ResourceManager::check_cluster_params_valid(const
ClusterPB& cluster, std::
std::stringstream ss;
bool no_err = true;
int master_num = 0;
+ int follower_num = 0;
for (auto& n : cluster.nodes()) {
if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() &&
n.edit_log_port() &&
n.has_node_type() &&
(n.node_type() == NodeInfoPB_NodeType_FE_MASTER ||
- n.node_type() == NodeInfoPB_NodeType_FE_OBSERVER)) {
+ n.node_type() == NodeInfoPB_NodeType_FE_OBSERVER ||
+ n.node_type() == NodeInfoPB_NodeType_FE_FOLLOWER)) {
master_num += n.node_type() == NodeInfoPB_NodeType_FE_MASTER ? 1 :
0;
+ follower_num += n.node_type() == NodeInfoPB_NodeType_FE_FOLLOWER ?
1 : 0;
continue;
- }
- if (ClusterPB::COMPUTE == cluster.type() && n.has_heartbeat_port() &&
n.heartbeat_port()) {
+ } else if (ClusterPB::COMPUTE == cluster.type() &&
n.has_heartbeat_port() &&
+ n.heartbeat_port()) {
continue;
}
ss << "check cluster params failed, node : " << proto_to_json(n);
@@ -173,12 +176,22 @@ bool ResourceManager::check_cluster_params_valid(const
ClusterPB& cluster, std::
no_err = false;
break;
}
- // ATTN: add_cluster check must have only a master node
- // add_node doesn't check it
- if (check_master_num && ClusterPB::SQL == cluster.type() && master_num !=
1) {
+ // add_cluster checks the FE layout (add_node doesn't); skip it if a node already failed above
+ if (no_err && check_master_num && ClusterPB::SQL == cluster.type()) {
no_err = false;
- ss << "cluster is SQL type, must have only one master node, now master
count: "
- << master_num;
+ if (master_num > 0 && follower_num > 0) {
+ ss << "cluster is SQL type, and use multi follower mode, cant set
master node, master "
+ "count: "
+ << master_num << " follower count: " << follower_num;
+ } else if (!follower_num && master_num != 1) {
+ ss << "cluster is SQL type, must have only one master node, now
master count: "
+ << master_num;
+ } else {
+ // valid layout: exactly one master (+ observers), or
+ // followers (+ observers) without any master
+ no_err = true;
+ // nothing is appended to ss, so *err is set to an empty string below
+ }
*err = ss.str();
}
return no_err;
@@ -618,6 +631,34 @@ std::pair<TxnErrorCode, std::string>
ResourceManager::get_instance(std::shared_p
return ec;
}
+// Check the FE layout of the instance's first SQL cluster: it is valid with
+// exactly one FE_MASTER (legacy master-observers mode) or, when there is no
+// master, with at least one FE_FOLLOWER (multi-followers mode).
+static bool is_instance_valid(const InstanceInfoPB& instance) {
+ for (auto& c : instance.clusters()) {
+ if (!c.has_type() || c.type() != ClusterPB::SQL) {
+ continue;
+ }
+ int master = 0;
+ int follower = 0;
+ for (auto& n : c.nodes()) {
+ if (n.node_type() == NodeInfoPB::FE_MASTER) {
+ master++;
+ } else if (n.node_type() == NodeInfoPB::FE_FOLLOWER) {
+ follower++;
+ }
+ }
+ if (master > 0) {
+ // master-observers mode: more than one master is invalid
+ return master == 1;
+ }
+ // multi-followers mode: at least one follower is required
+ return follower > 0;
+ }
+ // no SQL cluster: nothing to check (other checks may be added here)
+ return true;
+}
+
std::string ResourceManager::modify_nodes(const std::string& instance_id,
const std::vector<NodeInfo>& to_add,
const std::vector<NodeInfo>& to_del)
{
@@ -910,6 +951,12 @@ std::string ResourceManager::modify_nodes(const
std::string& instance_id,
}
LOG(INFO) << "instance " << instance_id << " info: " <<
instance.DebugString();
+ // when nodes are dropped, reject the change if the SQL cluster's FE layout is invalid
+ if (!to_del.empty() && !is_instance_valid(instance)) {
+ msg = "instance invalid, cant modify, plz check";
+ LOG(WARNING) << msg;
+ return msg;
+ }
InstanceKeyInfo key_info {instance_id};
std::string key;
diff --git a/docker/runtime/doris-compose/cluster.py
b/docker/runtime/doris-compose/cluster.py
index f2e46b798cb..fcddd7f4c93 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -380,6 +380,10 @@ class Node(object):
class FE(Node):
+ def init(self):
+ super().init()
+ self.init_is_follower()
+
def get_add_init_config(self):
cfg = []
if self.cluster.fe_config:
@@ -397,10 +401,20 @@ class FE(Node):
return cfg
+ def init_is_follower(self):
+ if self.cluster.is_cloud and self.cluster.fe_follower:
+ with open(self._is_follower_path(), "w") as f:
+ f.write("true")
+
+ def _is_follower_path(self):
+ return "{}/conf/is_follower".format(self.get_path())
+
def docker_env(self):
envs = super().docker_env()
if self.cluster.is_cloud:
envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
+ if os.path.exists(self._is_follower_path()):
+ envs["IS_FE_FOLLOWER"] = 1
return envs
def cloud_unique_id(self):
@@ -613,8 +627,8 @@ class FDB(Node):
class Cluster(object):
def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
- ms_config, recycle_config, be_disks, be_cluster, reg_be,
- coverage_dir, cloud_store_config):
+ ms_config, recycle_config, fe_follower, be_disks, be_cluster,
+ reg_be, coverage_dir, cloud_store_config):
self.name = name
self.subnet = subnet
self.image = image
@@ -623,6 +637,7 @@ class Cluster(object):
self.be_config = be_config
self.ms_config = ms_config
self.recycle_config = recycle_config
+ self.fe_follower = fe_follower
self.be_disks = be_disks
self.be_cluster = be_cluster
self.reg_be = reg_be
@@ -635,8 +650,8 @@ class Cluster(object):
@staticmethod
def new(name, image, is_cloud, fe_config, be_config, ms_config,
- recycle_config, be_disks, be_cluster, reg_be, coverage_dir,
- cloud_store_config):
+ recycle_config, fe_follower, be_disks, be_cluster, reg_be,
+ coverage_dir, cloud_store_config):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
@@ -646,9 +661,9 @@ class Cluster(object):
os.chmod(lock_file, 0o666)
subnet = gen_subnet_prefix16()
cluster = Cluster(name, subnet, image, is_cloud, fe_config,
- be_config, ms_config, recycle_config, be_disks,
- be_cluster, reg_be, coverage_dir,
- cloud_store_config)
+ be_config, ms_config, recycle_config,
+ fe_follower, be_disks, be_cluster, reg_be,
+ coverage_dir, cloud_store_config)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
diff --git a/docker/runtime/doris-compose/command.py
b/docker/runtime/doris-compose/command.py
index 8611689c9a5..ed88dd03f4d 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -245,6 +245,13 @@ class UpCommand(Command):
type=str,
help="Specify recycle configs for
doris_cloud.conf. "\
"Example: --recycle-config \"log_level =
warn\".")
+ group1.add_argument(
+ "--fe-follower",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help=
+ "The new added fe is follower but not observer. Only support in
cloud mode."
+ )
group1.add_argument("--be-disks",
nargs="*",
default=["HDD=1"],
@@ -383,18 +390,20 @@ class UpCommand(Command):
args.add_ms_num = 0
args.add_recycle_num = 0
- cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud,
- args.fe_config, args.be_config,
- args.ms_config, args.recycle_config,
- args.be_disks, args.be_cluster,
- args.reg_be, args.coverage_dir,
- cloud_store_config)
+ cluster = CLUSTER.Cluster.new(
+ args.NAME, args.IMAGE, args.cloud, args.fe_config,
+ args.be_config, args.ms_config, args.recycle_config,
+ args.fe_follower, args.be_disks, args.be_cluster, args.reg_be,
+ args.coverage_dir, cloud_store_config)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))
if args.be_cluster and cluster.is_cloud:
cluster.be_cluster = args.be_cluster
+ if cluster.is_cloud:
+ cluster.fe_follower = args.fe_follower
+
_, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
args.be_id, args.ms_id,
args.recycle_id,
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh
b/docker/runtime/doris-compose/resource/init_fe.sh
index e532d0d56e1..e46a6cac9b2 100755
--- a/docker/runtime/doris-compose/resource/init_fe.sh
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -81,8 +81,10 @@ fe_daemon() {
done
}
-add_cloud_fe() {
+start_cloud_fe() {
if [ -f "$REGISTER_FILE" ]; then
+ fe_daemon &
+ bash $DORIS_HOME/bin/start_fe.sh --daemon
return
fi
@@ -96,6 +98,10 @@ add_cloud_fe() {
node_type=FE_OBSERVER
fi
+ if [ "a$IS_FE_FOLLOWER" == "a1" ]; then
+ node_type=FE_FOLLOWER
+ fi
+
nodes='{
"cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'",
"ip": "'"${MY_IP}"'",
@@ -139,6 +145,10 @@ add_cloud_fe() {
fi
touch $REGISTER_FILE
+
+ fe_daemon &
+ bash $DORIS_HOME/bin/start_fe.sh --daemon
+
if [ "$MY_ID" == "1" ]; then
echo $MY_IP >$MASTER_FE_IP_FILE
fi
@@ -182,11 +192,6 @@ start_local_fe() {
fi
}
-start_cloud_fe() {
- add_cloud_fe
- bash $DORIS_HOME/bin/start_fe.sh --daemon
-}
-
main() {
trap stop_frontend SIGTERM
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 72fd2ba353a..567dc4b3124 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -303,9 +303,9 @@ public class CloudClusterChecker extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- getCloudBackends();
+ checkCloudBackends();
updateCloudMetrics();
- getCloudObserverFes();
+ checkCloudFes();
}
private void checkFeNodesMapValid() {
@@ -348,7 +348,7 @@ public class CloudClusterChecker extends MasterDaemon {
}
}
- private void getCloudObserverFes() {
+ private void checkCloudFes() {
Cloud.GetClusterResponse response =
cloudSystemInfoService.getCloudCluster(
Config.cloud_sql_server_cluster_name,
Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
@@ -371,11 +371,19 @@ public class CloudClusterChecker extends MasterDaemon {
LOG.debug("get cloud cluster, clusterId={} nodes={}",
Config.cloud_sql_server_cluster_id, cpb.getNodesList());
}
- List<Frontend> currentFes =
Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+ List<Frontend> currentFollowers =
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER);
+ List<Frontend> currentObservers =
Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+ currentFollowers.addAll(currentObservers);
+ List<Frontend> currentFes = new
ArrayList<>(currentFollowers.stream().collect(Collectors.toMap(
+ fe -> fe.getHost() + ":" + fe.getEditLogPort(),
+ fe -> fe,
+ (existing, replacement) -> existing // dedup by host:editLogPort, keep the first one
+ )).values());
List<Frontend> toAdd = new ArrayList<>();
List<Frontend> toDel = new ArrayList<>();
List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList();
diffNodes(toAdd, toDel, () -> {
+ // current FE nodes known to this FE in memory (followers + observers, self excluded)
Map<String, Frontend> currentMap = new HashMap<>();
String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
for (Frontend fe : currentFes) {
@@ -383,10 +391,13 @@ public class CloudClusterChecker extends MasterDaemon {
if (selfNode.equals(endpoint)) {
continue;
}
+ // include the role in the key so a node whose role changed is diffed as a different node
+ endpoint = endpoint + "_" + fe.getRole();
currentMap.put(endpoint, fe);
}
return currentMap;
}, () -> {
+ // expected FE nodes reported by meta service (self excluded)
Map<String, Frontend> nodeMap = new HashMap<>();
String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
for (Cloud.NodeInfoPB node : expectedFes) {
@@ -399,9 +410,18 @@ public class CloudClusterChecker extends MasterDaemon {
if (selfNode.equals(endpoint)) {
continue;
}
- Frontend fe = new Frontend(FrontendNodeType.OBSERVER,
+ Cloud.NodeInfoPB.NodeType type = node.getNodeType();
+ // ATTN: only FOLLOWER or OBSERVER is created here; any non-FE_FOLLOWER type maps to OBSERVER
+ if (Cloud.NodeInfoPB.NodeType.FE_MASTER.equals(type)) {
+ LOG.warn("impossible !!!, get fe node {} type equel
master from ms", node);
+ }
+ FrontendNodeType role = type ==
Cloud.NodeInfoPB.NodeType.FE_FOLLOWER
+ ? FrontendNodeType.FOLLOWER :
FrontendNodeType.OBSERVER;
+ Frontend fe = new Frontend(role,
CloudEnv.genFeNodeNameFromMeta(host,
node.getEditLogPort(),
node.getCtime() * 1000), host, node.getEditLogPort());
+ // include the role in the key so a node whose role changed is diffed as a different node
+ endpoint = endpoint + "_" + fe.getRole();
nodeMap.put(endpoint, fe);
}
return nodeMap;
@@ -421,7 +441,7 @@ public class CloudClusterChecker extends MasterDaemon {
}
}
- private void getCloudBackends() {
+ private void checkCloudBackends() {
Map<String, List<Backend>> clusterIdToBackend =
cloudSystemInfoService.getCloudClusterIdToBackend();
//rpc to ms, to get mysql user can use cluster_id
// NOTE: rpc args all empty, use cluster_unique_id to get a instance's
all cluster info.
@@ -474,7 +494,7 @@ public class CloudClusterChecker extends MasterDaemon {
for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
int aliveNum = 0;
List<Backend> bes = clusterIdToBackend.get(entry.getValue());
- if (bes == null || bes.size() == 0) {
+ if (bes == null || bes.isEmpty()) {
LOG.info("cant get be nodes by cluster {}, bes {}", entry,
bes);
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 535b8ea582f..1ebda4dc8aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -48,6 +48,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInputStream;
import java.io.IOException;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -140,12 +141,24 @@ public class CloudEnv extends Env {
.stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList());
helperNodes.clear();
- helperNodes.addAll(allNodes.stream()
- .filter(nodeInfoPB -> nodeInfoPB.getNodeType() ==
NodeInfoPB.NodeType.FE_MASTER)
- .map(nodeInfoPB -> new HostInfo(
- Config.enable_fqdn_mode ? nodeInfoPB.getHost() :
nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
- .collect(Collectors.toList()));
- // check only have one master node.
+ if (allNodes.stream().anyMatch(n -> n.getNodeType() ==
NodeInfoPB.NodeType.FE_FOLLOWER)) {
+ // multi followers mode: the follower with the lexicographically smallest host is the helper node
+ Optional<HostInfo> helperNode = allNodes.stream()
+ .filter(nodeInfoPB -> nodeInfoPB.getNodeType() ==
NodeInfoPB.NodeType.FE_FOLLOWER)
+ .map(nodeInfoPB -> new HostInfo(
+ Config.enable_fqdn_mode ? nodeInfoPB.getHost() :
nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
+ .min(Comparator.comparing(HostInfo::getHost));
+ helperNode.ifPresent(hostInfo -> helperNodes.add(hostInfo));
+ } else {
+ // master observers mode
+ // every FE_MASTER node becomes a helper node
+ helperNodes.addAll(allNodes.stream()
+ .filter(nodeInfoPB -> nodeInfoPB.getNodeType() ==
NodeInfoPB.NodeType.FE_MASTER)
+ .map(nodeInfoPB -> new HostInfo(
+ Config.enable_fqdn_mode ? nodeInfoPB.getHost() :
nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
+ .collect(Collectors.toList()));
+ // the check below requires exactly one master node.
+ }
Preconditions.checkState(helperNodes.size() == 1);
Optional<NodeInfoPB> local = allNodes.stream().filter(n ->
((Config.enable_fqdn_mode ? n.getHost() : n.getIp())
@@ -178,6 +191,7 @@ public class CloudEnv extends Env {
}
LOG.info("current fe's role is {}", type ==
NodeInfoPB.NodeType.FE_MASTER ? "MASTER" :
+ type == NodeInfoPB.NodeType.FE_FOLLOWER ? "FOLLOWER" :
type == NodeInfoPB.NodeType.FE_OBSERVER ? "OBSERVER" :
"UNKNOWN");
if (type == NodeInfoPB.NodeType.UNKNOWN) {
LOG.warn("type current not support, please check it");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index fc580c4fc7e..6f4534c4b08 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -1068,7 +1068,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
newInfo.setPartitionId(partitionId);
newInfo.setIndexId(indexId);
newInfo.setClusterId(clusterId);
- // APPR: in unprotectUpdateCloudReplica, use batch must set
tabletId = -1
+ // ATTN: in unprotectUpdateCloudReplica, use batch must set
tabletId = -1
newInfo.setTabletId(-1);
rets.add(newInfo);
});
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 48728efb003..202d576e3bf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -282,20 +282,29 @@ public class CloudSystemInfoService extends
SystemInfoService {
LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel);
}
String masterIp = Env.getCurrentEnv().getMasterHost();
- for (Frontend fe : toAdd) {
+ for (Frontend fe : toDel) {
if (masterIp.equals(fe.getHost())) {
continue;
}
- Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER,
- fe.getHost(), fe.getEditLogPort(), fe.getNodeName());
- LOG.info("added cloud frontend={} ", fe);
+ try {
+ Env.getCurrentEnv().dropFrontend(fe.getRole(), fe.getHost(),
fe.getEditLogPort());
+ LOG.info("dropped cloud frontend={} ", fe);
+ } catch (DdlException e) {
+ LOG.warn("failed to drop cloud frontend={} ", fe, e);
+ }
}
- for (Frontend fe : toDel) {
+
+ for (Frontend fe : toAdd) {
if (masterIp.equals(fe.getHost())) {
continue;
}
- Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER,
fe.getHost(), fe.getEditLogPort());
- LOG.info("dropped cloud frontend={} ", fe);
+ try {
+ Env.getCurrentEnv().addFrontend(fe.getRole(),
+ fe.getHost(), fe.getEditLogPort(), fe.getNodeName());
+ LOG.info("added cloud frontend={} ", fe);
+ } catch (DdlException e) {
+ LOG.warn("failed to add cloud frontend={} ", fe, e);
+ }
}
}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 4fee41650b7..b4c2d0d0968 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -140,8 +140,10 @@ message ClusterPB {
message NodeInfoPB {
enum NodeType {
UNKNOWN = 0;
+ // legacy one-master-multi-observer mode uses FE_MASTER (+ FE_OBSERVER)
FE_MASTER = 1;
FE_OBSERVER = 2;
+ FE_FOLLOWER = 3; // multi-followers mode (no FE_MASTER), optionally + FE_OBSERVER
}
optional string cloud_unique_id = 1;
optional string name = 2;
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 1587363dc0c..579afda48d9 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1789,6 +1789,86 @@ class Suite implements GroovyInterceptable {
}
}
+ // Fetch this instance from meta service (get_instance API) and return the `result` field of the JSON reply.
+ def get_instance = { MetaService ms=null ->
+ def get_instance_api = { check_func ->
+ httpTest {
+ op "get"
+ if (ms) {
+ endpoint ms.host+':'+ms.httpPort
+ } else {
+ endpoint context.config.metaServiceHttpAddress
+ }
+ uri
"/MetaService/http/get_instance?token=${token}&instance_id=${instance_id}"
+ check check_func
+ }
+ }
+
+ def json
+ get_instance_api.call() {
+ respCode, body ->
+ log.info("get instance resp: ${body} ${respCode}".toString())
+ json = parseJson(body)
+ assertTrue(json.code.equalsIgnoreCase("OK"))
+ }
+ json.result
+ }
+
+
+ // Drop one node via the meta service drop_node API: type SQL (edit_log_port) when fePort != 0, else COMPUTE (heartbeat_port) when bePort != 0.
+ def drop_node = { unique_id, ip, bePort=0, fePort=0, nodeType,
+ cluster_name, cluster_id, MetaService ms=null ->
+ def jsonOutput = new JsonOutput()
+ def type
+ if (fePort != 0) {
+ type = "SQL"
+ } else if (bePort != 0) {
+ type = "COMPUTE"
+ }
+ def clusterInfo = [
+ type: type,
+ cluster_name : cluster_name,
+ cluster_id : cluster_id,
+ nodes: [
+ [
+ cloud_unique_id: unique_id,
+ ip: ip,
+ node_type: nodeType
+ ],
+ ]
+ ]
+ if (fePort != 0) {
+ // drop fe
+ clusterInfo['nodes'][0]['edit_log_port'] = fePort
+ } else if (bePort != 0) {
+ // drop be
+ clusterInfo['nodes'][0]['heartbeat_port'] = bePort
+ }
+ def map = [instance_id: "${instance_id}", cluster: clusterInfo]
+ def js = jsonOutput.toJson(map)
+ log.info("drop node req: ${js} ".toString())
+
+ def drop_node_api = { request_body, check_func ->
+ httpTest {
+ if (ms) {
+ endpoint ms.host+':'+ms.httpPort
+ } else {
+ endpoint context.config.metaServiceHttpAddress
+ }
+ uri "/MetaService/http/drop_node?token=${token}"
+ body request_body
+ check check_func
+ }
+ }
+
+ drop_node_api.call(js) {
+ respCode, body ->
+ log.info("drop node resp: ${body} ${respCode}".toString())
+ def json = parseJson(body)
+ assertTrue(json.code.equalsIgnoreCase("OK"))
+ }
+ }
+
def d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
def jsonOutput = new JsonOutput()
def clusterInfo = [
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 6a322039985..44220500d1b 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -51,6 +51,12 @@ class ClusterOptions {
// 3. cloudMode = null, create both cloud and none-cloud cluster, depend
on the running pipeline mode.
Boolean cloudMode = false
+ // in cloud mode, the FE deployment is one of:
+ // 1. master - multi observers
+ // 2. multi followers - multi observers (useFollowersMode = true)
+ // default is 1
+ Boolean useFollowersMode = false
+
// when cloudMode = true/false, but the running pipeline is diff with
cloudMode,
// skip run this docker test or not.
boolean skipRunWhenPipelineDiff = true
@@ -290,6 +296,11 @@ class SuiteCluster {
if (isCloud) {
cmd += ['--cloud']
}
+
+ if (isCloud && options.useFollowersMode) {
+ cmd += ['--fe-follower']
+ }
+
cmd += ['--wait-timeout', String.valueOf(180)]
runCmd(cmd.join(' '), -1)
@@ -419,8 +430,8 @@ class SuiteCluster {
return new Tuple4(frontends, backends, metaservices, recyclers)
}
- List<Integer> addFrontend(int num) throws Exception {
- def result = add(num, 0, null)
+ List<Integer> addFrontend(int num, boolean followerMode=false) throws
Exception {
+ def result = add(num, 0, null, followerMode)
return result.first
}
@@ -429,14 +440,18 @@ class SuiteCluster {
return result.second
}
- // APPR: clusterName just used for cloud mode, 1 cluster has n bes
- Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String
clusterName) throws Exception {
+ // ATTN: clusterName just used for cloud mode, 1 cluster has n bes
+ // ATTN: followerMode just used for cloud mode
+ Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String
clusterName, boolean followerMode=false) throws Exception {
assert feNum > 0 || beNum > 0
def sb = new StringBuilder()
sb.append('up ' + name + ' ')
if (feNum > 0) {
sb.append('--add-fe-num ' + feNum + ' ')
+ if (followerMode) {
+ sb.append('--fe-follower' + ' ')
+ }
}
if (beNum > 0) {
sb.append('--add-be-num ' + beNum + ' ')
diff --git
a/regression-test/suites/cloud_p0/multi_cluster/test_multi_follower.groovy
b/regression-test/suites/cloud_p0/multi_cluster/test_multi_follower.groovy
new file mode 100644
index 00000000000..864655a5344
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_multi_follower.groovy
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite('test_multi_followr_in_cloud', 'multi_cluster, docker') {
+ if (!isCloudMode()) {
+ return
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1'
+ ]
+ options.setFeNum(3)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.connectToFollower = true
+ options.useFollowersMode = true
+
+ docker(options) {
+ def f = "FOLLOWER"
+ def o = "OBSERVER"
+ def check = { int feNum, Closure checkFunc ->
+ def ret = sql_return_maparray """SHOW FRONTENDS"""
+ assertEquals(feNum, ret.size())
+ Boolean hasMaster = false
+ checkFunc.call(ret)
+ ret.each { row ->
+ if (Boolean.parseBoolean(row.IsMaster)) {
+ hasMaster = true
+ }
+ }
+ assertTrue(hasMaster)
+ }
+
+ def transferType = { String old ->
+ if (old.contains(f)) {
+ return "FE_FOLLOWER"
+ } else if (old.contains(o)) {
+ return "FE_OBSERVER"
+ }
+ }
+
+ check(3) { ret ->
+ ret.each {
+ assertTrue(it.Role.contains(f))
+ }
+ }
+ // get fe clusterName
+ def result = sql_return_maparray """ADMIN SHOW FRONTEND CONFIG LIKE
'%cloud_sql_server_cluster_name%'"""
+ def feClusterName = result.Value[0]
+ result = sql_return_maparray """ADMIN SHOW FRONTEND CONFIG LIKE
'%cloud_sql_server_cluster_id%'"""
+ def feClusterId = result.Value[0]
+ log.info("fe clusterName: {}, clusterId: {} ", feClusterName,
feClusterId)
+ def toDropIP
+ def toDropPort
+ def toDropType
+ def toDropUniqueId
+ // add new follower
+ cluster.addFrontend(1, true)
+ dockerAwaitUntil(5) {
+ def ret = sql """SHOW FRONTENDS"""
+ log.info("show frontends: {}", ret)
+ ret.size() == 4
+ }
+ check(4) { def ret ->
+ ret.each {
+ assertTrue(it.Role.contains(f))
+ if (!Boolean.parseBoolean(it.IsMaster)) {
+ toDropIP = it.Host
+ toDropPort = it.EditLogPort
+ toDropType = transferType(it.Role)
+ }
+ }
+ }
+ log.info("ip: {}, port: {}, type: {}, uniqueId: {}", toDropIP,
toDropPort, toDropType, toDropUniqueId)
+ def ms = cluster.getAllMetaservices().get(0)
+ logger.info("ms addr={}, port={}", ms.host, ms.httpPort)
+ // drop a follower
+ def findToDropUniqueId = { clusterId, hostIP, metaServices ->
+ def ret = get_instance(metaServices)
+ def toDropCluster = ret.clusters.find {
+ it.cluster_id.contains(clusterId)
+ }
+ log.info("toDropCluster: {}", toDropCluster)
+ assertNotNull(toDropCluster)
+ def toDropNode = toDropCluster.nodes.find {
+ it.ip.contains(hostIP)
+ }
+ log.info("toDropNode: {}", toDropNode)
+ assertNotNull(toDropNode)
+ toDropNode.cloud_unique_id
+ }
+
+ toDropUniqueId = findToDropUniqueId.call(feClusterId, toDropIP, ms)
+ log.info("to Drop1 ip: {}, port: {}, type: {}, uniqueId: {}",
toDropIP, toDropPort, toDropType, toDropUniqueId)
+
+ drop_node(toDropUniqueId, toDropIP, 0,
+ toDropPort, toDropType, feClusterName, feClusterId, ms)
+
+ dockerAwaitUntil(5) {
+ def ret = sql """SHOW FRONTENDS"""
+ log.info("show frontends: {}", ret)
+ ret.size() == 3
+ }
+
+ check(3) { def ret ->
+ ret.each {
+ assertTrue(it.Role.contains(f))
+ }
+ }
+
+ // add an observer
+ cluster.addFrontend(1)
+ dockerAwaitUntil(5) {
+ def ret = sql """SHOW FRONTENDS"""
+ log.info("show frontends: {}", ret)
+ ret.size() == 4
+ }
+ check(4) { def ret ->
+ int observerNum = 0
+ int followerNum = 0
+ ret.each {
+ if (it.Role.contains(o)) {
+ observerNum++;
+ toDropIP = it.Host
+ toDropPort = it.EditLogPort
+ toDropType = transferType(it.Role)
+ } else if (it.Role.contains(f)) {
+ followerNum++;
+ }
+ }
+ assertEquals(1, observerNum)
+ assertEquals(3, followerNum)
+ }
+
+ toDropUniqueId = findToDropUniqueId.call(feClusterId, toDropIP, ms)
+ // drop observer
+ drop_node(toDropUniqueId, toDropIP, 0,
+ toDropPort, toDropType, feClusterName, feClusterId, ms)
+
+ dockerAwaitUntil(5) {
+ def ret = sql """SHOW FRONTENDS"""
+ log.info("show frontends: {}", ret)
+ ret.size() == 3
+ }
+
+ check(3) { def ret ->
+ int observerNum = 0
+ int followerNum = 0
+ ret.each {
+ if (it.Role.contains(o)) {
+ observerNum++;
+ } else if (it.Role.contains(f)){
+ followerNum++;
+ }
+ }
+ assertEquals(0, observerNum)
+ assertEquals(3, followerNum)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]