This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1ac00ea9830 branch-2.1: [feat](doris compose) Copy latest compose code from master branch (#43464)
1ac00ea9830 is described below
commit 1ac00ea983004d6209c0ed68dba300ed11374c87
Author: yujun <[email protected]>
AuthorDate: Fri Nov 8 09:47:19 2024 +0800
branch-2.1: [feat](doris compose) Copy latest compose code from master branch (#43464)
Copy the latest code from the master branch to support running docker suites without an external doris cluster, enabling the jvm debug port, etc.
---
docker/runtime/doris-compose/Dockerfile | 23 +-
docker/runtime/doris-compose/Readme.md | 58 +++-
docker/runtime/doris-compose/cluster.py | 225 ++++++++++---
docker/runtime/doris-compose/command.py | 349 +++++++++++++++++----
docker/runtime/doris-compose/database.py | 171 ++++++----
docker/runtime/doris-compose/doris-compose.py | 9 +-
.../{requirements.txt => format-code.sh} | 9 +-
docker/runtime/doris-compose/requirements.txt | 2 +
docker/runtime/doris-compose/resource/common.sh | 13 +-
.../runtime/doris-compose/resource/entrypoint.sh | 68 ++++
docker/runtime/doris-compose/resource/init_be.sh | 8 +-
.../runtime/doris-compose/resource/init_cloud.sh | 12 +-
docker/runtime/doris-compose/resource/init_fe.sh | 43 ++-
docker/runtime/doris-compose/utils.py | 23 +-
.../org/apache/doris/regression/Config.groovy | 22 +-
.../org/apache/doris/regression/suite/Suite.groovy | 4 +-
.../doris/regression/suite/SuiteCluster.groovy | 232 +++++++++++---
.../suites/demo_p0/docker_action.groovy | 2 -
18 files changed, 999 insertions(+), 274 deletions(-)
diff --git a/docker/runtime/doris-compose/Dockerfile b/docker/runtime/doris-compose/Dockerfile
index 73561e6410e..c64b732fe34 100644
--- a/docker/runtime/doris-compose/Dockerfile
+++ b/docker/runtime/doris-compose/Dockerfile
@@ -29,16 +29,7 @@ ARG JDK_IMAGE=openjdk:17-jdk-slim
FROM ${JDK_IMAGE}
-RUN <<EOF
- if [ -d "/usr/local/openjdk-17" ]; then
- ln -s /usr/local/openjdk-17 /usr/local/openjdk
- else \
- ln -s /usr/local/openjdk-8 /usr/local/openjdk
- fi
-EOF
-
# set environment variables
-ENV JAVA_HOME="/usr/local/openjdk"
ENV JACOCO_VERSION 0.8.8
RUN mkdir -p /opt/apache-doris/coverage
@@ -47,7 +38,7 @@ RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list
RUN apt-get clean
RUN apt-get update && \
- apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps && \
+ apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps util-linux gosu && \
ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
dpkg-reconfigure -f noninteractive tzdata && \
apt-get clean
@@ -57,16 +48,14 @@ RUN curl -f https://repo1.maven.org/maven2/org/jacoco/jacoco/${JACOCO_VERSION}/j
unzip jacoco.zip -d /jacoco
# cloud
-#COPY cloud/CMakeLists.txt cloud/output* output/ms* /opt/apache-doris/cloud/
-RUN <<EOF
- mkdir /opt/apache-doris/fdb
- if [ -d /opt/apache-doris/cloud/bin ]; then
- sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/cloud/bin/start.sh
+# COPY --chmod=777 README.md cloud/output* output/ms* /opt/apache-doris/cloud/
+RUN mkdir /opt/apache-doris/fdb
+RUN if [ -d /opt/apache-doris/cloud/bin ]; then \
+ sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/cloud/bin/start.sh ; \
fi
-EOF
# fe and be
-COPY output /opt/apache-doris/
+COPY --chmod=777 output /opt/apache-doris/
# in docker, run 'chmod 755 doris_be' first time cost 1min, remove it.
RUN sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/be/bin/start_be.sh
diff --git a/docker/runtime/doris-compose/Readme.md b/docker/runtime/doris-compose/Readme.md
index a83fa81e761..c4c4dc0990f 100644
--- a/docker/runtime/doris-compose/Readme.md
+++ b/docker/runtime/doris-compose/Readme.md
@@ -23,7 +23,16 @@ Use doris compose to create doris docker compose clusters.
## Requirements
-1. The doris image should contains:
+##### 1. Make sure you have docker permissions
+
+ run:
+```
+docker run hello-world
+```
+
+If you have a permission denied problem, see [add-docker-permission](https://docs.docker.com/engine/install/linux-postinstall/).
+
+##### 2. The doris image should contain
```
/opt/apache-doris/{fe, be, cloud}
@@ -32,16 +41,14 @@ Use doris compose to create doris docker compose clusters.
if don't create cloud cluster, the image no need to contains the cloud pkg.
-if build doris use `sh build.sh --fe --be --cloud`, then its output satisfy with all above, then run command in doris root
+if you build doris with `sh build.sh --fe --be --cloud`, its output satisfies all of the above; then running the command below in the doris root directory
+ will generate such an image.
```
docker build -f docker/runtime/doris-compose/Dockerfile -t <image> .
```
-will generate a image.
-
-2. Install the dependent python library in 'docker/runtime/doris-compose/requirements.txt'
-
+##### 3. Install the dependent python libraries in 'docker/runtime/doris-compose/requirements.txt'
```
python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
@@ -49,6 +56,20 @@ python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
## Usage
+### Notice
+
+Each cluster will have a directory in '/tmp/doris/{cluster-name}'; users can set the env var LOCAL_DORIS_PATH to change this directory.
+
+For example, if a user exports LOCAL_DORIS_PATH=/mydoris, then the cluster's directory is '/mydoris/{cluster-name}'.
+
+The cluster's directory contains all of its containers' logs and data, e.g. fe-1, fe-2, be-1, etc.
+
+If multiple users run doris-compose on the same machine, they should either leave LOCAL_DORIS_PATH unchanged or all export the same LOCAL_DORIS_PATH.
+
+This is because, when creating a new cluster, doris-compose searches the local doris path and chooses a docker network different from those of the clusters under this path.
+
+So if multiple users use different LOCAL_DORIS_PATH values, their clusters may have docker network conflicts!
+
### Create a cluster or recreate its containers
```
@@ -65,9 +86,11 @@ add fe/be nodes with the specific image, or update existing nodes with `--fe-id`
For create a cloud cluster, steps are as below:
+
1. Write cloud s3 store config file, its default path is '/tmp/doris/cloud.ini'.
It's defined in environment variable DORIS_CLOUD_CFG_FILE, user can change this env var to change its path.
An example file is located in 'docker/runtime/doris-compose/resource/cloud.ini.example'.
+
2. Use doris compose up command with option '--cloud' to create a new cloud cluster.
The simplest way to create a cloud cluster:
@@ -127,7 +150,26 @@ Generate regression-conf-custom.groovy to connect to the specific docker cluster
steps:
-1. Create a new cluster: `python doris-compose.py up my-cluster my-image --add-fe-num 2 --add-be-num 4 --cloud`
-2. Generate regression-conf-custom.groovy: `python doris-compose.py config my-cluster <doris-root-path> --connect-follow-fe`
+1. Create a new cluster: `python docker/runtime/doris-compose/doris-compose.py up my-cluster my-image --add-fe-num 2 --add-be-num 4 --cloud`
+2. Generate regression-conf-custom.groovy: `python docker/runtime/doris-compose/doris-compose.py config my-cluster <doris-root-path> --connect-follow-fe`
3. Run regression test: `bash run-regression-test.sh --run -times 1 -parallel 1 -suiteParallel 1 -d cloud/multi_cluster`
+## Problem investigation
+
+#### Log
+
+Each cluster has logs in /tmp/doris/{cluster-name}/{node-xxx}/log. For each node, doris compose also writes a health log to /tmp/doris/{cluster-name}/{node-xxx}/log/health.out
+
+#### Up cluster using non-detach mode
+
+```
+python docker/runtime/doris-compose/doris-compose.py up ... -no-detach
+```
+
+## Developer
+
+Before submitting code, please format the code.
+
+```
+bash format-code.sh
+```
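As a quick illustration of the Notice section above: a minimal sketch of how a cluster's directory is resolved, assuming only the defaults shown in this patch (the helper name is illustrative, not part of the patch).

```
import os

# LOCAL_DORIS_PATH defaults to /tmp/doris unless the env var overrides it,
# matching the cluster.py diff below.
LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")

def cluster_dir(cluster_name):
    # each cluster gets {LOCAL_DORIS_PATH}/{cluster-name}
    return os.path.join(LOCAL_DORIS_PATH, cluster_name)

print(cluster_dir("my-cluster"))  # /tmp/doris/my-cluster by default
```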
diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
index 5381c094cf2..ba834167bd1 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -15,8 +15,10 @@
# specific language governing permissions and limitations
# under the License.
+import configparser
import filelock
-import json
+import getpass
+import hashlib
import jsonpickle
import os
import os.path
@@ -24,7 +26,10 @@ import utils
DOCKER_DORIS_PATH = "/opt/apache-doris"
LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")
-DORIS_SUBNET_START = int(os.getenv("DORIS_SUBNET_START", 128))
+
+# an integer between 128 and 191, generally no need to set
+DORIS_SUBNET_START = os.getenv("DORIS_SUBNET_START")
+
LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"resource")
DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource")
@@ -35,6 +40,7 @@ FE_HTTP_PORT = 8030
FE_RPC_PORT = 9020
FE_QUERY_PORT = 9030
FE_EDITLOG_PORT = 9010
+FE_JAVA_DBG_PORT = 5005
BE_PORT = 9060
BE_WEBSVR_PORT = 8040
@@ -49,6 +55,8 @@ ID_LIMIT = 10000
IP_PART4_SIZE = 200
+CLUSTER_ID = "12345678"
+
LOG = utils.get_logger()
@@ -56,6 +64,15 @@ def get_cluster_path(cluster_name):
return os.path.join(LOCAL_DORIS_PATH, cluster_name)
+def get_node_name(node_type, id):
+ return "{}-{}".format(node_type, id)
+
+
+def get_node_path(cluster_name, node_type, id):
+ return os.path.join(get_cluster_path(cluster_name),
+ get_node_name(node_type, id))
+
+
def get_compose_file(cluster_name):
return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml")
@@ -83,11 +100,39 @@ def gen_subnet_prefix16():
except:
pass
- for i in range(DORIS_SUBNET_START, 192):
- for j in range(256):
- subnet = "{}.{}".format(i, j)
- if not used_subnet.get(subnet, False):
- return subnet
+ subnet_begin = 128
+ subnet_end = 192
+
+ subnet_part_1 = None
+ subnet_part_2 = None
+ if DORIS_SUBNET_START:
+ subnet_part_1 = int(DORIS_SUBNET_START)
+ subnet_part_2 = 0
+ else:
+ m = hashlib.md5()
+ m.update(getpass.getuser().encode("utf-8"))
+ hash_val = int(m.hexdigest(), 16)
+ # want subnet part ii to be a small num, just less than 100, so don't use 256 here.
+ small_width = 100
+ slot_num = (subnet_end - subnet_begin) * small_width
+ idx = hash_val % slot_num
+ if idx < 0:
+ idx += slot_num
+ subnet_part_1 = subnet_begin + int(idx / small_width)
+ subnet_part_2 = idx % small_width
+
+ intervals = [
+ [(subnet_part_1, subnet_part_1 + 1), (subnet_part_2, 256)],
+ [(subnet_part_1 + 1, subnet_end), (0, 256)],
+ [(subnet_begin, subnet_part_1), (0, 256)],
+ [(subnet_part_1, subnet_part_1 + 1), (0, subnet_part_2)],
+ ]
+ for interval in intervals:
+ for i in range(interval[0][0], interval[0][1]):
+ for j in range(interval[1][0], interval[1][1]):
+ subnet = "{}.{}".format(i, j)
+ if not used_subnet.get(subnet, False):
+ return subnet
raise Exception("Failed to gen subnet")
@@ -207,8 +252,6 @@ class Node(object):
path = self.get_path()
os.makedirs(path, exist_ok=True)
- config = self.get_add_init_config()
-
# copy config to local
conf_dir = os.path.join(path, "conf")
if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir):
@@ -216,12 +259,15 @@ class Node(object):
assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \
"check doris path in image is correct".format(conf_dir)
utils.enable_dir_with_rw_perm(conf_dir)
+ config = self.get_add_init_config()
if config:
with open(os.path.join(conf_dir, self.conf_file_name()),
"a") as f:
f.write("\n")
+ f.write("#### start doris-compose add config ####\n\n")
for item in config:
f.write(item + "\n")
+ f.write("\n#### end doris-compose add config ####\n")
for sub_dir in self.expose_sub_dirs():
os.makedirs(os.path.join(path, sub_dir), exist_ok=True)
@@ -246,11 +292,10 @@ class Node(object):
return ["conf", "log"]
def get_name(self):
- return "{}-{}".format(self.node_type(), self.id)
+ return get_node_name(self.node_type(), self.id)
def get_path(self):
- return os.path.join(get_cluster_path(self.cluster.name),
- self.get_name())
+ return get_node_path(self.cluster.name, self.node_type(), self.id)
def get_image(self):
return self.meta.image
@@ -292,11 +337,18 @@ class Node(object):
"DORIS_HOME": os.path.join(self.docker_home_dir()),
"STOP_GRACE": 1 if enable_coverage else 0,
"IS_CLOUD": 1 if self.cluster.is_cloud else 0,
+ "SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0,
}
if self.cluster.is_cloud:
envs["META_SERVICE_ENDPOINT"] = self.cluster.get_meta_server_addr()
+ # run as host user
+ if not getattr(self.cluster, 'is_root_user', True):
+ envs["HOST_USER"] = getpass.getuser()
+ envs["HOST_UID"] = os.getuid()
+ envs["HOST_GID"] = os.getgid()
+
if enable_coverage:
outfile = "{}/coverage/{}-coverage-{}-{}".format(
DOCKER_DORIS_PATH, self.node_type(), self.cluster.name,
@@ -311,6 +363,15 @@ class Node(object):
return envs
+ def entrypoint(self):
+ if self.start_script():
+ return [
+ "bash",
+ os.path.join(DOCKER_RESOURCE_PATH, "entrypoint.sh")
+ ] + self.start_script()
+ else:
+ return None
+
def get_add_init_config(self):
return []
@@ -334,10 +395,16 @@ class Node(object):
for path in ("/etc/localtime", "/etc/timezone",
"/usr/share/zoneinfo") if os.path.exists(path)
]
+
if self.cluster.coverage_dir:
volumes.append("{}:{}/coverage".format(self.cluster.coverage_dir,
DOCKER_DORIS_PATH))
+ extra_hosts = [
+ "{}:{}".format(node.get_name(), node.get_ip())
+ for node in self.cluster.get_all_nodes()
+ ]
+
content = {
"cap_add": ["SYS_PTRACE"],
"hostname": self.get_name(),
@@ -349,6 +416,7 @@ class Node(object):
"ipv4_address": self.get_ip(),
}
},
+ "extra_hosts": extra_hosts,
"ports": self.docker_ports(),
"ulimits": {
"core": -1
@@ -365,37 +433,74 @@ class Node(object):
class FE(Node):
+ def init(self):
+ super().init()
+ self.init_is_follower()
+
def get_add_init_config(self):
cfg = []
if self.cluster.fe_config:
cfg += self.cluster.fe_config
if self.cluster.is_cloud:
cfg += [
- "cloud_unique_id = " + self.cloud_unique_id(),
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
"",
"# For regression-test",
"ignore_unsupported_properties_in_cloud_mode = true",
"merge_on_write_forced_to_false = true",
+ "deploy_mode = cloud",
]
+ if self.cluster.sql_mode_node_mgr:
+ cfg += [
+ "cluster_id = " + CLUSTER_ID,
+ ]
+ else:
+ cfg += [
+ "cloud_unique_id = " + self.cloud_unique_id(),
+ ]
+
+ with open("{}/conf/{}".format(self.get_path(), self.conf_file_name()),
+ "r") as f:
+ parser = configparser.ConfigParser()
+ parser.read_string('[dummy_section]\n' + f.read())
+ for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"):
+ value = parser["dummy_section"].get(key)
+ if value:
+ cfg.append(
+ f"{key} = \"{value}
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\""
+ )
+
return cfg
+ def init_is_follower(self):
+ if self.cluster.is_cloud and self.cluster.fe_follower:
+ with open(self._is_follower_path(), "w") as f:
+ f.write("true")
+
+ def _is_follower_path(self):
+ return "{}/conf/is_follower".format(self.get_path())
+
def docker_env(self):
envs = super().docker_env()
if self.cluster.is_cloud:
envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
+ if os.path.exists(self._is_follower_path()):
+ envs["IS_FE_FOLLOWER"] = 1
return envs
def cloud_unique_id(self):
return "sql_server_{}".format(self.id)
- def entrypoint(self):
- return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")]
+ def start_script(self):
+ return ["init_fe.sh"]
def docker_ports(self):
- return [FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT]
+ return [
+ FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT,
+ FE_JAVA_DBG_PORT
+ ]
def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "fe")
@@ -421,14 +526,26 @@ class BE(Node):
cfg += self.cluster.be_config
if self.cluster.is_cloud:
cfg += [
- "cloud_unique_id = " + self.cloud_unique_id(),
- "meta_service_endpoint = {}".format(
- self.cluster.get_meta_server_addr()),
'tmp_file_dirs = [{"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
'enable_file_cache = true',
'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]'
.format(self.docker_home_dir()),
+ "deploy_mode = cloud",
]
+
+ if self.cluster.be_metaservice_endpoint:
+ cfg += [
+ "meta_service_endpoint = {}".format(
+ self.cluster.get_meta_server_addr()),
+ ]
+ if self.cluster.be_cluster_id:
+ cfg += [
+ "cluster_id = " + CLUSTER_ID,
+ ]
+ if not self.cluster.sql_mode_node_mgr:
+ cfg += [
+ "cloud_unique_id = " + self.cloud_unique_id(),
+ ]
return cfg
def init_cluster_name(self):
@@ -477,8 +594,8 @@ class BE(Node):
storage_root_path = ";".join(dir_descs) if dir_descs else '""'
f.write("\nstorage_root_path = {}\n".format(storage_root_path))
- def entrypoint(self):
- return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh")]
+ def start_script(self):
+ return ["init_be.sh"]
def docker_env(self):
envs = super().docker_env()
@@ -529,12 +646,8 @@ class MS(CLOUD):
cfg += self.cluster.ms_config
return cfg
- def entrypoint(self):
- return [
- "bash",
- os.path.join(DOCKER_RESOURCE_PATH, "init_cloud.sh"),
- "--meta-service"
- ]
+ def start_script(self):
+ return ["init_cloud.sh", "--meta-service"]
def node_type(self):
return Node.TYPE_MS
@@ -554,11 +667,8 @@ class RECYCLE(CLOUD):
cfg += self.cluster.recycle_config
return cfg
- def entrypoint(self):
- return [
- "bash",
- os.path.join(DOCKER_RESOURCE_PATH, "init_cloud.sh"), "--recycler"
- ]
+ def start_script(self):
+ return ["init_cloud.sh", "--recycler"]
def node_type(self):
return Node.TYPE_RECYCLE
@@ -579,8 +689,8 @@ class FDB(Node):
with open(os.path.join(local_conf_dir, "fdb.cluster"), "w") as f:
f.write(self.cluster.get_fdb_cluster())
- def entrypoint(self):
- return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fdb.sh")]
+ def start_script(self):
+ return ["init_fdb.sh"]
def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "fdb")
@@ -597,17 +707,20 @@ class FDB(Node):
class Cluster(object):
- def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
- ms_config, recycle_config, be_disks, be_cluster, reg_be,
- coverage_dir, cloud_store_config):
+ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config,
+ be_config, ms_config, recycle_config, fe_follower, be_disks,
+ be_cluster, reg_be, coverage_dir, cloud_store_config,
+ sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id):
self.name = name
self.subnet = subnet
self.image = image
self.is_cloud = is_cloud
+ self.is_root_user = is_root_user
self.fe_config = fe_config
self.be_config = be_config
self.ms_config = ms_config
self.recycle_config = recycle_config
+ self.fe_follower = fe_follower
self.be_disks = be_disks
self.be_cluster = be_cluster
self.reg_be = reg_be
@@ -617,18 +730,29 @@ class Cluster(object):
node_type: Group(node_type)
for node_type in Node.TYPE_ALL
}
+ self.sql_mode_node_mgr = sql_mode_node_mgr
+ self.be_metaservice_endpoint = be_metaservice_endpoint
+ self.be_cluster_id = be_cluster_id
@staticmethod
- def new(name, image, is_cloud, fe_config, be_config, ms_config,
- recycle_config, be_disks, be_cluster, reg_be, coverage_dir,
- cloud_store_config):
- os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
- with filelock.FileLock(os.path.join(LOCAL_DORIS_PATH, "lock")):
+ def new(name, image, is_cloud, is_root_user, fe_config, be_config,
+ ms_config, recycle_config, fe_follower, be_disks, be_cluster,
+ reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr,
+ be_metaservice_endpoint, be_cluster_id):
+ if not os.path.exists(LOCAL_DORIS_PATH):
+ os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
+ os.chmod(LOCAL_DORIS_PATH, 0o777)
+ lock_file = os.path.join(LOCAL_DORIS_PATH, "lock")
+ with filelock.FileLock(lock_file):
+ if os.getuid() == utils.get_path_uid(lock_file):
+ os.chmod(lock_file, 0o666)
subnet = gen_subnet_prefix16()
- cluster = Cluster(name, subnet, image, is_cloud, fe_config,
- be_config, ms_config, recycle_config, be_disks,
- be_cluster, reg_be, coverage_dir,
- cloud_store_config)
+ cluster = Cluster(name, subnet, image, is_cloud, is_root_user,
+ fe_config, be_config, ms_config, recycle_config,
+ fe_follower, be_disks, be_cluster, reg_be,
+ coverage_dir, cloud_store_config,
+ sql_mode_node_mgr, be_metaservice_endpoint,
+ be_cluster_id)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
@@ -686,7 +810,14 @@ class Cluster(object):
raise Exception("No found {} with id {}".format(node_type, id))
return Node.new(self, node_type, id, meta)
- def get_all_nodes(self, node_type):
+ def get_all_nodes(self, node_type=None):
+ if node_type is None:
+ nodes = []
+ for nt, group in self.groups.items():
+ for id, meta in group.get_all_nodes().items():
+ nodes.append(Node.new(self, nt, id, meta))
+ return nodes
+
group = self.groups.get(node_type, None)
if not group:
raise Exception("Unknown node_type: {}".format(node_type))
diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
index 87ae862236a..7a2f3f3c195 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -30,6 +30,47 @@ import time
LOG = utils.get_logger()
+def wait_ready_service(wait_timeout, cluster, fe_ids, be_ids):
+
+ def is_fe_ready_service(ip):
+ return utils.is_socket_avail(ip, CLUSTER.FE_QUERY_PORT)
+
+ def is_be_ready_service(ip):
+ return utils.is_socket_avail(ip, CLUSTER.BE_HEARTBEAT_PORT)
+
+ if wait_timeout == 0:
+ return
+ if wait_timeout == -1:
+ wait_timeout = 1000000000
+ expire_ts = time.time() + wait_timeout
+ while True:
+ db_mgr = database.get_db_mgr(cluster.name, False)
+ dead_frontends = []
+ for id in fe_ids:
+ fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id)
+ fe_state = db_mgr.get_fe(id)
+ if not fe_state or not fe_state.alive or not is_fe_ready_service(
+ fe.get_ip()):
+ dead_frontends.append(id)
+ dead_backends = []
+ for id in be_ids:
+ be = cluster.get_node(CLUSTER.Node.TYPE_BE, id)
+ be_state = db_mgr.get_be(id)
+ if not be_state or not be_state.alive or not is_be_ready_service(
+ be.get_ip()):
+ dead_backends.append(id)
+ if not dead_frontends and not dead_backends:
+ break
+ if time.time() >= expire_ts:
+ err = ""
+ if dead_frontends:
+ err += "dead fe: " + str(dead_frontends) + ". "
+ if dead_backends:
+ err += "dead be: " + str(dead_backends) + ". "
+ raise Exception(err)
+ time.sleep(1)
+
+
# return for_all, related_nodes, related_node_num
def get_ids_related_nodes(cluster,
fe_ids,
@@ -92,7 +133,12 @@ class Command(object):
def run(self, args):
raise Exception("No implemented")
- def _add_parser_output_json(self, parser):
+ def _add_parser_common_args(self, parser):
+ parser.add_argument("-v",
+ "--verbose",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help="verbose logging.")
parser.add_argument("--output-json",
default=False,
action=self._get_parser_bool_action(True),
@@ -136,6 +182,18 @@ class Command(object):
def _support_boolean_action(self):
return sys.version_info.major == 3 and sys.version_info.minor >= 9
+ def _print_table(self, header, datas):
+ if utils.is_enable_log():
+ table = prettytable.PrettyTable(
+ [utils.render_green(field) for field in header])
+ for row in datas:
+ table.add_row(row)
+ print(table)
+ return ""
+ else:
+ datas.insert(0, header)
+ return datas
+
class SimpleCommand(Command):
@@ -150,11 +208,12 @@ class SimpleCommand(Command):
parser = args_parsers.add_parser(self.command, help=help)
parser.add_argument("NAME", help="Specify cluster name.")
self._add_parser_ids_args(parser)
- self._add_parser_output_json(parser)
+ self._add_parser_common_args(parser)
+ return parser
def run(self, args):
cluster = CLUSTER.Cluster.load(args.NAME)
- _, related_nodes, related_node_num = get_ids_related_nodes(
+ for_all, related_nodes, related_node_num = get_ids_related_nodes(
cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
args.fdb_id)
utils.exec_docker_compose_command(cluster.get_compose_file(),
@@ -165,6 +224,31 @@ class SimpleCommand(Command):
utils.render_green("{} succ, total related node num {}".format(
show_cmd, related_node_num)))
+ if for_all:
+ related_nodes = cluster.get_all_nodes()
+ return cluster, related_nodes
+
+
+class NeedStartCommand(SimpleCommand):
+
+ def add_parser(self, args_parsers):
+ parser = super().add_parser(args_parsers)
+ parser.add_argument(
+ "--wait-timeout",
+ type=int,
+ default=0,
+ help=
+ "Specify wait seconds for fe/be ready for service: 0 not wait
(default), "\
+ "> 0 max wait seconds, -1 wait unlimited."
+ )
+ return parser
+
+ def run(self, args):
+ cluster, related_nodes = super().run(args)
+ fe_ids = [node.id for node in related_nodes if node.is_fe()]
+ be_ids = [node.id for node in related_nodes if node.is_be()]
+ wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids)
+
class UpCommand(Command):
@@ -180,6 +264,7 @@ class UpCommand(Command):
nargs="?",
help="Specify docker image.")
+ self._add_parser_common_args(parser)
parser.add_argument(
"--cloud",
default=False,
@@ -187,6 +272,13 @@ class UpCommand(Command):
help=
"Create cloud cluster, default is false. Only use when creating
new cluster."
)
+ parser.add_argument(
+ "--root",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help=
+ "Run cluster as root user, default is false, it will run as host
user."
+ )
parser.add_argument(
"--wait-timeout",
@@ -197,8 +289,6 @@ class UpCommand(Command):
"> 0 max wait seconds, -1 wait unlimited."
)
- self._add_parser_output_json(parser)
-
group1 = parser.add_argument_group("add new nodes",
"add cluster nodes.")
group1.add_argument(
@@ -245,6 +335,13 @@ class UpCommand(Command):
type=str,
help="Specify recycle configs for
doris_cloud.conf. "\
"Example: --recycle-config \"log_level =
warn\".")
+ group1.add_argument(
+ "--fe-follower",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help=
+ "The new added fe is follower but not observer. Only support in
cloud mode."
+ )
group1.add_argument("--be-disks",
nargs="*",
default=["HDD=1"],
@@ -283,13 +380,50 @@ class UpCommand(Command):
group2.add_argument("--force-recreate",
default=False,
action=self._get_parser_bool_action(True),
- help="Recreate containers even if their
configuration" \
+ help="Recreate containers even if their
configuration " \
"and image haven't changed. ")
parser.add_argument("--coverage-dir",
default="",
help="Set code coverage output directory")
+ parser.add_argument("--sql-mode-node-mgr",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help="Manager fe be via sql instead of http")
+
+ if self._support_boolean_action():
+ parser.add_argument(
+ "--be-metaservice-endpoint",
+ default=True,
+ action=self._get_parser_bool_action(False),
+ help=
+ "Do not set BE meta service endpoint in conf. Default is
False."
+ )
+ else:
+ parser.add_argument(
+ "--no-be-metaservice-endpoint",
+ dest='be_metaservice_endpoint',
+ default=True,
+ action=self._get_parser_bool_action(False),
+ help=
+ "Do not set BE meta service endpoint in conf. Default is
False."
+ )
+
+ if self._support_boolean_action():
+ parser.add_argument(
+ "--be-cluster-id",
+ default=True,
+ action=self._get_parser_bool_action(False),
+ help="Do not set BE cluster ID in conf. Default is False.")
+ else:
+ parser.add_argument(
+ "--no-be-cluster-id",
+ dest='be_cluster_id',
+ default=True,
+ action=self._get_parser_bool_action(False),
+ help="Do not set BE cluser ID in conf. Default is False.")
+
parser.add_argument(
"--fdb-version",
type=str,
@@ -383,18 +517,21 @@ class UpCommand(Command):
args.add_ms_num = 0
args.add_recycle_num = 0
- cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud,
- args.fe_config, args.be_config,
- args.ms_config, args.recycle_config,
- args.be_disks, args.be_cluster,
- args.reg_be, args.coverage_dir,
- cloud_store_config)
+ cluster = CLUSTER.Cluster.new(
+ args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
+ args.be_config, args.ms_config, args.recycle_config,
+ args.fe_follower, args.be_disks, args.be_cluster, args.reg_be,
+ args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
+ args.be_metaservice_endpoint, args.be_cluster_id)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))
if args.be_cluster and cluster.is_cloud:
cluster.be_cluster = args.be_cluster
+ if cluster.is_cloud:
+ cluster.fe_follower = args.fe_follower
+
_, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
args.be_id, args.ms_id,
args.recycle_id,
@@ -474,32 +611,51 @@ class UpCommand(Command):
"Not up cluster cause specific --no-start, related node
num {}"
.format(related_node_num)))
else:
- if args.wait_timeout != 0:
- if args.wait_timeout == -1:
- args.wait_timeout = 1000000000
- expire_ts = time.time() + args.wait_timeout
- while True:
- db_mgr = database.get_db_mgr(args.NAME, False)
- dead_frontends = []
- for id in add_fe_ids:
- fe_state = db_mgr.get_fe(id)
- if not fe_state or not fe_state.alive:
- dead_frontends.append(id)
- dead_backends = []
- for id in add_be_ids:
- be_state = db_mgr.get_be(id)
- if not be_state or not be_state.alive:
- dead_backends.append(id)
- if not dead_frontends and not dead_backends:
+ LOG.info("Using SQL mode for node management ? {}".format(
+ args.sql_mode_node_mgr))
+
+ # Wait for FE master to be elected
+ LOG.info("Waiting for FE master to be elected...")
+ expire_ts = time.time() + 30
+ while expire_ts > time.time():
+ db_mgr = database.get_db_mgr(args.NAME, False)
+ for id in add_fe_ids:
+ fe_state = db_mgr.get_fe(id)
+ if fe_state is not None and fe_state.alive:
break
- if time.time() >= expire_ts:
- err = ""
- if dead_frontends:
- err += "dead fe: " + str(dead_frontends) + ". "
- if dead_backends:
- err += "dead be: " + str(dead_backends) + ". "
- raise Exception(err)
- time.sleep(1)
+ LOG.info("there is no fe ready")
+ time.sleep(5)
+
+ if cluster.is_cloud and args.sql_mode_node_mgr:
+ db_mgr = database.get_db_mgr(args.NAME, False)
+ master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
+ cluster.name)
+ # Add FEs except master_fe
+ for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE):
+ fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}"
+ if fe_endpoint != master_fe_endpoint:
+ try:
+ db_mgr.add_fe(fe_endpoint)
+ LOG.info(f"Added FE {fe_endpoint} successfully.")
+ except Exception as e:
+ LOG.error(
+ f"Failed to add FE {fe_endpoint}: {str(e)}")
+
+ # Add BEs
+ for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE):
+ be_endpoint = f"{be.get_ip()}:{CLUSTER.BE_HEARTBEAT_PORT}"
+ try:
+ db_mgr.add_be(be_endpoint)
+ LOG.info(f"Added BE {be_endpoint} successfully.")
+ except Exception as e:
+ LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}")
+
+ cloud_store_config = self._get_cloud_store_config()
+
+ db_mgr.create_default_storage_vault(cloud_store_config)
+
+ wait_ready_service(args.wait_timeout, cluster, add_fe_ids,
+ add_be_ids)
LOG.info(
utils.render_green(
"Up cluster {} succ, related node num {}".format(
@@ -576,7 +732,7 @@ class DownCommand(Command):
"then apply to all containers.")
parser.add_argument("NAME", help="Specify cluster name")
self._add_parser_ids_args(parser)
- self._add_parser_output_json(parser)
+ self._add_parser_common_args(parser)
parser.add_argument(
"--clean",
default=False,
@@ -606,6 +762,8 @@ class DownCommand(Command):
args.fdb_id,
ignore_not_exists=True)
+ LOG.info("down cluster " + args.NAME + " for all " + str(for_all))
+
if for_all:
if os.path.exists(cluster.get_compose_file()):
try:
@@ -687,7 +845,6 @@ class ListNode(object):
self.created = ""
self.alive = ""
self.is_master = ""
- self.query_port = ""
self.tablet_num = ""
self.last_heartbeat = ""
self.err_msg = ""
@@ -702,16 +859,23 @@ class ListNode(object):
if detail:
query_port = ""
http_port = ""
+ heartbeat_port = ""
+ edit_log_port = ""
+ node_path = CLUSTER.get_node_path(self.cluster_name,
+ self.node_type, self.id)
if self.node_type == CLUSTER.Node.TYPE_FE:
query_port = CLUSTER.FE_QUERY_PORT
http_port = CLUSTER.FE_HTTP_PORT
+ edit_log_port = CLUSTER.FE_EDITLOG_PORT
elif self.node_type == CLUSTER.Node.TYPE_BE:
http_port = CLUSTER.BE_WEBSVR_PORT
+ heartbeat_port = CLUSTER.BE_HEARTBEAT_PORT
+ elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE:
+ http_port = CLUSTER.MS_PORT
else:
pass
result += [
- query_port,
- http_port,
+ query_port, http_port, node_path, edit_log_port, heartbeat_port
]
return result
@@ -721,7 +885,6 @@ class ListNode(object):
if fe:
self.alive = str(fe.alive).lower()
self.is_master = str(fe.is_master).lower()
- self.query_port = fe.query_port
self.last_heartbeat = fe.last_heartbeat
self.err_msg = fe.err_msg
elif self.node_type == CLUSTER.Node.TYPE_BE:
@@ -812,7 +975,16 @@ cloudUniqueId= "{fe_cloud_unique_id}"
print("\nNo write regression custom file.")
return
+ annotation_start = "//---------- Start auto generate by doris-compose.py---------"
+ annotation_end = "//---------- End auto generate by doris-compose.py---------"
+
+ old_contents = []
+ if os.path.exists(regression_conf_custom):
+ with open(regression_conf_custom, "r") as f:
+ old_contents = f.readlines()
with open(regression_conf_custom, "w") as f:
+ # write auto gen config
+ f.write(annotation_start)
f.write(base_conf.format(fe_ip=fe_ip))
if cluster.is_cloud:
multi_cluster_bes = ",".join([
@@ -831,6 +1003,23 @@ cloudUniqueId= "{fe_cloud_unique_id}"
multi_cluster_bes=multi_cluster_bes,
fe_cloud_unique_id=cluster.get_node(
CLUSTER.Node.TYPE_FE, 1).cloud_unique_id()))
+ f.write(annotation_end + "\n\n")
+ annotation_end_line_count = -1
+
+ # write not-auto gen config
+ in_annotation = False
+ annotation_end_line_idx = -100
+ for line_idx, line in enumerate(old_contents):
+ line = line.rstrip()
+ if line == annotation_start:
+ in_annotation = True
+ elif line == annotation_end:
+ in_annotation = False
+ annotation_end_line_idx = line_idx
+ elif not in_annotation:
+ if line or line_idx != annotation_end_line_idx + 1:
+ f.write(line + "\n")
+
print("\nWrite succ: " + regression_conf_custom)
@@ -845,24 +1034,12 @@ class ListCommand(Command):
help=
"Specify multiple clusters, if specific, show all their
containers."
)
- self._add_parser_output_json(parser)
+ self._add_parser_common_args(parser)
parser.add_argument("--detail",
default=False,
action=self._get_parser_bool_action(True),
help="Print more detail fields.")
- def _handle_data(self, header, datas):
- if utils.is_enable_log():
- table = prettytable.PrettyTable(
- [utils.render_green(field) for field in header])
- for row in datas:
- table.add_row(row)
- print(table)
- return ""
- else:
- datas.insert(0, header)
- return datas
-
def run(self, args):
COMPOSE_MISSING = "(missing)"
COMPOSE_BAD = "(bad)"
@@ -954,7 +1131,7 @@ class ListCommand(Command):
CLUSTER.get_master_fe_endpoint(name), is_cloud,
"{}{}".format(compose_file,
cluster_info["status"])))
- return self._handle_data(header, rows)
+ return self._print_table(header, rows)
header = [
"CLUSTER", "NAME", "IP", "STATUS", "CONTAINER ID", "IMAGE",
@@ -965,6 +1142,9 @@ class ListCommand(Command):
header += [
"query_port",
"http_port",
+ "path",
+ "edit_log_port",
+ "heartbeat_port",
]
rows = []
@@ -1038,17 +1218,68 @@ class ListCommand(Command):
for node in sorted(nodes, key=get_node_seq):
rows.append(node.info(args.detail))
- return self._handle_data(header, rows)
+ return self._print_table(header, rows)
+
+
+class InfoCommand(Command):
+
+ def add_parser(self, args_parsers):
+ parser = args_parsers.add_parser(
+ "info", help="Show info like cloud.ini, port, path, etc")
+ self._add_parser_common_args(parser)
+
+ def run(self, args):
+
+ header = ["key", "value", "scope"]
+ cloud_cfg_file_env = os.getenv("DORIS_CLOUD_CFG_FILE")
+ cloud_cfg_file = cloud_cfg_file_env if cloud_cfg_file_env else "${LOCAL_DORIS_PATH}/cloud.ini"
+ rows = [
+ ("LOCAL_DORIS_PATH", CLUSTER.LOCAL_DORIS_PATH, "env variable"),
+ ("DORIS_CLOUD_CFG_FILE", cloud_cfg_file, "env variable"),
+ ("FE_QUERY_PORT", CLUSTER.FE_QUERY_PORT, "constant"),
+ ("FE_HTTP_PORT", CLUSTER.FE_HTTP_PORT, "constant"),
+ ("FE_EDITLOG_PORT", CLUSTER.FE_EDITLOG_PORT, "constant"),
+ ("FE_JAVA_DBG_PORT", CLUSTER.FE_JAVA_DBG_PORT, "constant"),
+ ("BE_HEARTBEAT_PORT", CLUSTER.BE_HEARTBEAT_PORT, "constant"),
+ ("BE_WEBSVR_PORT", CLUSTER.BE_WEBSVR_PORT, "constant"),
+ ("MS_PORT", CLUSTER.MS_PORT, "constant"),
+ ("RECYCLER_PORT", CLUSTER.MS_PORT, "constant"),
+ ]
+
+ with open(CLUSTER.CLOUD_CFG_FILE, "r") as f:
+ for line in f:
+ line = line.strip()
+ if line and not line.startswith('#'):
+ key, value = line.split('=', 1)
+ rows.append((key.strip(), value.strip(), "cloud.ini"))
+
+ return self._print_table(header, rows)
+
+
+class AddRWPermCommand(Command):
+
+ def add_parser(self, args_parsers):
+ parser = args_parsers.add_parser(
+ "add-rw-perm",
+ help="Add read and write permissions to the cluster files")
+ parser.add_argument("NAME", help="Specify cluster name.")
+ self._add_parser_common_args(parser)
+
+ def run(self, args):
+ utils.enable_dir_with_rw_perm(CLUSTER.get_cluster_path(args.NAME))
+ return ""
ALL_COMMANDS = [
UpCommand("up"),
DownCommand("down"),
- SimpleCommand("start", "Start the doris containers. "),
+ NeedStartCommand("start", "Start the doris containers. "),
SimpleCommand("stop", "Stop the doris containers. "),
- SimpleCommand("restart", "Restart the doris containers. "),
+ NeedStartCommand("restart", "Restart the doris containers. "),
SimpleCommand("pause", "Pause the doris containers. "),
SimpleCommand("unpause", "Unpause the doris containers. "),
GenConfCommand("config"),
+ InfoCommand("info"),
ListCommand("ls"),
+ AddRWPermCommand("add-rw-perm"),
]
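A condensed sketch of the readiness probe that the new wait_ready_service builds on together with utils.is_socket_avail (added later in this patch); the helper name here is illustrative.

```
import contextlib
import socket
import time

# Poll a TCP port until it accepts connections or the timeout expires.
# Timeout semantics mirror the diff: 0 means don't wait, -1 means wait
# (practically) forever.
def wait_port_ready(ip, port, wait_timeout):
    if wait_timeout == 0:
        return True
    expire_ts = time.time() + (10**9 if wait_timeout == -1 else wait_timeout)
    while time.time() < expire_ts:
        with contextlib.closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            if sock.connect_ex((ip, port)) == 0:
                return True
        time.sleep(1)
    return False
```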
diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py
index d29905e94a9..46cdd961c9f 100644
--- a/docker/runtime/doris-compose/database.py
+++ b/docker/runtime/doris-compose/database.py
@@ -20,16 +20,15 @@ import os.path
import pymysql
import time
import utils
+import uuid
LOG = utils.get_logger()
class FEState(object):
- def __init__(self, id, query_port, is_master, alive, last_heartbeat,
- err_msg):
+ def __init__(self, id, is_master, alive, last_heartbeat, err_msg):
self.id = id
- self.query_port = query_port
self.is_master = is_master
self.alive = alive
self.last_heartbeat = last_heartbeat
@@ -54,11 +53,8 @@ class DBManager(object):
def __init__(self):
self.fe_states = {}
self.be_states = {}
- self.query_port = -1
self.conn = None
-
- def set_query_port(self, query_port):
- self.query_port = query_port
+ self.master_fe_ip = ""
def get_fe(self, id):
return self.fe_states.get(id, None)
@@ -66,10 +62,19 @@ class DBManager(object):
def get_be(self, id):
return self.be_states.get(id, None)
- def load_states(self, query_ports):
- self._load_fe_states(query_ports)
+ def load_states(self):
+ self._load_fe_states()
self._load_be_states()
+ def add_fe(self, fe_endpoint):
+ try:
+ sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'"
+ self._exec_query(sql)
+ LOG.info(f"Added FE {fe_endpoint} via SQL successfully.")
+ except Exception as e:
+ LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}")
+ raise
+
def drop_fe(self, fe_endpoint):
id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
try:
@@ -85,6 +90,15 @@ class DBManager(object):
return
raise e
+ def add_be(self, be_endpoint):
+ try:
+ sql = f"ALTER SYSTEM ADD BACKEND '{be_endpoint}'"
+ self._exec_query(sql)
+ LOG.info(f"Added BE {be_endpoint} via SQL successfully.")
+ except Exception as e:
+ LOG.error(f"Failed to add BE {be_endpoint} via SQL: {str(e)}")
+ raise
+
def drop_be(self, be_endpoint):
id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
try:
@@ -140,103 +154,124 @@ class DBManager(object):
time.sleep(5)
- def _load_fe_states(self, query_ports):
+ def create_default_storage_vault(self, cloud_store_config):
+ try:
+ # Create storage vault
+ create_vault_sql = f"""
+ CREATE STORAGE VAULT IF NOT EXISTS default_vault
+ PROPERTIES (
+ "type" = "S3",
+ "s3.access_key" = "{cloud_store_config['DORIS_CLOUD_AK']}",
+ "s3.secret_key" = "{cloud_store_config['DORIS_CLOUD_SK']}",
+ "s3.endpoint" = "{cloud_store_config['DORIS_CLOUD_ENDPOINT']}",
+ "s3.bucket" = "{cloud_store_config['DORIS_CLOUD_BUCKET']}",
+ "s3.region" = "{cloud_store_config['DORIS_CLOUD_REGION']}",
+ "s3.root.path" = "{str(uuid.uuid4())}",
+ "provider" = "{cloud_store_config['DORIS_CLOUD_PROVIDER']}"
+ );
+ """
+ self._exec_query(create_vault_sql)
+ LOG.info("Created storage vault 'default_vault'")
+
+ # Set as default storage vault
+ set_default_vault_sql = "SET default_vault as DEFAULT STORAGE VAULT;"
+ self._exec_query(set_default_vault_sql)
+ LOG.info("Set 'default_vault' as the default storage vault")
+
+ except Exception as e:
+ LOG.error(f"Failed to create default storage vault: {str(e)}")
+ raise
+
+ def _load_fe_states(self):
fe_states = {}
- alive_master_fe_port = None
- for record in self._exec_query('''
- select Host, IsMaster, Alive, LastHeartbeat, ErrMsg
- from frontends()'''):
- ip, is_master, alive, last_heartbeat, err_msg = record
- is_master = utils.is_true(is_master)
- alive = utils.is_true(alive)
+ alive_master_fe_ip = None
+ for record in self._exec_query("show frontends"):
+ name = record["Name"]
+ ip = record["Host"]
+ role = record["Role"]
+ is_master = utils.is_true(record["IsMaster"])
+ alive = utils.is_true(record["Alive"])
id = CLUSTER.Node.get_id_from_ip(ip)
- query_port = query_ports.get(id, "")
- last_heartbeat = utils.escape_null(last_heartbeat)
- fe = FEState(id, query_port, is_master, alive, last_heartbeat,
- err_msg)
+ last_heartbeat = utils.escape_null(record["LastHeartbeat"])
+ err_msg = record["ErrMsg"]
+ fe = FEState(id, is_master, alive, last_heartbeat, err_msg)
fe_states[id] = fe
- if is_master and alive and query_port:
- alive_master_fe_port = query_port
+ if is_master and alive:
+ alive_master_fe_ip = ip
+ LOG.debug(
+ "record of show frontends, name {}, ip {}, alive {}, is_master
{}, role {}"
+ .format(name, ip, alive, is_master, role))
+
self.fe_states = fe_states
- if alive_master_fe_port and alive_master_fe_port != self.query_port:
- self.query_port = alive_master_fe_port
+ if alive_master_fe_ip and alive_master_fe_ip != self.master_fe_ip:
+ self.master_fe_ip = alive_master_fe_ip
self._reset_conn()
def _load_be_states(self):
be_states = {}
- for record in self._exec_query('''
- select BackendId, Host, LastHeartbeat, Alive, SystemDecommissioned, TabletNum, ErrMsg
- from backends()'''):
- backend_id, ip, last_heartbeat, alive, decommissioned, tablet_num, err_msg = record
- backend_id = int(backend_id)
- alive = utils.is_true(alive)
- decommissioned = utils.is_true(decommissioned)
- tablet_num = int(tablet_num)
- id = CLUSTER.Node.get_id_from_ip(ip)
- last_heartbeat = utils.escape_null(last_heartbeat)
+ for record in self._exec_query("show backends"):
+ backend_id = int(record["BackendId"])
+ alive = utils.is_true(record["Alive"])
+ decommissioned = utils.is_true(record["SystemDecommissioned"])
+ tablet_num = int(record["TabletNum"])
+ id = CLUSTER.Node.get_id_from_ip(record["Host"])
+ last_heartbeat = utils.escape_null(record["LastHeartbeat"])
+ err_msg = record["ErrMsg"]
be = BEState(id, backend_id, decommissioned, alive, tablet_num,
last_heartbeat, err_msg)
be_states[id] = be
self.be_states = be_states
+ # return rows, and each row is a record map
def _exec_query(self, sql):
self._prepare_conn()
with self.conn.cursor() as cursor:
cursor.execute(sql)
- return cursor.fetchall()
+ fields = [field_md[0] for field_md in cursor.description
+ ] if cursor.description else []
+ return [dict(zip(fields, row)) for row in cursor.fetchall()]
def _prepare_conn(self):
if self.conn:
return
- if self.query_port <= 0:
- raise Exception("Not set query_port")
self._reset_conn()
def _reset_conn(self):
self.conn = pymysql.connect(user="root",
- host="127.0.0.1",
+ host=self.master_fe_ip,
read_timeout=10,
- port=self.query_port)
+ connect_timeout=3,
+ port=CLUSTER.FE_QUERY_PORT)
def get_db_mgr(cluster_name, required_load_succ=True):
assert cluster_name
db_mgr = DBManager()
- containers = utils.get_doris_containers(cluster_name).get(
- cluster_name, None)
- if not containers:
+ master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
+ "master_fe_ip")
+ master_fe_ip = None
+ if os.path.exists(master_fe_ip_file):
+ with open(master_fe_ip_file, "r") as f:
+ master_fe_ip = f.read().strip()
+
+ if not master_fe_ip:
return db_mgr
- alive_fe_ports = {}
+
+ has_alive_fe = False
+ containers = utils.get_doris_containers(cluster_name).get(cluster_name, [])
for container in containers:
if utils.is_container_running(container):
- _, node_type, id = utils.parse_service_name(container.name)
+ _, node_type, _ = utils.parse_service_name(container.name)
if node_type == CLUSTER.Node.TYPE_FE:
- query_port = utils.get_map_ports(container).get(
- CLUSTER.FE_QUERY_PORT, None)
- if query_port:
- alive_fe_ports[id] = query_port
- if not alive_fe_ports:
+ has_alive_fe = True
+ break
+
+ if not has_alive_fe:
return db_mgr
- master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
- "master_fe_ip")
- query_port = None
- if os.path.exists(master_fe_ip_file):
- with open(master_fe_ip_file, "r") as f:
- master_fe_ip = f.read()
- if master_fe_ip:
- master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip)
- query_port = alive_fe_ports.get(master_id, None)
- if not query_port:
- # A new cluster's master is fe-1
- if 1 in alive_fe_ports:
- query_port = alive_fe_ports[1]
- else:
- query_port = list(alive_fe_ports.values())[0]
-
- db_mgr.set_query_port(query_port)
+ db_mgr.master_fe_ip = master_fe_ip
try:
- db_mgr.load_states(alive_fe_ports)
+ db_mgr.load_states()
except Exception as e:
if required_load_succ:
raise e
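The database.py hunk above switches _exec_query from positional tuples to name-keyed rows; a minimal sketch of that pattern for any DB-API cursor (the helper name is illustrative).

```
# Build dict rows from a DB-API cursor, as the new _exec_query does:
# column names come from cursor.description, values from each row tuple.
def rows_as_dicts(cursor):
    fields = [md[0] for md in cursor.description] if cursor.description else []
    return [dict(zip(fields, row)) for row in cursor.fetchall()]

# Callers can then index by column name, e.g. record["Host"], instead of
# depending on the column order of "show frontends" / "show backends".
```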
diff --git a/docker/runtime/doris-compose/doris-compose.py b/docker/runtime/doris-compose/doris-compose.py
index 0091b70eae9..a2d3a517553 100644
--- a/docker/runtime/doris-compose/doris-compose.py
+++ b/docker/runtime/doris-compose/doris-compose.py
@@ -45,6 +45,9 @@ def run(args, disable_log, help):
if __name__ == '__main__':
args, help = parse_args()
+ verbose = getattr(args, "verbose", False)
+ if verbose:
+ utils.set_log_verbose()
disable_log = getattr(args, "output_json", False)
if disable_log:
utils.set_enable_log(False)
@@ -53,13 +56,13 @@ if __name__ == '__main__':
try:
data = run(args, disable_log, help)
if disable_log:
- print(utils.pretty_json({"code": 0, "data": data}))
+ print(utils.pretty_json({"code": 0, "data": data}), flush=True)
code = 0
except:
err = traceback.format_exc()
if disable_log:
- print(utils.pretty_json({"code": 1, "err": err}))
+ print(utils.pretty_json({"code": 1, "err": err}), flush=True)
else:
- print(err)
+ print(err, flush=True)
code = 1
sys.exit(code)
diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/format-code.sh
similarity index 90%
copy from docker/runtime/doris-compose/requirements.txt
copy to docker/runtime/doris-compose/format-code.sh
index ac177eddf82..0626662e641 100644
--- a/docker/runtime/doris-compose/requirements.txt
+++ b/docker/runtime/doris-compose/format-code.sh
@@ -15,10 +15,5 @@
# specific language governing permissions and limitations
# under the License.
-docker
-docker-compose
-filelock
-jsonpickle
-prettytable
-pymysql
-python-dateutil
+yapf -i *.py
+shfmt -w resource/*.sh
diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt
index ac177eddf82..2f962ed68d8 100644
--- a/docker/runtime/doris-compose/requirements.txt
+++ b/docker/runtime/doris-compose/requirements.txt
@@ -22,3 +22,5 @@ jsonpickle
prettytable
pymysql
python-dateutil
+#pyyaml==5.4.1
+requests<=2.31.0
diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh
index de6ba29865a..40833d01dc6 100644
--- a/docker/runtime/doris-compose/resource/common.sh
+++ b/docker/runtime/doris-compose/resource/common.sh
@@ -23,7 +23,7 @@ export LOG_FILE=$DORIS_HOME/log/health.out
export LOCK_FILE=$DORIS_HOME/status/token
health_log() {
- echo "$(date +'%Y-%m-%d %H:%M:%S') $@" >>$LOG_FILE
+ echo "$(date +'%Y-%m-%d %H:%M:%S') $@" | tee -a $LOG_FILE
}
# concurrent write meta service server will failed due to fdb txn conflict.
@@ -120,10 +120,11 @@ wait_pid() {
health_log ""
health_log "ps -elf\n$(ps -elf)\n"
if [ -z $pid ]; then
- health_log "pid not exist"
+ health_log "pid $pid not exist"
exit 1
fi
+ health_log "pid $pid exist"
health_log "wait process $pid"
while true; do
ps -p $pid >/dev/null
@@ -132,5 +133,13 @@ wait_pid() {
fi
sleep 1s
done
+
+ health_log "show dmesg -T: "
+ dmesg -T | tail -n 50 | tee -a $LOG_FILE
+
+ health_log "show ps -elf"
+ health_log "ps -elf\n$(ps -elf)\n"
+ health_log "pid $pid not exist"
+
health_log "wait end"
}
diff --git a/docker/runtime/doris-compose/resource/entrypoint.sh b/docker/runtime/doris-compose/resource/entrypoint.sh
new file mode 100644
index 00000000000..a3cdaaae8f1
--- /dev/null
+++ b/docker/runtime/doris-compose/resource/entrypoint.sh
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+DIR=$(
+ cd $(dirname $0)
+ pwd
+)
+
+source $DIR/common.sh
+
+RUN_USER=root
+
+create_host_user() {
+ if [ -z ${HOST_USER} ]; then
+ health_log "no specific run user, run as root"
+ return
+ fi
+ id ${HOST_USER}
+ if [ $? -eq 0 ]; then
+ health_log "contain user ${HOST_USER}, no create new user"
+ RUN_USER=${HOST_USER}
+ return
+ fi
+ id ${HOST_UID}
+ if [ $? -eq 0 ]; then
+ health_log "contain uid ${HOST_UID}, no create new user"
+ return
+ fi
+ addgroup --gid ${HOST_GID} ${HOST_USER}
+ if [ $? -eq 0 ]; then
+ health_log "create group ${HOST_USER} with gid ${HOST_GID} succ"
+ else
+ health_log "create group ${HOST_USER} with gid ${HOST_GID} failed"
+ return
+ fi
+ adduser --disabled-password --shell /bin/bash --gecos "" --uid ${HOST_UID} --gid ${HOST_GID} ${HOST_USER}
+ if [ $? -eq 0 ]; then
+ health_log "create user ${HOST_USER} with uid ${HOST_UID} succ"
+ RUN_USER=${HOST_USER}
+ else
+ health_log "create user ${HOST_USER} with uid ${HOST_UID} failed"
+ fi
+}
+
+create_host_user
+
+if command -v gosu 2>&1 >/dev/null; then
+ if [ -f ${LOG_FILE} ]; then
+ chown ${RUN_USER}:${RUN_USER} ${LOG_FILE}
+ fi
+ gosu ${RUN_USER} bash ${DIR}/${1} ${@:2}
+else
+ bash ${DIR}/${1} ${@:2}
+fi
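This entrypoint consumes the HOST_USER/HOST_UID/HOST_GID variables that cluster.py's docker_env now exports; a minimal Python-side sketch of that plumbing (the function name is illustrative).

```
import getpass
import os

# Unless the cluster runs as root (--root), pass the host identity into
# the container so entrypoint.sh can create a matching user and gosu to it.
def host_user_envs(is_root_user):
    if is_root_user:
        return {}
    return {
        "HOST_USER": getpass.getuser(),
        "HOST_UID": os.getuid(),
        "HOST_GID": os.getgid(),
    }
```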
diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh
index d9b7953b534..08cc914f6af 100755
--- a/docker/runtime/doris-compose/resource/init_be.sh
+++ b/docker/runtime/doris-compose/resource/init_be.sh
@@ -48,6 +48,12 @@ add_cloud_be() {
return
fi
+ # Check if SQL_MODE_NODE_MGR is set to 1
+ if [ "$SQL_MODE_NODE_MGR" -eq 1 ]; then
+ health_log "SQL_MODE_NODE_MGR is set to 1, skipping cluster creation"
+ return
+ fi
+
cluster_file_name="${DORIS_HOME}/conf/CLUSTER_NAME"
cluster_name=$(cat $cluster_file_name)
if [ -z $cluster_name ]; then
@@ -167,7 +173,7 @@ main() {
add_be_to_cluster
health_log "run start_be.sh"
- bash $DORIS_HOME/bin/start_be.sh --daemon
+ bash $DORIS_HOME/bin/start_be.sh --daemon | tee -a $DORIS_HOME/log/be.out
wait_process
}
diff --git a/docker/runtime/doris-compose/resource/init_cloud.sh b/docker/runtime/doris-compose/resource/init_cloud.sh
index 78152b5330b..18dfc4430e2 100644
--- a/docker/runtime/doris-compose/resource/init_cloud.sh
+++ b/docker/runtime/doris-compose/resource/init_cloud.sh
@@ -50,6 +50,13 @@ check_init_cloud() {
lock_cluster
+ # Check if SQL_MODE_NODE_MGR is set
+ if [[ "$SQL_MODE_NODE_MGR" -eq 1 ]]; then
+ health_log "SQL_MODE_NODE_MGR is set, skipping create_instance"
+ touch $HAS_CREATE_INSTANCE_FILE
+ return
+ fi
+
output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \
-d '{"instance_id":"default_instance_id",
"name": "default_instance",
@@ -109,9 +116,8 @@ main() {
check_init_cloud &
- health_log "input args: $ARGS"
-
- bash bin/start.sh $ARGS --daemon
+ health_log "run starts.sh with args: $ARGS"
+ bash bin/start.sh $ARGS --daemon | tee -a $DORIS_HOME/log/doris_cloud.out
wait_process
}
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh
index e532d0d56e1..b69ac3a209e 100755
--- a/docker/runtime/doris-compose/resource/init_fe.sh
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -45,8 +45,8 @@ fe_daemon() {
sleep 1
output=$(mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;")
code=$?
- health_log "$output"
if [ $code -ne 0 ]; then
+ health_log "exec show frontends bad: $output"
continue
fi
header=$(grep IsMaster <<<$output)
@@ -81,8 +81,30 @@ fe_daemon() {
done
}
-add_cloud_fe() {
+run_fe() {
+ health_log "run start_fe.sh"
+ bash $DORIS_HOME/bin/start_fe.sh --daemon $@ | tee -a $DORIS_HOME/log/fe.out
+}
+
+start_cloud_fe() {
if [ -f "$REGISTER_FILE" ]; then
+ fe_daemon &
+ run_fe
+ return
+ fi
+
+ # Check if SQL_MODE_NODE_MGR is set to 1
+ if [ "${SQL_MODE_NODE_MGR}" = "1" ]; then
+ health_log "SQL_MODE_NODE_MGR is set to 1. Skipping add FE."
+
+ touch $REGISTER_FILE
+
+ fe_daemon &
+ run_fe
+
+ if [ "$MY_ID" == "1" ]; then
+ echo $MY_IP >$MASTER_FE_IP_FILE
+ fi
return
fi
@@ -96,6 +118,10 @@ add_cloud_fe() {
node_type=FE_OBSERVER
fi
+ if [ "a$IS_FE_FOLLOWER" == "a1" ]; then
+ node_type=FE_FOLLOWER
+ fi
+
nodes='{
"cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'",
"ip": "'"${MY_IP}"'",
@@ -139,6 +165,10 @@ add_cloud_fe() {
fi
touch $REGISTER_FILE
+
+ fe_daemon &
+ run_fe
+
if [ "$MY_ID" == "1" ]; then
echo $MY_IP >$MASTER_FE_IP_FILE
fi
@@ -174,19 +204,14 @@ start_local_fe() {
if [ -f $REGISTER_FILE ]; then
fe_daemon &
- bash $DORIS_HOME/bin/start_fe.sh --daemon
+ run_fe
else
add_local_fe
fe_daemon &
- bash $DORIS_HOME/bin/start_fe.sh --helper $MASTER_FE_IP:$FE_EDITLOG_PORT --daemon
+ run_fe --helper $MASTER_FE_IP:$FE_EDITLOG_PORT
fi
}
-start_cloud_fe() {
- add_cloud_fe
- bash $DORIS_HOME/bin/start_fe.sh --daemon
-}
-
main() {
trap stop_frontend SIGTERM
diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py
index 54255b597bc..4332ae6cf48 100644
--- a/docker/runtime/doris-compose/utils.py
+++ b/docker/runtime/doris-compose/utils.py
@@ -15,11 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+import contextlib
import docker
-import json
+import jsonpickle
import logging
import os
import pwd
+import socket
import subprocess
import time
import yaml
@@ -56,6 +58,10 @@ def is_enable_log():
return ENABLE_LOG
+def set_log_verbose():
+ get_logger().setLevel(logging.DEBUG)
+
+
def get_logger(name=None):
global LOG
if LOG != None:
@@ -274,6 +280,12 @@ def copy_image_directory(image, image_dir, local_dir):
entrypoint="cp -r {} /opt/mount/".format(image_dir))
+def is_socket_avail(ip, port):
+ with contextlib.closing(socket.socket(socket.AF_INET,
+ socket.SOCK_STREAM)) as sock:
+ return sock.connect_ex((ip, port)) == 0
+
+
def enable_dir_with_rw_perm(dir):
if not os.path.exists(dir):
return
@@ -291,6 +303,13 @@ def get_path_owner(path):
return ""
+def get_path_uid(path):
+ try:
+ return os.stat(path).st_uid
+ except:
+ return ""
+
+
def read_compose_file(file):
with open(file, "r") as f:
return yaml.safe_load(f.read())
@@ -302,7 +321,7 @@ def write_compose_file(file, compose):
def pretty_json(json_data):
- return json.dumps(json_data, indent=4, sort_keys=True)
+ return jsonpickle.dumps(json_data, indent=4)
def is_true(val):
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
index 0a440bade64..3675bbf4e31 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
@@ -308,8 +308,8 @@ class Config {
Properties props = cmd.getOptionProperties("conf")
config.otherConfigs.putAll(props)
- config.tryCreateDbIfNotExist()
- config.buildUrlWithDefaultDb()
+ // mainly auth_xxx cases use defaultDb, these suites better not use defaultDb
+ config.createDefaultDb()
return config
}
@@ -630,6 +630,24 @@ class Config {
return null
}
+ void createDefaultDb() {
+ String dbName = null
+ try {
+ tryCreateDbIfNotExist(defaultDb)
+ dbName = defaultDb
+ } catch (Exception e) {
+ // defaultDb is not needed for most cases.
+ // When running docker suites without an external fe/be, createDefaultDb will fail, but this exception can be ignored.
+ // In fact, mainly the auth_xxx cases use defaultDb, and they only use jdbcUrl in the connect function.
+ // They could avoid using defaultDb too, but modifying all those cases takes a lot of work.
+ // We should eventually remove all usage of defaultDb from suites; every suite should use its own db, not defaultDb.
+ log.warn("create default db failed ${defaultDb}".toString())
+ }
+
+ jdbcUrl = buildUrlWithDb(jdbcUrl, dbName)
+ log.info("Reset jdbcUrl to ${jdbcUrl}".toString())
+ }
+
void tryCreateDbIfNotExist(String dbName = defaultDb) {
// connect without specify default db
try {
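
The createDefaultDb change above turns a hard failure into a soft fallback:
if the default database cannot be created (for example, when docker suites
run without an external FE/BE), the JDBC URL is rebuilt without a database
instead of aborting. The same shape as a hedged Python sketch (all names are
illustrative, not the framework API):

    import logging

    log = logging.getLogger("regression.config")

    def build_url_with_db(url, db):
        # keep the bare URL when no database is available
        return url if db is None else url.rstrip("/") + "/" + db

    def create_default_db(create_db, jdbc_url, default_db):
        # create_db: callable that creates the database, raising on failure
        db_name = None
        try:
            create_db(default_db)
            db_name = default_db
        except Exception:
            # fine when no external cluster exists; only a few suites need it
            log.warning("create default db failed %s", default_db)
        return build_url_with_db(jdbc_url, db_name)
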
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 6e392ef2ac4..54f2cdfc722 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -255,13 +255,13 @@ class Suite implements GroovyInterceptable {
return
}
- boolean pipelineIsCloud = isCloudCluster()
+ boolean pipelineIsCloud = isCloudMode()
boolean dockerIsCloud = false
if (options.cloudMode == null) {
dockerIsCloud = pipelineIsCloud
} else {
dockerIsCloud = options.cloudMode
- if (dockerIsCloud != pipelineIsCloud && options.skipRunWhenPipelineDiff) {
+ if (dockerIsCloud != pipelineIsCloud) {
return
}
}
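
With this change, a docker suite that sets an explicit cloudMode different
from the running pipeline is always skipped; the old skipRunWhenPipelineDiff
escape hatch is gone. The decision, condensed into a small Python sketch
(the function name is illustrative):

    def should_run_docker_suite(pipeline_is_cloud, suite_cloud_mode):
        # suite_cloud_mode: None (follow the pipeline), True, or False
        if suite_cloud_mode is None:
            return True  # run in whatever mode the pipeline uses
        # an explicit mode only runs when it matches the pipeline
        return suite_cloud_mode == pipeline_is_cloud
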
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 49bfbc18792..856b0e76956 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -17,8 +17,9 @@
package org.apache.doris.regression.suite
import org.apache.doris.regression.Config
-import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.NodeType
import com.google.common.collect.Maps
@@ -29,17 +30,29 @@ import groovy.json.JsonSlurper
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import java.util.stream.Collectors
+import java.sql.Connection
class ClusterOptions {
int feNum = 1
int beNum = 3
+ Boolean sqlModeNodeMgr = false
+ Boolean beMetaServiceEndpoint = true
+ Boolean beClusterId = false
+
+ int waitTimeout = 180
+
+ // don't add whitespace in feConfigs items,
+ // for example, ' xx = yy ' is bad, should use 'xx=yy'
List<String> feConfigs = [
'heartbeat_interval_second=5',
]
+ // don't add whitespace in beConfigs items,
+ // for example, ' xx = yy ' is bad, should use 'xx=yy'
List<String> beConfigs = [
+ 'max_sys_mem_available_low_water_mark_bytes=0', // disable the available-memory check
'report_disk_state_interval_seconds=2',
'report_random_wait=false',
]
@@ -51,9 +64,11 @@ class ClusterOptions {
// 3. cloudMode = null, create both cloud and none-cloud cluster, depend on the running pipeline mode.
Boolean cloudMode = false
- // when cloudMode = true/false, but the running pipeline is diff with cloudMode,
- // skip run this docker test or not.
- boolean skipRunWhenPipelineDiff = true
+ // in cloud mode, there are two deployment modes:
+ // 1. master - multi observers
+ // 2. multi followers - multi observers
+ // the default is mode 1
+ Boolean useFollowersMode = false
// each be disks, a disks format is: disk_type=disk_num[,disk_capacity]
// here disk_type=HDD or SSD, disk capacity is in gb unit.
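
The disks option just above uses the compact format
disk_type=disk_num[,disk_capacity], with disk_type HDD or SSD and the
capacity in GB. A hedged parser sketch (the actual parsing lives in
doris-compose and may differ):

    def parse_disk_spec(spec):
        # "HDD=2" -> ("HDD", 2, None); "SSD=1,50" -> ("SSD", 1, 50)
        disk_type, rest = spec.split("=", 1)
        if disk_type not in ("HDD", "SSD"):
            raise ValueError("disk_type must be HDD or SSD: " + spec)
        parts = rest.split(",")
        num = int(parts[0])
        capacity_gb = int(parts[1]) if len(parts) > 1 else None
        return disk_type, num, capacity_gb
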
@@ -95,12 +110,14 @@ class ServerNode {
String host
int httpPort
boolean alive
+ String path
static void fromCompose(ServerNode node, ListHeader header, int index, List<Object> fields) {
node.index = index
node.host = (String) fields.get(header.indexOf('IP'))
node.httpPort = (Integer) fields.get(header.indexOf('http_port'))
node.alive = fields.get(header.indexOf('alive')) == 'true'
+ node.path = (String) fields.get(header.indexOf('path'))
}
static long toLongOrDefault(Object val, long defValue) {
@@ -130,10 +147,19 @@ class ServerNode {
assert false : 'Unknown node type'
}
+ String getLogFilePath() {
+ assert false : 'Unknown node type'
+ }
+
+ String getConfFilePath() {
+ assert false : 'Unknown node type'
+ }
+
}
class Frontend extends ServerNode {
+ int editLogPort
int queryPort
boolean isMaster
@@ -141,6 +167,7 @@ class Frontend extends ServerNode {
Frontend fe = new Frontend()
ServerNode.fromCompose(fe, header, index, fields)
fe.queryPort = (Integer) fields.get(header.indexOf('query_port'))
+ fe.editLogPort = (Integer) fields.get(header.indexOf('edit_log_port'))
fe.isMaster = fields.get(header.indexOf('is_master')) == 'true'
return fe
}
@@ -149,18 +176,29 @@ class Frontend extends ServerNode {
return NodeType.FE
}
+ String getLogFilePath() {
+ return path + '/log/fe.log'
+ }
+
+ String getConfFilePath() {
+ return path + '/conf/fe.conf'
+ }
+
}
class Backend extends ServerNode {
+ int heartbeatPort
long backendId
int tabletNum
static Backend fromCompose(ListHeader header, int index, List<Object> fields) {
Backend be = new Backend()
ServerNode.fromCompose(be, header, index, fields)
+ be.heartbeatPort = (Integer) fields.get(header.indexOf('heartbeat_port'))
be.backendId = toLongOrDefault(fields.get(header.indexOf('backend_id')), -1L)
be.tabletNum = (int) toLongOrDefault(fields.get(header.indexOf('tablet_num')), 0L)
+
return be
}
@@ -168,6 +206,58 @@ class Backend extends ServerNode {
return NodeType.BE
}
+ String getLogFilePath() {
+ return path + '/log/be.INFO'
+ }
+
+ String getConfFilePath() {
+ return path + '/conf/be.conf'
+ }
+
+}
+
+class MetaService extends ServerNode {
+
+ static MetaService fromCompose(ListHeader header, int index, List<Object> fields) {
+ MetaService ms = new MetaService()
+ ServerNode.fromCompose(ms, header, index, fields)
+ return ms
+ }
+
+ NodeType getNodeType() {
+ return NodeType.MS
+ }
+
+ String getLogFilePath() {
+ return path + '/log/meta_service.INFO'
+ }
+
+ String getConfFilePath() {
+ return path + '/conf/doris_cloud.conf'
+ }
+
+}
+
+class Recycler extends ServerNode {
+
+ static Recycler fromCompose(ListHeader header, int index, List<Object> fields) {
+ Recycler rs = new Recycler()
+ ServerNode.fromCompose(rs, header, index, fields)
+ return rs
+ }
+
+ NodeType getNodeType() {
+ return NodeType.RECYCLER
+ }
+
+ String getLogFilePath() {
+ return path + '/log/recycler.INFO'
+ }
+
+ String getConfFilePath() {
+ return path + '/conf/doris_cloud.conf'
+ }
+
}
@Slf4j
@@ -179,6 +269,8 @@ class SuiteCluster {
final String name
final Config config
private boolean running
+ private boolean sqlModeNodeMgr = false
+ private boolean isCloudMode = false
SuiteCluster(String name, Config config) {
this.name = name
@@ -191,6 +283,8 @@ class SuiteCluster {
assert options.feNum > 0 || options.beNum > 0
assert config.image != null && config.image != ''
+ this.isCloudMode = isCloud
+
def cmd = [
'up', name, config.image
]
@@ -220,7 +314,24 @@ class SuiteCluster {
if (isCloud) {
cmd += ['--cloud']
}
- cmd += ['--wait-timeout', String.valueOf(180)]
+
+ if (isCloud && options.useFollowersMode) {
+ cmd += ['--fe-follower']
+ }
+
+ if (options.sqlModeNodeMgr) {
+ cmd += ['--sql-mode-node-mgr']
+ }
+ if (!options.beMetaServiceEndpoint) {
+ cmd += ['--no-be-metaservice-endpoint']
+ }
+ if (!options.beClusterId) {
+ cmd += ['--no-be-cluster-id']
+ }
+
+ cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)]
+
+ sqlModeNodeMgr = options.sqlModeNodeMgr
runCmd(cmd.join(' '), -1)
@@ -287,25 +398,44 @@ class SuiteCluster {
return getBackends().stream().filter(be -> be.alive || !needAlive).collect(Collectors.toList());
}
+ List<MetaService> getAllMetaservices(boolean needAlive = false) {
+ return getMetaservices().stream().filter(ms -> ms.alive || !needAlive).collect(Collectors.toList());
+ }
+
+ List<MetaService> getAllRecyclers(boolean needAlive = false) {
+ return getRecyclers().stream().filter(rc -> rc.alive || !needAlive).collect(Collectors.toList());
+ }
+
private List<Frontend> getFrontends() {
- List<Frontend> frontends = []
- List<Backend> backends = []
- getAllNodes(frontends, backends)
- return frontends
+ def ret = getAllNodes()
+ return ret.getV1()
}
private List<Backend> getBackends() {
- List<Frontend> frontends = []
- List<Backend> backends = []
- getAllNodes(frontends, backends)
- return backends
+ def ret = getAllNodes()
+ return ret.getV2()
+ }
+
+ private List<MetaService> getMetaservices() {
+ def ret = getAllNodes()
+ return ret.getV3()
}
- private void getAllNodes(List<Frontend> frontends, List<Backend> backends) {
+ private List<Recycler> getRecyclers() {
+ def ret = getAllNodes()
+ return ret.getV4()
+ }
+
+ private Tuple4<List<Frontend>, List<Backend>, List<MetaService>, List<Recycler>> getAllNodes() {
+ List<Frontend> frontends = []
+ List<Backend> backends = []
+ List<MetaService> metaservices = []
+ List<Recycler> recyclers = []
def cmd = 'ls ' + name + ' --detail'
def data = runCmd(cmd)
assert data instanceof List
def rows = (List<List<Object>>) data
+ logger.info('get all nodes {}', rows)
def header = new ListHeader(rows.get(0))
for (int i = 1; i < rows.size(); i++) {
def row = (List<Object>) rows.get(i)
@@ -316,34 +446,49 @@ class SuiteCluster {
} else if (name.startsWith('fe-')) {
int index = name.substring('fe-'.length()) as int
frontends.add(Frontend.fromCompose(header, index, row))
- } else if (name.startsWith('ms-') || name.startsWith('recycle-') || name.startsWith('fdb-')) {
- // TODO: handle these nodes
+ } else if (name.startsWith('ms-')) {
+ int index = name.substring('ms-'.length()) as int
+ metaservices.add(MetaService.fromCompose(header, index, row))
+ } else if (name.startsWith('recycle-')) {
+ int index = name.substring('recycle-'.length()) as int
+ recyclers.add(Recycler.fromCompose(header, index, row))
+ } else if (name.startsWith('fdb-')) {
+ // currently not used
} else {
assert false : 'Unknown node type with name: ' + name
}
}
+ return new Tuple4(frontends, backends, metaservices, recyclers)
}
- List<Integer> addFrontend(int num) throws Exception {
- def result = add(num, 0)
+ List<Integer> addFrontend(int num, boolean followerMode=false) throws Exception {
+ def result = add(num, 0, null, followerMode)
return result.first
}
- List<Integer> addBackend(int num) throws Exception {
- def result = add(0, num)
+ List<Integer> addBackend(int num, String clusterName='') throws Exception {
+ def result = add(0, num, clusterName)
return result.second
}
- Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum) throws Exception {
+ // ATTN: clusterName is only used in cloud mode; one cluster can have n BEs
+ // ATTN: followerMode is only used in cloud mode
+ Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String clusterName, boolean followerMode=false) throws Exception {
assert feNum > 0 || beNum > 0
def sb = new StringBuilder()
sb.append('up ' + name + ' ')
if (feNum > 0) {
sb.append('--add-fe-num ' + feNum + ' ')
+ if (followerMode) {
+ sb.append('--fe-follower' + ' ')
+ }
}
if (beNum > 0) {
sb.append('--add-be-num ' + beNum + ' ')
+ if (clusterName != null && !clusterName.isEmpty()) {
+ sb.append(' --be-cluster ' + clusterName + ' ')
+ }
}
sb.append('--wait-timeout 60')
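
getAllNodes now classifies each row of 'doris-compose ls --detail' by
container-name prefix and returns all four node lists in one pass, instead of
filling caller-supplied lists. Roughly the same dispatch in Python
(illustrative; the real code builds typed Groovy objects from header/row
pairs, and the name-field position comes from the header):

    def split_nodes(rows):
        # rows: data records from 'ls --detail', header row already removed;
        # assume the container name is the first field
        frontends, backends, metaservices, recyclers = [], [], [], []
        buckets = {"fe-": frontends, "be-": backends,
                   "ms-": metaservices, "recycle-": recyclers}
        for row in rows:
            name = row[0]
            if name.startswith("fdb-"):
                continue  # fdb nodes are currently not used by the framework
            for prefix, bucket in buckets.items():
                if name.startswith(prefix):
                    bucket.append((int(name[len(prefix):]), row))
                    break
            else:
                raise ValueError("Unknown node type with name: " + name)
        return frontends, backends, metaservices, recyclers
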
@@ -373,40 +518,42 @@ class SuiteCluster {
return running
}
+ boolean isCloudMode() {
+ return this.isCloudMode
+ }
+
+ int START_WAIT_TIMEOUT = 120
+
// if not specific fe indices, then start all frontends
void startFrontends(int... indices) {
- runFrontendsCmd('start', indices)
- waitHbChanged()
+ runFrontendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
}
// if not specific be indices, then start all backends
void startBackends(int... indices) {
- runBackendsCmd('start', indices)
- waitHbChanged()
+ runBackendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
}
// if not specific fe indices, then stop all frontends
void stopFrontends(int... indices) {
- runFrontendsCmd('stop', indices)
+ runFrontendsCmd(60, 'stop', indices)
waitHbChanged()
}
// if not specific be indices, then stop all backends
void stopBackends(int... indices) {
- runBackendsCmd('stop', indices)
+ runBackendsCmd(60, 'stop', indices)
waitHbChanged()
}
// if not specific fe indices, then restart all frontends
void restartFrontends(int... indices) {
- runFrontendsCmd('restart', indices)
- waitHbChanged()
+ runFrontendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
}
// if not specific be indices, then restart all backends
void restartBackends(int... indices) {
- runBackendsCmd('restart', indices)
- waitHbChanged()
+ runBackendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
}
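
The start and restart paths above now delegate readiness waiting to
doris-compose itself via --wait-timeout, and give the outer command timeout a
small cushion (START_WAIT_TIMEOUT + 5) so the CLI hits its own deadline first
and can report a proper error instead of being killed. The relationship as a
tiny sketch (names are illustrative):

    START_WAIT_TIMEOUT = 120  # seconds doris-compose waits for nodes to come up

    def start_args(op):
        # inner deadline: enforced by doris-compose via --wait-timeout
        inner_cmd = "%s --wait-timeout %d" % (op, START_WAIT_TIMEOUT)
        # outer deadline: fires only if the CLI overruns its own deadline
        outer_timeout = START_WAIT_TIMEOUT + 5
        return inner_cmd, outer_timeout
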
// if not specific fe indices, then drop all frontends
@@ -415,7 +562,7 @@ class SuiteCluster {
if (clean) {
cmd += ' --clean'
}
- runFrontendsCmd(cmd, indices)
+ runFrontendsCmd(60, cmd, indices)
}
// if not specific be indices, then decommission all backends
@@ -433,7 +580,7 @@ class SuiteCluster {
if (clean) {
cmd += ' --clean'
}
- runBackendsCmd(cmd, indices)
+ runBackendsCmd(60, cmd, indices)
}
void checkFeIsAlive(int index, boolean isAlive) {
@@ -468,27 +615,28 @@ class SuiteCluster {
}
}
+ void addRWPermToAllFiles() {
+ def cmd = 'add-rw-perm ' + name
+ runCmd(cmd)
+ }
+
private void waitHbChanged() {
// heart beat interval is 5s
Thread.sleep(7000)
}
- private void runFrontendsCmd(String op, int... indices) {
+ private void runFrontendsCmd(int timeoutSecond, String op, int... indices)
{
def cmd = op + ' ' + name + ' --fe-id ' + indices.join(' ')
- runCmd(cmd)
+ runCmd(cmd, timeoutSecond)
}
- private void runBackendsCmd(Integer timeoutSecond = null, String op, int... indices) {
+ private void runBackendsCmd(int timeoutSecond, String op, int... indices) {
def cmd = op + ' ' + name + ' --be-id ' + indices.join(' ')
- if (timeoutSecond == null) {
- runCmd(cmd)
- } else {
- runCmd(cmd, timeoutSecond)
- }
+ runCmd(cmd, timeoutSecond)
}
private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception {
- def fullCmd = String.format('python %s %s --output-json', config.dorisComposePath, cmd)
+ def fullCmd = String.format('python -W ignore %s %s --output-json', config.dorisComposePath, cmd)
logger.info('Run doris compose cmd: {}', fullCmd)
def proc = fullCmd.execute()
def outBuf = new StringBuilder()
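
runCmd now invokes doris-compose through 'python -W ignore' so Python
warnings cannot pollute the --output-json stream. An equivalent hedged sketch
in Python, which treats a negative timeout as unlimited to mirror the -1
passed for the long-running 'up' command earlier in this file (error handling
and output parsing are assumptions):

    import json
    import subprocess

    def run_compose_cmd(compose_path, cmd, timeout_sec=60):
        # note: naive split(); fine for simple arguments without spaces
        full_cmd = "python -W ignore %s %s --output-json" % (compose_path, cmd)
        proc = subprocess.run(full_cmd.split(),
                              capture_output=True, text=True,
                              timeout=None if timeout_sec < 0 else timeout_sec)
        if proc.returncode != 0:
            raise RuntimeError("doris compose failed: " + proc.stderr)
        return json.loads(proc.stdout)
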
diff --git a/regression-test/suites/demo_p0/docker_action.groovy b/regression-test/suites/demo_p0/docker_action.groovy
index 6d62d6ea7be..d59c0f43774 100644
--- a/regression-test/suites/demo_p0/docker_action.groovy
+++ b/regression-test/suites/demo_p0/docker_action.groovy
@@ -55,8 +55,6 @@ suite('docker_action') {
options2.beNum = 1
// create cloud cluster
options2.cloudMode = true
- //// cloud docker only run in cloud pipeline, but enable it run in none-cloud pipeline
- // options2.skipRunWhenPipelineDiff = false
// run another docker, create a cloud cluster
docker(options2) {
// cloud cluster will ignore replication_num, always set to 1. so create table succ even has 1 be.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]