This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2e3d15b552 [Feature](doris compose) A tool for setup and manage doris docker cluster scaling easily (#21649)
2e3d15b552 is described below
commit 2e3d15b552aceb968bdcf5ce0352d9505cbb12ad
Author: yujun <[email protected]>
AuthorDate: Wed Jul 12 22:13:38 2023 +0800
[Feature](doris compose) A tool for setup and manage doris docker cluster scaling easily (#21649)
---
docker/runtime/doris-compose/Dockerfile | 30 ++
docker/runtime/doris-compose/Readme.md | 102 +++++
docker/runtime/doris-compose/cluster.py | 438 ++++++++++++++++++
docker/runtime/doris-compose/command.py | 543 +++++++++++++++++++++++
docker/runtime/doris-compose/database.py | 239 ++++++++++
docker/runtime/doris-compose/doris-compose.py | 48 ++
docker/runtime/doris-compose/requirements.txt | 22 +
docker/runtime/doris-compose/resource/common.sh | 36 ++
docker/runtime/doris-compose/resource/init_be.sh | 50 +++
docker/runtime/doris-compose/resource/init_fe.sh | 70 +++
docker/runtime/doris-compose/utils.py | 253 +++++++++++
11 files changed, 1831 insertions(+)
diff --git a/docker/runtime/doris-compose/Dockerfile b/docker/runtime/doris-compose/Dockerfile
new file mode 100644
index 0000000000..8f5d7e3376
--- /dev/null
+++ b/docker/runtime/doris-compose/Dockerfile
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# choose a base image
+FROM openjdk:8u342-jdk
+
+# set environment variables
+ENV JAVA_HOME="/usr/local/openjdk-8/"
+
+ADD output /opt/apache-doris/
+
+RUN apt-get update && \
+ apt-get install -y default-mysql-client python && \
+ apt-get clean
+
diff --git a/docker/runtime/doris-compose/Readme.md b/docker/runtime/doris-compose/Readme.md
new file mode 100644
index 0000000000..13f543d4e7
--- /dev/null
+++ b/docker/runtime/doris-compose/Readme.md
@@ -0,0 +1,102 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris compose
+
+Use doris compose to create doris docker compose clusters.
+
+## Requirements
+
+1. The doris image should contain:
+
+```
+/opt/apache-doris/{fe, be}
+```
+
+If doris is built with `sh build.sh`, its output satisfies this layout. Then running the following command in the doris root directory
+
+```
+docker build -f docker/runtime/doris-compose/Dockerfile -t <image> .
+```
+
+will generate an image.
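+
+For example, with a hypothetical tag:
+
+```
+sh build.sh
+docker build -f docker/runtime/doris-compose/Dockerfile -t my-doris:latest .
+```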
+
+2. Install the dependent python libraries listed in 'docker/runtime/doris-compose/requirements.txt'
+
+
+```
+python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
+```
+
+## Usage
+
+### Create a cluster or recreate its containers
+
+```
+python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image?>
+    --add-fe-num <add-fe-num> --add-be-num <add-be-num>
+    --fe-id <fe-id> --be-id <be-id>
+
+```
+
+If it's a new cluster, you must specify the image.
+
+Add fe/be nodes with the specified image, or update existing nodes with `--fe-id`, `--be-id`.
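+
+For example, assuming a hypothetical cluster name `my-cluster` and image tag `my-doris:latest`:
+
+```
+# create a new cluster with the default 3 fe and 3 be
+python docker/runtime/doris-compose/doris-compose.py up my-cluster my-doris:latest
+# later, add 2 more be to the same cluster
+python docker/runtime/doris-compose/doris-compose.py up my-cluster --add-be-num 2
+```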
+
+### Remove node from the cluster
+
+```
+python docker/runtime/doris-compose/doris-compose.py down <cluster-name> --fe-id <fe-id> --be-id <be-id> [--clean] [--drop-force]
+```
+
+Down the containers and remove them from the DB.
+
+For BE, if `--drop-force` is specified, it will send a DROPP sql to FE; otherwise it will send a DECOMMISSION sql to FE.
+
+If `--clean` is specified, it will delete the node's data too.
+
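+For example, with a hypothetical cluster `my-cluster`:
+
+```
+# decommission be-3, waiting for its tablets to migrate away
+python docker/runtime/doris-compose/doris-compose.py down my-cluster --be-id 3
+# force drop be-3 and delete its local data
+python docker/runtime/doris-compose/doris-compose.py down my-cluster --be-id 3 --drop-force --clean
+```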
+
+### Start, stop, restart specific nodes
+
+
+```
+python docker/runtime/doris-compose/doris-compose.py start <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids>
+python docker/runtime/doris-compose/doris-compose.py stop <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids>
+python docker/runtime/doris-compose/doris-compose.py restart <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids>
+```
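+
+For example, with hypothetical ids:
+
+```
+# restart fe-1 and be-1, be-2 of cluster my-cluster
+python docker/runtime/doris-compose/doris-compose.py restart my-cluster --fe-id 1 --be-id 1 2
+```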
+
+### List doris cluster
+
+```
+python docker/runtime/doris-compose/doris-compose.py ls [-a] <multiple cluster names>
+```
+
+If cluster names are specified, it will list all of those clusters' nodes.
+
+Otherwise it will just list a summary of each cluster. If `-a` is not specified, it will list only active clusters.
+
+If `-a` is specified, it will list the inactive clusters too.
+
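+For example:
+
+```
+# show a summary of the active clusters
+python docker/runtime/doris-compose/doris-compose.py ls
+# include stopped or bad clusters in the summary
+python docker/runtime/doris-compose/doris-compose.py ls -a
+# show every node of a hypothetical cluster my-cluster
+python docker/runtime/doris-compose/doris-compose.py ls my-cluster
+```
+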
+There are more options in doris-compose. Just try
+
+```
+python docker/runtime/doris-compose/doris-compose.py <command> -h
+```
+
+
+
diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
new file mode 100644
index 0000000000..2556809a3c
--- /dev/null
+++ b/docker/runtime/doris-compose/cluster.py
@@ -0,0 +1,438 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import jsonpickle
+import os
+import os.path
+import utils
+
+DOCKER_DORIS_PATH = "/opt/apache-doris"
+LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH")
+if not LOCAL_DORIS_PATH:
+ LOCAL_DORIS_PATH = "/tmp/doris"
+
+LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ "resource")
+DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource")
+
+FE_HTTP_PORT = 8030
+FE_RPC_PORT = 9020
+FE_QUERY_PORT = 9030
+FE_EDITLOG_PORT = 9010
+
+BE_PORT = 9060
+BE_WEBSVR_PORT = 8040
+BE_HEARTBEAT_PORT = 9050
+BE_BRPC_PORT = 8060
+
+ID_LIMIT = 10000
+
+IP_PART4_SIZE = 200
+
+LOG = utils.get_logger()
+
+
+def get_cluster_path(cluster_name):
+ return os.path.join(LOCAL_DORIS_PATH, cluster_name)
+
+
+def get_compose_file(cluster_name):
+ return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml")
+
+
+def get_status_path(cluster_name):
+ return os.path.join(get_cluster_path(cluster_name), "status")
+
+
+def gen_subnet_prefix16():
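+    # Pick an unused <a>.<b> prefix (a in [128, 192)) for the new cluster's
+    # /16 subnet, skipping prefixes already taken by docker networks or by
+    # other clusters' saved metadata.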
+ used_subnet = utils.get_docker_subnets_prefix16()
+ if os.path.exists(LOCAL_DORIS_PATH):
+ for cluster_name in os.listdir(LOCAL_DORIS_PATH):
+ try:
+ cluster = Cluster.load(cluster_name)
+ used_subnet[cluster.subnet] = True
+ except:
+ pass
+
+ for i in range(128, 192):
+ for j in range(256):
+ subnet = "{}.{}".format(i, j)
+ if not used_subnet.get(subnet, False):
+ return subnet
+
+ raise Exception("Failed to gen subnet")
+
+
+class NodeMeta(object):
+
+ def __init__(self, image):
+ self.image = image
+
+
+class Group(object):
+
+ def __init__(self, node_type):
+ self.node_type = node_type
+ self.nodes = {} # id : NodeMeta
+ self.next_id = 1
+
+ def add(self, id, image):
+ assert image
+ if not id:
+ id = self.next_id
+ self.next_id += 1
+ if self.get_node(id):
+ raise Exception(
+ "Failed to add {} with id {}, id has exists".format(
+ self.node_type, id))
+ if id > ID_LIMIT:
+ raise Exception(
+ "Failed to add {} with id {}, id exceeds {}".format(
+ self.node_type, id, ID_LIMIT))
+ self.nodes[id] = NodeMeta(image)
+
+ return id
+
+ def remove(self, id):
+ self.nodes.pop(id, None)
+
+ def get_node_num(self):
+ return len(self.nodes)
+
+ def get_all_nodes(self):
+ return self.nodes
+
+ def get_node(self, id):
+ return self.nodes.get(id, None)
+
+ def on_loaded(self):
+ nodes = {}
+ for id, node in self.nodes.items():
+ nodes[int(id)] = node
+ self.nodes = nodes
+
+
+class Node(object):
+ TYPE_FE = "fe"
+ TYPE_BE = "be"
+ TYPE_ALL = [TYPE_FE, TYPE_BE]
+
+ def __init__(self, cluster_name, id, subnet, meta):
+ self.cluster_name = cluster_name
+ self.id = id
+ self.subnet = subnet
+ self.meta = meta
+
+ @staticmethod
+ def new(cluster_name, node_type, id, subnet, meta):
+ if node_type == Node.TYPE_FE:
+ return FE(cluster_name, id, subnet, meta)
+ elif node_type == Node.TYPE_BE:
+ return BE(cluster_name, id, subnet, meta)
+ else:
+ raise Exception("Unknown node type {}".format(node_type))
+
+ def init_dir(self):
+ path = self.get_path()
+ os.makedirs(path, exist_ok=True)
+
+ # copy config to local
+ conf_dir = os.path.join(path, "conf")
+ if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir):
+ utils.copy_image_directory(
+ self.get_image(), "{}/{}/conf".format(DOCKER_DORIS_PATH,
+ self.node_type()),
+ conf_dir)
+        assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \
+            "check doris path in image is correct".format(conf_dir)
+ for sub_dir in self.expose_sub_dirs():
+ os.makedirs(os.path.join(path, sub_dir), exist_ok=True)
+
+ def is_fe(self):
+ return self.node_type() == Node.TYPE_FE
+
+ def is_be(self):
+ return self.node_type() == Node.TYPE_BE
+
+ def node_type(self):
+ raise Exception("No implemented")
+
+ def expose_sub_dirs(self):
+ return ["conf", "log"]
+
+ def get_name(self):
+ return "{}-{}".format(self.node_type(), self.id)
+
+ def get_path(self):
+ return os.path.join(get_cluster_path(self.cluster_name),
+ self.get_name())
+
+ def get_image(self):
+ return self.meta.image
+
+ def set_image(self, image):
+ self.meta.image = image
+
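+    # Map the node id to a fixed ip inside the cluster subnet: fe and be ids
+    # land in disjoint seq ranges (offset by ID_LIMIT), and the seq is split
+    # into the last two octets by IP_PART4_SIZE. get_id_from_ip below inverts
+    # this mapping.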
+ def get_ip(self):
+ seq = self.id
+ seq += IP_PART4_SIZE
+ if self.node_type() == Node.TYPE_FE:
+ seq += 0 * ID_LIMIT
+ elif self.node_type() == Node.TYPE_BE:
+ seq += 1 * ID_LIMIT
+ else:
+ seq += 2 * ID_LIMIT
+ return "{}.{}.{}".format(self.subnet, int(seq / IP_PART4_SIZE),
+ seq % IP_PART4_SIZE)
+
+ @staticmethod
+ def get_id_from_ip(ip):
+ pos2 = ip.rfind(".")
+ pos1 = ip.rfind(".", 0, pos2 - 1)
+ num3 = int(ip[pos1 + 1:pos2])
+ num4 = int(ip[pos2 + 1:])
+ seq = num3 * IP_PART4_SIZE + num4
+ while seq > ID_LIMIT:
+ seq -= ID_LIMIT
+ seq -= IP_PART4_SIZE
+ return seq
+
+ def service_name(self):
+ return utils.with_doris_prefix("{}-{}".format(self.cluster_name,
+ self.get_name()))
+
+ def docker_env(self):
+ return {
+ "MY_IP": self.get_ip(),
+ "MY_ID": self.id,
+ "FE_QUERY_PORT": FE_QUERY_PORT,
+ "FE_EDITLOG_PORT": FE_EDITLOG_PORT,
+ "BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT,
+ "DORIS_HOME": os.path.join(DOCKER_DORIS_PATH, self.node_type()),
+ }
+
+ def docker_ports(self):
+ raise Exception("No implemented")
+
+ def compose(self):
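+        # Render this node as a docker compose service: fixed ipv4 address on
+        # the cluster network, conf/log dirs and the shared status dir mounted
+        # from the host, and host timezone files mounted read-only.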
+ return {
+ "cap_add": ["SYS_PTRACE"],
+ "hostname":
+ self.get_name(),
+ "container_name":
+ self.service_name(),
+ "command":
+ self.docker_command(),
+ "environment":
+ self.docker_env(),
+ "image":
+ self.get_image(),
+ "networks": {
+ utils.with_doris_prefix(self.cluster_name): {
+ "ipv4_address": self.get_ip(),
+ }
+ },
+ "ports":
+ self.docker_ports(),
+ "ulimits": {
+ "core": -1
+ },
+ "security_opt": ["seccomp:unconfined"],
+ "volumes": [
+ "{}:{}/{}/{}".format(os.path.join(self.get_path(),
+ sub_dir), DOCKER_DORIS_PATH,
+ self.node_type(), sub_dir)
+ for sub_dir in self.expose_sub_dirs()
+ ] + [
+ "{}:{}:ro".format(LOCAL_RESOURCE_PATH, DOCKER_RESOURCE_PATH),
+ "{}:{}/{}/status".format(get_status_path(self.cluster_name),
+ DOCKER_DORIS_PATH, self.node_type()),
+ ] + [
+ "{0}:{0}:ro".format(path)
+ for path in ("/etc/localtime", "/etc/timezone",
+ "/usr/share/zoneinfo") if os.path.exists(path)
+ ],
+ }
+
+
+class FE(Node):
+
+ def docker_command(self):
+ return [
+ "bash",
+ os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh"),
+ #"{}/fe/bin/init_fe.sh".format(DOCKER_DORIS_PATH),
+ ]
+
+ def docker_ports(self):
+ return [FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT]
+
+ def node_type(self):
+ return Node.TYPE_FE
+
+ def expose_sub_dirs(self):
+ return super().expose_sub_dirs() + ["doris-meta"]
+
+
+class BE(Node):
+
+ def docker_command(self):
+ return [
+ "bash",
+ os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh"),
+ #"{}/be/bin/init_be.sh".format(DOCKER_DORIS_PATH),
+ ]
+
+ def docker_ports(self):
+ return [BE_WEBSVR_PORT, BE_BRPC_PORT, BE_HEARTBEAT_PORT, BE_PORT]
+
+ def node_type(self):
+ return Node.TYPE_BE
+
+ def expose_sub_dirs(self):
+ return super().expose_sub_dirs() + ["storage"]
+
+
+class Cluster(object):
+
+ def __init__(self, name, subnet, image):
+ self.name = name
+ self.subnet = subnet
+ self.image = image
+ self.groups = {
+ node_type: Group(node_type)
+ for node_type in Node.TYPE_ALL
+ }
+
+ @staticmethod
+ def new(name, image):
+ subnet = gen_subnet_prefix16()
+ cluster = Cluster(name, subnet, image)
+ os.makedirs(cluster.get_path(), exist_ok=True)
+ os.makedirs(get_status_path(name), exist_ok=True)
+ return cluster
+
+ @staticmethod
+ def load(name):
+ if not name:
+ raise Exception("Failed to load cluster, name is empty")
+ path = get_cluster_path(name)
+ if not os.path.exists(path):
+ raise Exception(
+ "Failed to load cluster, its directory {} not exists.".format(
+ path))
+ meta_path = Cluster._get_meta_file(name)
+ if not os.path.exists(meta_path):
+ raise Exception(
+ "Failed to load cluster, its meta file {} not exists.".format(
+ meta_path))
+ with open(meta_path, "r") as f:
+ cluster = jsonpickle.loads(f.read())
+ for group in cluster.groups.values():
+ group.on_loaded()
+ return cluster
+
+ @staticmethod
+ def _get_meta_file(name):
+ return os.path.join(get_cluster_path(name), "meta")
+
+ def get_image(self):
+ return self.image
+
+ # cluster's nodes will update image too if cluster update.
+ def set_image(self, image):
+ self.image = image
+ for _, group in self.groups.items():
+ for _, node_meta in group.nodes.items():
+ node_meta.image = image
+
+ def get_path(self):
+ return get_cluster_path(self.name)
+
+ def get_group(self, node_type):
+ group = self.groups.get(node_type, None)
+ if not group:
+ raise Exception("Unknown node_type: {}".format(node_type))
+ return group
+
+ def get_node(self, node_type, id):
+ group = self.get_group(node_type)
+ meta = group.get_node(id)
+ if not meta:
+ raise Exception("No found {} with id {}".format(node_type, id))
+ return Node.new(self.name, node_type, id, self.subnet, meta)
+
+ def get_all_nodes(self, node_type):
+ group = self.groups.get(node_type, None)
+ if not group:
+ raise Exception("Unknown node_type: {}".format(node_type))
+ return [
+ Node.new(self.name, node_type, id, self.subnet, meta)
+ for id, meta in group.get_all_nodes().items()
+ ]
+
+ def get_all_nodes_num(self):
+ num = 0
+ for group in self.groups.values():
+ num += group.get_node_num()
+ return num
+
+ def add(self, node_type, id=None):
+ id = self.get_group(node_type).add(id, self.image)
+ node = self.get_node(node_type, id)
+ node.init_dir()
+ return node
+
+ def remove(self, node_type, id):
+ group = self.get_group(node_type)
+ group.remove(id)
+
+ def save(self):
+ self._save_meta()
+ self._save_compose()
+
+ def _save_meta(self):
+ with open(Cluster._get_meta_file(self.name), "w") as f:
+ f.write(jsonpickle.dumps(self, indent=2))
+
+ def _save_compose(self):
+ services = {}
+ for node_type in self.groups.keys():
+ for node in self.get_all_nodes(node_type):
+ services[node.service_name()] = node.compose()
+
+ compose = {
+ "version": "3",
+ "networks": {
+ utils.with_doris_prefix(self.name): {
+ "driver": "bridge",
+ "ipam": {
+ "config": [{
+ "subnet": "{}.0.0/16".format(self.subnet),
+ }]
+ }
+ }
+ },
+ "services": services,
+ }
+
+ utils.write_compose_file(self.get_compose_file(), compose)
+
+ def get_compose_file(self):
+ global get_compose_file
+ return get_compose_file(self.name)
diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
new file mode 100644
index 0000000000..2fa556d513
--- /dev/null
+++ b/docker/runtime/doris-compose/command.py
@@ -0,0 +1,543 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import cluster as CLUSTER
+import database
+import utils
+import os
+import os.path
+import prettytable
+import shutil
+import sys
+
+LOG = utils.get_logger()
+
+
+# return for_all, related_nodes, related_node_num
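+# fe_ids/be_ids == None means the flag was not given at all; an empty list
+# means the flag was given without ids, which selects all nodes of that type.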
+def get_ids_related_nodes(cluster, fe_ids, be_ids, ignore_not_exists=False):
+ if fe_ids is None and be_ids is None:
+ return True, None, cluster.get_all_nodes_num()
+
+ def get_ids_related_nodes_with_type(node_type, ids):
+ if ids is None:
+ return []
+ if not ids:
+ return cluster.get_all_nodes(node_type)
+ else:
+ nodes = []
+ for id in ids:
+ try:
+ nodes.append(cluster.get_node(node_type, id))
+ except Exception as e:
+ if ignore_not_exists:
+ LOG.warning(
+ utils.render_yellow(
+ "Not found {} with id {}".format(
+ node_type, id)))
+ else:
+ raise e
+ return nodes
+
+ nodes = get_ids_related_nodes_with_type(
+ CLUSTER.Node.TYPE_FE, fe_ids) + get_ids_related_nodes_with_type(
+ CLUSTER.Node.TYPE_BE, be_ids)
+
+ related_node_num = len(nodes)
+
+    return len(nodes) == cluster.get_all_nodes_num(), nodes, related_node_num
+
+
+class Command(object):
+
+ def __init__(self, name):
+ self.name = name
+
+ def add_parser(self, args_parsers):
+ raise Exception("No implemented")
+
+ def run(self, args):
+ raise Exception("No implemented")
+
+ def _add_parser_ids_args(self, parser):
+ group = parser.add_argument_group("for existing nodes",
+ "apply to the existing nodes.")
+ group.add_argument("--fe-id", nargs="*", type=int, help="Specify up fe
ids, support multiple ids, " \
+ "if specific --fe-id but not specific ids, apply to all fe.")
+ group.add_argument("--be-id", nargs="*", type=int, help="Specify up be
ids, support multiple ids, " \
+ "if specific --be but not specific ids, apply to all be.")
+
+ def _get_parser_bool_action(self, is_store_true):
+ if sys.version_info.major == 3 and sys.version_info.minor >= 9:
+ return argparse.BooleanOptionalAction
+ else:
+ return "store_true" if is_store_true else "store_false"
+
+
+class SimpleCommand(Command):
+
+ def __init__(self, command, help):
+ super().__init__(command)
+ self.command = command
+ self.help = help
+
+ def add_parser(self, args_parsers):
+        help = self.help + " If none of --fe-id, --be-id is specified, then apply to all containers."
+ parser = args_parsers.add_parser(self.command, help=help)
+ parser.add_argument("NAME", help="Specify cluster name.")
+ self._add_parser_ids_args(parser)
+
+ def run(self, args):
+ cluster = CLUSTER.Cluster.load(args.NAME)
+ _, related_nodes, related_node_num = get_ids_related_nodes(
+ cluster, args.fe_id, args.be_id)
+ utils.exec_docker_compose_command(cluster.get_compose_file(),
+ self.command,
+ nodes=related_nodes)
+ show_cmd = self.command[0].upper() + self.command[1:]
+ LOG.info(
+ utils.render_green("{} succ, total related node num {}".format(
+ show_cmd, related_node_num)))
+
+
+class UpCommand(Command):
+
+ def add_parser(self, args_parsers):
+ parser = args_parsers.add_parser("up", help="Create and upgrade doris
containers, "\
+ "or add new containers. " \
+ "If none of --add-fe-num, --add-be-num, --fe-id, --be-id is
specific, " \
+ "then apply to all containers.")
+ parser.add_argument("NAME", default="", help="Specific cluster name.")
+ parser.add_argument("IMAGE",
+ default="",
+ nargs="?",
+ help="Specify docker image.")
+
+ group1 = parser.add_argument_group("add new nodes",
+ "add cluster nodes.")
+ group1.add_argument(
+ "--add-fe-num",
+ type=int,
+ help="Specify add fe num, default 3 for a new cluster.")
+ group1.add_argument(
+ "--add-be-num",
+ type=int,
+ help="Specify add be num, default 3 for a new cluster.")
+
+ self._add_parser_ids_args(parser)
+
+ group2 = parser.add_mutually_exclusive_group()
+ group2.add_argument(
+ "--no-start",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help="Not start containers, create or update config image only.")
+ group2.add_argument("--force-recreate",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help="Recreate containers even if their
configuration" \
+ "and image haven't changed. ")
+
+ def run(self, args):
+ if not args.NAME:
+ raise Exception("Need specific not empty cluster name")
+ for_all = True
+ try:
+ cluster = CLUSTER.Cluster.load(args.NAME)
+            if args.fe_id != None or args.be_id != None or args.add_fe_num or args.add_be_num:
+ for_all = False
+ except:
+ # a new cluster
+ if not args.IMAGE:
+ raise Exception("New cluster must specific image")
+ if args.fe_id != None:
+ args.fe_id = None
+ LOG.warning(
+ utils.render_yellow("Ignore --fe-id for new cluster"))
+ if args.be_id != None:
+ args.be_id = None
+ LOG.warning(
+ utils.render_yellow("Ignore --be-id for new cluster"))
+ cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE)
+ LOG.info("Create new cluster {} succ, cluster path is {}".format(
+ args.NAME, cluster.get_path()))
+ if not args.add_fe_num:
+ args.add_fe_num = 3
+ if not args.add_be_num:
+ args.add_be_num = 3
+
+ _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
+ args.be_id)
+ if not related_nodes:
+ related_nodes = []
+ if args.add_fe_num:
+ for i in range(args.add_fe_num):
+ related_nodes.append(cluster.add(CLUSTER.Node.TYPE_FE))
+ if args.add_be_num:
+ for i in range(args.add_be_num):
+ related_nodes.append(cluster.add(CLUSTER.Node.TYPE_BE))
+ if args.IMAGE:
+ for node in related_nodes:
+ node.set_image(args.IMAGE)
+ if for_all and args.IMAGE:
+ cluster.set_image(args.IMAGE)
+ cluster.save()
+
+ options = []
+ if args.no_start:
+ options.append("--no-start")
+ else:
+ options = ["-d", "--remove-orphans"]
+ if args.force_recreate:
+ options.append("--force-recreate")
+
+ related_node_num = len(related_nodes)
+ if for_all:
+ related_node_num = cluster.get_all_nodes_num()
+ related_nodes = None
+
+ utils.exec_docker_compose_command(cluster.get_compose_file(), "up",
+ options, related_nodes)
+ if args.no_start:
+ LOG.info(
+ utils.render_green(
+ "Not up cluster cause specific --no-start, related node
num {}"
+ .format(related_node_num)))
+ else:
+ LOG.info(
+ utils.render_green(
+ "Up cluster {} succ, related node num {}".format(
+ args.NAME, related_node_num)))
+
+
+class DownCommand(Command):
+
+ def add_parser(self, args_parsers):
+ parser = args_parsers.add_parser("down",
+ help="Down doris containers, networks. "\
+ "It will also remove node from DB.
" \
+ "If none of --fe-id, --be-id is
specific, "\
+ "then apply to all containers.")
+ parser.add_argument("NAME", help="Specify cluster name")
+ self._add_parser_ids_args(parser)
+ parser.add_argument(
+ "--clean",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help=
+ "Clean container related files, include expose data, config and
logs"
+ )
+ parser.add_argument(
+ "--drop-force",
+ default=None,
+ action=self._get_parser_bool_action(True),
+ help="Drop doris node force. For be, if specific --drop-force, "\
+ "it will send dropp to fe, otherwise send decommission to
fe.")
+
+ def run(self, args):
+ cluster = CLUSTER.Cluster.load(args.NAME)
+ for_all, related_nodes, related_node_num = get_ids_related_nodes(
+ cluster, args.fe_id, args.be_id, ignore_not_exists=True)
+
+ if for_all:
+ utils.exec_docker_compose_command(cluster.get_compose_file(),
+ "down",
+ ["-v", "--remove-orphans"])
+ if args.clean:
+ utils.enable_dir_with_rw_perm(cluster.get_path())
+ shutil.rmtree(cluster.get_path())
+ LOG.info(
+ utils.render_yellow(
+ "Clean cluster data cause has specific --clean"))
+ else:
+ db_mgr = database.get_db_mgr(cluster.name)
+
+ for node in related_nodes:
+ if node.is_fe():
+ fe_endpoint = "{}:{}".format(node.get_ip(),
+ CLUSTER.FE_EDITLOG_PORT)
+ db_mgr.drop_fe(fe_endpoint)
+ elif node.is_be():
+ be_endpoint = "{}:{}".format(node.get_ip(),
+ CLUSTER.BE_HEARTBEAT_PORT)
+ if args.drop_force:
+ db_mgr.drop_be(be_endpoint)
+ else:
+ db_mgr.decommission_be(be_endpoint)
+ else:
+ raise Exception("Unknown node type: {}".format(
+ node.node_type()))
+
+ #utils.exec_docker_compose_command(cluster.get_compose_file(),
+ # "stop",
+ # nodes=[node])
+ utils.exec_docker_compose_command(cluster.get_compose_file(),
+ "rm", ["-s", "-v", "-f"],
+ nodes=[node])
+ if args.clean:
+ utils.enable_dir_with_rw_perm(node.get_path())
+ shutil.rmtree(node.get_path())
+ LOG.info(
+ utils.render_yellow(
+ "Clean {} with id {} data cause has specific
--clean"
+ .format(node.node_type(), node.id)))
+
+ cluster.remove(node.node_type(), node.id)
+ cluster.save()
+
+ LOG.info(
+ utils.render_green(
+ "Down cluster {} succ, related node num {}".format(
+ args.NAME, related_node_num)))
+
+
+class ListNode(object):
+
+ def __init__(self):
+ self.node_type = ""
+ self.id = 0
+ self.cluster_name = ""
+ self.ip = ""
+ self.status = ""
+ self.container_id = ""
+ self.image = ""
+ self.created = ""
+ self.alive = ""
+ self.is_master = ""
+ self.query_port = ""
+ self.tablet_num = ""
+ self.last_heartbeat = ""
+ self.err_msg = ""
+
+ def info(self):
+ return (self.cluster_name, "{}-{}".format(self.node_type, self.id),
+ self.ip, self.status, self.container_id, self.image,
+ self.created, self.alive, self.is_master, self.query_port,
+ self.tablet_num, self.last_heartbeat, self.err_msg)
+
+ def update_db_info(self, db_mgr):
+ if self.node_type == CLUSTER.Node.TYPE_FE:
+ fe = db_mgr.get_fe(self.id)
+ if fe:
+ self.alive = str(fe.alive).lower()
+ self.is_master = str(fe.is_master).lower()
+ self.query_port = fe.query_port
+ self.last_heartbeat = fe.last_heartbeat
+ self.err_msg = fe.err_msg
+ elif self.node_type == CLUSTER.Node.TYPE_BE:
+ be = db_mgr.get_be(self.id)
+ if be:
+ self.alive = str(be.alive).lower()
+ self.tablet_num = be.tablet_num
+ self.last_heartbeat = be.last_heartbeat
+ self.err_msg = be.err_msg
+
+
+class ListCommand(Command):
+
+ def add_parser(self, args_parsers):
+ parser = args_parsers.add_parser(
+ "ls", help="List running doris compose clusters.")
+ parser.add_argument(
+ "NAME",
+ nargs="*",
+ help=
+ "Specify multiple clusters, if specific, show all their
containers."
+ )
+ parser.add_argument(
+ "-a",
+ "--all",
+ default=False,
+ action=self._get_parser_bool_action(True),
+ help="Show all stopped and bad doris compose projects")
+
+ def run(self, args):
+ COMPOSE_MISSING = "(missing)"
+ COMPOSE_BAD = "(bad)"
+ COMPOSE_GOOD = ""
+
+ SERVICE_DEAD = "dead"
+
+ class ComposeService(object):
+
+ def __init__(self, name, ip, image):
+ self.name = name
+ self.ip = ip
+ self.image = image
+
+ def parse_cluster_compose_file(cluster_name):
+ compose_file = CLUSTER.get_compose_file(cluster_name)
+ if not os.path.exists(compose_file):
+ return COMPOSE_MISSING, {}
+ try:
+ compose = utils.read_compose_file(compose_file)
+ if not compose:
+ return COMPOSE_BAD, {}
+ services = compose.get("services", {})
+ if services is None:
+ return COMPOSE_BAD, {}
+ return COMPOSE_GOOD, {
+ service:
+ ComposeService(
+ service,
+ list(service_conf["networks"].values())[0]
+ ["ipv4_address"], service_conf["image"])
+ for service, service_conf in services.items()
+ }
+ except:
+ return COMPOSE_BAD, {}
+
+ clusters = {}
+ search_names = []
+ if args.NAME:
+ search_names = args.NAME
+ elif os.path.exists(CLUSTER.LOCAL_DORIS_PATH):
+ search_names = os.listdir(CLUSTER.LOCAL_DORIS_PATH)
+
+ for cluster_name in search_names:
+ status, services = parse_cluster_compose_file(cluster_name)
+ clusters[cluster_name] = {"status": status, "services": services}
+
+ docker_clusters = utils.get_doris_containers(args.NAME)
+ for cluster_name, containers in docker_clusters.items():
+ cluster_info = clusters.get(cluster_name, None)
+ if not cluster_info:
+ cluster_info = {"status": COMPOSE_MISSING, "services": {}}
+ clusters[cluster_name] = cluster_info
+ for container in containers:
+ #if container.status == "running" and cluster_info[
+ # "status"] == COMPOSE_GOOD and (
+ # container.name not in cluster_info["services"]):
+ # container.status = "orphans"
+ cluster_info["services"][container.name] = container
+
+ TYPE_COMPOSESERVICE = type(ComposeService("", "", ""))
+ if not args.NAME:
+ headers = (utils.render_green(field)
+ for field in ("CLUSTER", "STATUS", "CONFIG FILES"))
+ table = prettytable.PrettyTable(headers)
+ for name in sorted(clusters.keys()):
+ cluster_info = clusters[name]
+ service_statuses = {}
+ for _, container in cluster_info["services"].items():
+ status = SERVICE_DEAD if type(
+ container) == TYPE_COMPOSESERVICE else container.status
+ service_statuses[status] = service_statuses.get(status,
+ 0) + 1
+ show_status = ",".join([
+ "{}({})".format(status, count)
+ for status, count in service_statuses.items()
+ ])
+ if not args.all and service_statuses.get("running", 0) == 0:
+ continue
+ compose_file = CLUSTER.get_compose_file(name)
+ table.add_row(
+ (name, show_status, "{}{}".format(compose_file,
+ cluster_info["status"])))
+ print(table)
+ return
+
+ headers = (utils.render_green(field)
+ for field in ("CLUSTER", "NAME", "IP", "STATUS",
+ "CONTAINER ID", "IMAGE", "CREATED", "alive",
+ "is_master", "query_port", "tablet_num",
+ "last_heartbeat", "err_msg"))
+ table = prettytable.PrettyTable(headers)
+
+ for cluster_name in sorted(clusters.keys()):
+ fe_ids = {}
+ be_ids = {}
+ services = clusters[cluster_name]["services"]
+ db_mgr = database.get_db_mgr(cluster_name, False)
+
+ nodes = []
+ for service_name, container in services.items():
+ _, node_type, id = utils.parse_service_name(container.name)
+ node = ListNode()
+ node.cluster_name = cluster_name
+ node.node_type = node_type
+ node.id = id
+ node.update_db_info(db_mgr)
+ nodes.append(node)
+
+ if node_type == CLUSTER.Node.TYPE_FE:
+ fe_ids[id] = True
+ elif node_type == CLUSTER.Node.TYPE_BE:
+ be_ids[id] = True
+
+ if type(container) == TYPE_COMPOSESERVICE:
+ node.ip = container.ip
+ node.image = container.image
+ node.status = SERVICE_DEAD
+ else:
+ node.created = container.attrs.get("Created",
+ "")[:19].replace(
+ "T", " ")
+ node.ip = list(
+ container.attrs["NetworkSettings"]
+ ["Networks"].values())[0]["IPAMConfig"]["IPv4Address"]
+ node.image = ",".join(container.image.tags)
+ node.container_id = container.short_id
+ node.status = container.status
+
+ for id, fe in db_mgr.fe_states.items():
+ if fe_ids.get(id, False):
+ continue
+ node = ListNode()
+ node.cluster_name = cluster_name
+ node.node_type = CLUSTER.Node.TYPE_FE
+ node.id = id
+ node.status = SERVICE_DEAD
+ node.update_db_info(db_mgr)
+ nodes.append(node)
+ for id, be in db_mgr.be_states.items():
+ if be_ids.get(id, False):
+ continue
+ node = ListNode()
+ node.cluster_name = cluster_name
+ node.node_type = CLUSTER.Node.TYPE_BE
+ node.id = id
+ node.status = SERVICE_DEAD
+ node.update_db_info(db_mgr)
+ nodes.append(node)
+
+ def get_key(node):
+ key = node.id
+ if node.node_type == CLUSTER.Node.TYPE_FE:
+ key += 0 * CLUSTER.ID_LIMIT
+ elif node.node_type == CLUSTER.Node.TYPE_BE:
+ key += 1 * CLUSTER.ID_LIMIT
+ else:
+ key += 2 * CLUSTER.ID_LIMIT
+ return key
+
+ for node in sorted(nodes, key=get_key):
+ table.add_row(node.info())
+
+ print(table)
+
+
+ALL_COMMANDS = [
+ UpCommand("up"),
+ DownCommand("down"),
+ SimpleCommand("start", "Start the doris containers. "),
+ SimpleCommand("stop", "Stop the doris containers. "),
+ SimpleCommand("restart", "Restart the doris containers. "),
+ SimpleCommand("pause", "Pause the doris containers. "),
+ SimpleCommand("unpause", "Unpause the doris containers. "),
+ ListCommand("ls"),
+]
diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py
new file mode 100644
index 0000000000..21aa400a47
--- /dev/null
+++ b/docker/runtime/doris-compose/database.py
@@ -0,0 +1,239 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cluster as CLUSTER
+import os.path
+import pymysql
+import time
+import utils
+
+LOG = utils.get_logger()
+
+
+class FEState(object):
+
+ def __init__(self, id, query_port, is_master, alive, last_heartbeat,
+ err_msg):
+ self.id = id
+ self.query_port = query_port
+ self.is_master = is_master
+ self.alive = alive
+ self.last_heartbeat = last_heartbeat
+ self.err_msg = err_msg
+
+
+class BEState(object):
+
+ def __init__(self, id, decommissioned, alive, tablet_num, last_heartbeat,
+ err_msg):
+ self.id = id
+ self.decommissioned = decommissioned
+ self.alive = alive
+ self.tablet_num = tablet_num
+ self.last_heartbeat = last_heartbeat
+ self.err_msg = err_msg
+
+
+class DBManager(object):
+
+ def __init__(self):
+ self.fe_states = {}
+ self.be_states = {}
+ self.query_port = -1
+ self.conn = None
+
+ def set_query_port(self, query_port):
+ self.query_port = query_port
+
+ def get_fe(self, id):
+ return self.fe_states.get(id, None)
+
+ def get_be(self, id):
+ return self.be_states.get(id, None)
+
+ def load_states(self, query_ports):
+ self._load_fe_states(query_ports)
+ self._load_be_states()
+
+ def drop_fe(self, fe_endpoint):
+ id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
+ try:
+ self._exec_query(
+ "ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
+ LOG.info("Drop fe {} with id {} from db succ.".format(
+ fe_endpoint, id))
+ except Exception as e:
+ if str(e).find("frontend does not exist") >= 0:
+ LOG.info(
+ "Drop fe {} with id {} from db succ cause it does not
exist in db."
+ .format(fe_endpoint, id))
+ return
+ raise e
+
+ def drop_be(self, be_endpoint):
+ id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
+ try:
+ self._exec_query(
+ "ALTER SYSTEM DROPP BACKEND '{}'".format(be_endpoint))
+ LOG.info("Drop be {} with id {} from db succ.".format(
+ be_endpoint, id))
+ except Exception as e:
+ if str(e).find("backend does not exists") >= 0:
+ LOG.info(
+ "Drop be {} with id {} from db succ cause it does not
exist in db."
+ .format(be_endpoint, id))
+ return
+ raise e
+
+ def decommission_be(self, be_endpoint):
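+        # DECOMMISSION triggers tablet migration on the fe side; below we poll
+        # "show backends" until the be disappears from the cluster.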
+ old_tablet_num = 0
+ id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
+ if id not in self.be_states:
+ self._load_be_states()
+ if id in self.be_states:
+ be = self.be_states[id]
+ old_tablet_num = be.tablet_num
+ if not be.alive:
+ raise Exception("Decommission be {} with id {} fail " \
+ "cause it's not alive, maybe you should specific
--drop-force " \
+ " to dropp it from db".format(be_endpoint, id))
+ try:
+ self._exec_query(
+ "ALTER SYSTEM DECOMMISSION BACKEND '{}'".format(be_endpoint))
+ LOG.info("Mark be {} with id {} as decommissioned, start migrate
its tablets, " \
+ "wait migrating job finish.".format(be_endpoint, id))
+ except Exception as e:
+ if str(e).find("Backend does not exist") >= 0:
+ LOG.info("Decommission be {} with id {} from db succ " \
+ "cause it does not exist in db.",format(be_endpoint,
id))
+ return
+ raise e
+
+ while True:
+ self._load_be_states()
+ be = self.be_states.get(id, None)
+ if not be:
+ LOG.info("Decommission be {} succ, total migrate {} tablets, "
\
+ "has drop it from db.".format(be_endpoint,
old_tablet_num))
+ return
+            LOG.info(
+                "Decommission be {} status: alive {}, decommissioned {}. " \
+                "It is migrating its tablets, left {}/{} tablets."
+                .format(be_endpoint, be.alive, be.decommissioned,
+                        be.tablet_num, old_tablet_num))
+
+ time.sleep(5)
+
+ def _load_fe_states(self, query_ports):
+ fe_states = {}
+ alive_master_fe_port = None
+ for record in self._exec_query("show frontends;"):
+ ip = record[1]
+ is_master = record[7] == "true"
+ alive = record[10] == "true"
+ last_heartbeat = record[12]
+ err_msg = record[14]
+ id = CLUSTER.Node.get_id_from_ip(ip)
+ query_port = query_ports.get(id, None)
+ fe = FEState(id, query_port, is_master, alive, last_heartbeat,
+ err_msg)
+ fe_states[id] = fe
+ if is_master and alive and query_port:
+ alive_master_fe_port = query_port
+ self.fe_states = fe_states
+ if alive_master_fe_port and alive_master_fe_port != self.query_port:
+ self.query_port = alive_master_fe_port
+ self._reset_conn()
+
+ def _load_be_states(self):
+ be_states = {}
+ for record in self._exec_query("show backends;"):
+ ip = record[1]
+ last_heartbeat = record[7]
+ alive = record[8] == "true"
+ decommissioned = record[9] == "true"
+ tablet_num = int(record[10])
+ err_msg = record[18]
+ id = CLUSTER.Node.get_id_from_ip(ip)
+ be = BEState(id, decommissioned, alive, tablet_num, last_heartbeat,
+ err_msg)
+ be_states[id] = be
+ self.be_states = be_states
+
+ def _exec_query(self, sql):
+ self._prepare_conn()
+ with self.conn.cursor() as cursor:
+ cursor.execute(sql)
+ return cursor.fetchall()
+
+ def _prepare_conn(self):
+ if self.conn:
+ return
+ if self.query_port <= 0:
+ raise Exception("Not set query_port")
+ self._reset_conn()
+
+ def _reset_conn(self):
+ self.conn = pymysql.connect(user="root",
+ host="127.0.0.1",
+                                    read_timeout=10,
+ port=self.query_port)
+
+
+def get_db_mgr(cluster_name, required_load_succ=True):
+ assert cluster_name
+ db_mgr = DBManager()
+ containers = utils.get_doris_containers(cluster_name).get(
+ cluster_name, None)
+ if not containers:
+ return db_mgr
+ alive_fe_ports = {}
+ for container in containers:
+ if utils.is_container_running(container):
+ _, node_type, id = utils.parse_service_name(container.name)
+ if node_type == CLUSTER.Node.TYPE_FE:
+ query_port = utils.get_map_ports(container).get(
+ CLUSTER.FE_QUERY_PORT, None)
+ if query_port:
+ alive_fe_ports[id] = query_port
+ if not alive_fe_ports:
+ return db_mgr
+
+ master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
+ "master_fe_ip")
+ query_port = None
+ if os.path.exists(master_fe_ip_file):
+ with open(master_fe_ip_file, "r") as f:
+ master_fe_ip = f.read()
+ if master_fe_ip:
+ master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip)
+ query_port = alive_fe_ports.get(master_id, None)
+ if not query_port:
+ # A new cluster's master is fe-1
+ if 1 in alive_fe_ports:
+ query_port = alive_fe_ports[1]
+ else:
+ query_port = list(alive_fe_ports.values())[0]
+
+ db_mgr.set_query_port(query_port)
+ try:
+ db_mgr.load_states(alive_fe_ports)
+ except Exception as e:
+ if required_load_succ:
+ raise e
+ LOG.exception(e)
+
+ return db_mgr
diff --git a/docker/runtime/doris-compose/doris-compose.py b/docker/runtime/doris-compose/doris-compose.py
new file mode 100644
index 0000000000..77ceb736e3
--- /dev/null
+++ b/docker/runtime/doris-compose/doris-compose.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import command
+import utils
+
+
+def parse_args():
+ ap = argparse.ArgumentParser(description="")
+ args_parsers = ap.add_subparsers(dest="command")
+ for cmd in command.ALL_COMMANDS:
+ cmd.add_parser(args_parsers)
+
+ return ap.parse_args(), ap.format_help()
+
+
+def run(args, help):
+ timer = utils.Timer()
+ for cmd in command.ALL_COMMANDS:
+ if args.command == cmd.name:
+ return cmd.run(args)
+ timer.cancel()
+ print(help)
+ return -1
+
+
+def main():
+ args, help = parse_args()
+ run(args, help)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt
new file mode 100644
index 0000000000..039260e6c7
--- /dev/null
+++ b/docker/runtime/doris-compose/requirements.txt
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+docker
+docker-compose
+jsonpickle
+prettytable
+pymysql
diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh
new file mode 100644
index 0000000000..044e26487d
--- /dev/null
+++ b/docker/runtime/doris-compose/resource/common.sh
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+export MASTER_FE_IP=""
+export MASTER_FE_IP_FILE=$DORIS_HOME/status/master_fe_ip
+
+health_log() {
+ date >> "$DORIS_HOME/log/health.out"
+ echo "$@" >> "$DORIS_HOME/log/health.out"
+}
+
+read_master_fe_ip() {
+ MASTER_FE_IP=`cat $MASTER_FE_IP_FILE`
+ if [ $? -eq 0 ]; then
+ health_log "master fe ${MASTER_FE_IP} has ready."
+ return 0
+ else
+ health_log "master fe has not ready."
+ return 1
+ fi
+}
+
diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh
new file mode 100644
index 0000000000..9a59876382
--- /dev/null
+++ b/docker/runtime/doris-compose/resource/init_be.sh
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+DIR=$(cd $(dirname $0);pwd)
+
+source $DIR/common.sh
+
+REGISTER_FILE=$DORIS_HOME/status/$MY_IP-register
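+# the register file marks that this be was already added via ALTER SYSTEM ADD
+# BACKEND, so a recreated container skips registration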
+
+add_backend() {
+ while true; do
+ read_master_fe_ip
+ if [ $? -ne 0 ]; then
+ sleep 1
+ continue
+ fi
+
+        output=`mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$BE_HEARTBEAT_PORT';" 2>&1`
+ res=$?
+ health_log "$output"
+ [ $res -eq 0 ] && break
+ (echo $output | grep "Same backend already exists") && break
+ sleep 1
+ done
+
+ touch $REGISTER_FILE
+}
+
+main() {
+ if [ ! -f $REGISTER_FILE ]; then
+ add_backend
+ fi
+ bash $DORIS_HOME/bin/start_be.sh
+}
+
+main
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh
new file mode 100644
index 0000000000..fbbd335f37
--- /dev/null
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+DIR=$(cd $(dirname $0);pwd)
+
+source $DIR/common.sh
+
+add_frontend() {
+ while true; do
+ read_master_fe_ip
+ if [ $? -ne 0 ]; then
+ sleep 1
+ continue
+ fi
+
+        output=`mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$FE_EDITLOG_PORT';" 2>&1`
+ res=$?
+ health_log "$output"
+ [ $res -eq 0 ] && break
+ (echo $output | grep "frontend already exists") && break
+ sleep 1
+ done
+}
+
+fe_daemon() {
+ set +e
+ while true; do
+        output=`mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;" | grep -w $MY_IP | awk '{print $8}' 2>&1`
+ if [ $? -ne 0 ]; then
+ health_log "$output"
+ else
+ echo $output | grep true
+ if [ $? -eq 0 ]; then
+ echo $MY_IP > $MASTER_FE_IP_FILE
+ if [ "$MASTER_FE_IP" != "$MY_IP" ]; then
+ health_log "change to master, last master is $MASTER_FE_IP"
+ MASTER_FE_IP=$MY_IP
+ fi
+ fi
+ fi
+ sleep 3
+ done
+}
+
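+# The first fe (MY_ID = 1), or a fe that already has meta, starts directly;
+# other fe first register on the master via ADD FOLLOWER, then start with
+# --helper pointing at the master.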
+main() {
+ if [ "$MY_ID" = "1" -o -d "${DORIS_HOME}/doris-meta/image" ]; then
+ fe_daemon &
+ bash $DORIS_HOME/bin/start_fe.sh
+ else
+ add_frontend
+ fe_daemon &
+ $DORIS_HOME/bin/start_fe.sh --helper $MASTER_FE_IP:$FE_EDITLOG_PORT
+ fi
+}
+
+main
diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py
new file mode 100644
index 0000000000..7317715fe9
--- /dev/null
+++ b/docker/runtime/doris-compose/utils.py
@@ -0,0 +1,253 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import docker
+import logging
+import os
+import subprocess
+import time
+import yaml
+
+DORIS_PREFIX = "doris-"
+
+
+class Timer(object):
+
+ def __init__(self):
+ self.start = time.time()
+ self.canceled = False
+
+ def __del__(self):
+ if not self.canceled:
+ LOG.info("=== Total run time: {} s".format(
+ int(time.time() - self.start)))
+
+ def cancel(self):
+ self.canceled = True
+
+
+def get_logger(name=None):
+ logger = logging.getLogger(name)
+ if not logger.hasHandlers():
+ formatter = logging.Formatter(
+ '%(asctime)s - %(filename)s - %(lineno)dL - %(levelname)s -
%(message)s'
+ )
+ ch = logging.StreamHandler()
+ ch.setLevel(logging.DEBUG)
+ ch.setFormatter(formatter)
+ logger.addHandler(ch)
+ logger.setLevel(logging.INFO)
+
+ return logger
+
+
+LOG = get_logger()
+
+
+def render_red(s):
+ return "\x1B[31m" + str(s) + "\x1B[0m"
+
+
+def render_green(s):
+ return "\x1B[32m" + str(s) + "\x1B[0m"
+
+
+def render_yellow(s):
+ return "\x1B[33m" + str(s) + "\x1B[0m"
+
+
+def render_blue(s):
+ return "\x1B[34m" + str(s) + "\x1B[0m"
+
+
+def with_doris_prefix(name):
+ return DORIS_PREFIX + name
+
+
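+# Service names follow "doris-<cluster>-<fe|be>-<id>"; return (cluster_name,
+# node_type, id), or (None, None, None) if the name does not match.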
+def parse_service_name(service_name):
+ import cluster
+ if not service_name or not service_name.startswith(DORIS_PREFIX):
+ return None, None, None
+ pos2 = service_name.rfind("-")
+ if pos2 < 0:
+ return None, None, None
+ id = None
+ try:
+ id = int(service_name[pos2 + 1:])
+ except:
+ return None, None, None
+ pos1 = service_name.rfind("-", len(DORIS_PREFIX), pos2 - 1)
+ if pos1 < 0:
+ return None, None, None
+ node_type = service_name[pos1 + 1:pos2]
+ if node_type not in cluster.Node.TYPE_ALL:
+ return None, None, None
+ return service_name[len(DORIS_PREFIX):pos1], node_type, id
+
+
+def get_map_ports(container):
+ return {
+ int(innner.replace("/tcp", "")): int(outer[0]["HostPort"])
+ for innner, outer in container.attrs.get("NetworkSettings", {}).get(
+ "Ports", {}).items()
+ }
+
+
+def is_container_running(container):
+ return container.status == "running"
+
+
+# return all doris containers when cluster_names is empty
+def get_doris_containers(cluster_names):
+ if cluster_names:
+ if type(cluster_names) == type(""):
+ filter_names = "{}{}-*".format(DORIS_PREFIX, cluster_names)
+ else:
+ filter_names = "|".join([
+ "{}{}-*".format(DORIS_PREFIX, name) for name in cluster_names
+ ])
+ else:
+ filter_names = "{}*".format(DORIS_PREFIX)
+
+ clusters = {}
+ client = docker.client.from_env()
+ containers = client.containers.list(filters={"name": filter_names})
+ for container in containers:
+ cluster_name, _, _ = parse_service_name(container.name)
+ if not cluster_name:
+ continue
+ if cluster_names and cluster_name not in cluster_names:
+ continue
+ if cluster_name not in clusters:
+ clusters[cluster_name] = []
+ clusters[cluster_name].append(container)
+ return clusters
+
+
+def get_doris_running_containers(cluster_name):
+ return {
+ container.name: container
+ for container in get_doris_containers(cluster_name).get(
+ cluster_name, []) if is_container_running(container)
+ }
+
+
+def is_dir_empty(dir):
+ return False if os.listdir(dir) else True
+
+
+def exec_shell_command(command, ignore_errors=False):
+ LOG.info("Exec command: {}".format(command))
+ p = subprocess.Popen(command,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ out = p.communicate()[0].decode('utf-8')
+ if not ignore_errors:
+ assert p.returncode == 0, out
+ if out:
+ print(out)
+ return p.returncode, out
+
+
+def exec_docker_compose_command(compose_file,
+ command,
+ options=None,
+ nodes=None,
+ user_command=None):
+ if nodes != None and not nodes:
+ return 0, "Skip"
+
+ compose_cmd = "docker-compose -f {} {} {} {} {}".format(
+ compose_file, command, " ".join(options) if options else "",
+ " ".join([node.service_name() for node in nodes]) if nodes else "",
+ user_command if user_command else "")
+
+ return exec_shell_command(compose_cmd)
+
+
+def get_docker_subnets_prefix16():
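+    # Collect the <a>.<b> prefixes covered by existing docker network subnets;
+    # a subnet wider than /16 conservatively marks every <a>.<i> prefix under
+    # its first octet.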
+ subnet_prefixes = {}
+ client = docker.from_env()
+ for network in client.networks.list():
+ if not network.attrs:
+ continue
+ ipam = network.attrs.get("IPAM", None)
+ if not ipam:
+ continue
+ configs = ipam.get("Config", None)
+ if not configs:
+ continue
+ for config in configs:
+ subnet = config.get("Subnet", None)
+ if not subnet:
+ continue
+ pos1 = subnet.find(".")
+ if pos1 <= 0:
+ continue
+ pos2 = subnet.find(".", pos1 + 1)
+ if pos2 <= 0:
+ continue
+ num1 = subnet[0:pos1]
+ num2 = subnet[pos1 + 1:pos2]
+ network_part_len = 16
+ pos = subnet.find("/")
+ if pos != -1:
+ network_part_len = int(subnet[pos + 1:])
+ if network_part_len < 16:
+ for i in range(256):
+ subnet_prefixes["{}.{}".format(num1, i)] = True
+ else:
+ subnet_prefixes["{}.{}".format(num1, num2)] = True
+
+ LOG.debug("Get docker subnet prefixes: {}".format(subnet_prefixes))
+
+ return subnet_prefixes
+
+
+def copy_image_directory(image, image_dir, local_dir):
+ client = docker.from_env()
+ volumes = ["{}:/opt/mount".format(local_dir)]
+ if image_dir.endswith("/"):
+ image_dir += "."
+ elif not image_dir.endswith("."):
+ image_dir += "/."
+ client.containers.run(
+ image,
+ remove=True,
+ volumes=volumes,
+ entrypoint="cp -r {} /opt/mount/".format(image_dir))
+
+
+def enable_dir_with_rw_perm(dir):
+ if not os.path.exists(dir):
+ return
+ client = docker.client.from_env()
+ client.containers.run("ubuntu",
+ remove=True,
+ volumes=["{}:/opt/mount".format(dir)],
+ entrypoint="chmod a+rw -R {}".format("/opt/mount"))
+
+
+def read_compose_file(file):
+ with open(file, "r") as f:
+ return yaml.safe_load(f.read())
+
+
+def write_compose_file(file, compose):
+ with open(file, "w") as f:
+ f.write(yaml.dump(compose))