This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cae93219814 branch-3.0: [fix](docker case) Fix
`test_sql_mode_node_mgr` and add cloud multi f… #44124 (#45239)
cae93219814 is described below
commit cae9321981412d79aa3af8dc47193422a0661e25
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 11 22:54:12 2024 +0800
branch-3.0: [fix](docker case) Fix `test_sql_mode_node_mgr` and add cloud
multi f… #44124 (#45239)
Cherry-picked from #44124
Co-authored-by: deardeng <[email protected]>
---
docker/runtime/doris-compose/cluster.py | 12 +-
docker/runtime/doris-compose/command.py | 32 +++---
docker/runtime/doris-compose/database.py | 58 ++++++----
docker/runtime/doris-compose/doris-compose.py | 25 ++--
docker/runtime/doris-compose/requirements.txt | 3 +-
docker/runtime/doris-compose/resource/init_fe.sh | 7 --
docker/runtime/doris-compose/utils.py | 55 +++++----
.../doris/cloud/catalog/CloudClusterChecker.java | 4 +
.../org/apache/doris/cloud/catalog/CloudEnv.java | 3 +-
.../doris/common/proc/FrontendsProcNode.java | 25 +++-
.../doris/regression/suite/SuiteCluster.groovy | 21 ++--
.../node_mgr/test_sql_mode_node_mgr.groovy | 127 +++++++++------------
12 files changed, 206 insertions(+), 166 deletions(-)
diff --git a/docker/runtime/doris-compose/cluster.py
b/docker/runtime/doris-compose/cluster.py
index ba834167bd1..f4522181d4b 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -23,6 +23,7 @@ import jsonpickle
import os
import os.path
import utils
+import time
DOCKER_DORIS_PATH = "/opt/apache-doris"
LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")
@@ -139,11 +140,15 @@ def gen_subnet_prefix16():
def get_master_fe_endpoint(cluster_name):
master_fe_ip_file = get_cluster_path(cluster_name) + "/status/master_fe_ip"
- if os.path.exists(master_fe_ip_file):
- with open(master_fe_ip_file, "r") as f:
- return "{}:{}".format(f.read().strip(), FE_QUERY_PORT)
+ max_retries = 10
+ for attempt in range(max_retries):
+ if os.path.exists(master_fe_ip_file):
+ with open(master_fe_ip_file, "r") as f:
+ return "{}:{}".format(f.read().strip(), FE_QUERY_PORT)
+ time.sleep(1)
try:
cluster = Cluster.load(cluster_name)
+ LOG.info("master file not exist, master ip get from node 1")
return "{}:{}".format(
cluster.get_node(Node.TYPE_FE, 1).get_ip(), FE_QUERY_PORT)
except:
@@ -468,6 +473,7 @@ class FE(Node):
for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"):
value = parser["dummy_section"].get(key)
if value:
+ value = value.strip().strip('"')
cfg.append(
f"{key} = \"{value}
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\""
)
diff --git a/docker/runtime/doris-compose/command.py
b/docker/runtime/doris-compose/command.py
index 7a2f3f3c195..638c1c465d7 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -183,7 +183,7 @@ class Command(object):
return sys.version_info.major == 3 and sys.version_info.minor >= 9
def _print_table(self, header, datas):
- if utils.is_enable_log():
+ if utils.is_log_stdout():
table = prettytable.PrettyTable(
[utils.render_green(field) for field in header])
for row in datas:
@@ -598,13 +598,6 @@ class UpCommand(Command):
related_nodes,
output_real_time=output_real_time)
- ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " +
cluster.name
- LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
- LOG.info(
- "Master fe query address: " +
- utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
- "\n")
-
if not args.start:
LOG.info(
utils.render_green(
@@ -618,14 +611,18 @@ class UpCommand(Command):
LOG.info("Waiting for FE master to be elected...")
expire_ts = time.time() + 30
while expire_ts > time.time():
+ ready = False
db_mgr = database.get_db_mgr(args.NAME, False)
for id in add_fe_ids:
fe_state = db_mgr.get_fe(id)
if fe_state is not None and fe_state.alive:
+ ready = True
break
- LOG.info("there is no fe ready")
- time.sleep(5)
-
+ if ready:
+ break
+ LOG.info("there is no fe ready")
+ time.sleep(1)
+ LOG.info("after Waiting for FE master to be elected...")
if cluster.is_cloud and args.sql_mode_node_mgr:
db_mgr = database.get_db_mgr(args.NAME, False)
master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
@@ -635,7 +632,9 @@ class UpCommand(Command):
fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}"
if fe_endpoint != master_fe_endpoint:
try:
- db_mgr.add_fe(fe_endpoint)
+ db_mgr.add_fe(
+ fe_endpoint, "FOLLOWER"
+ if cluster.fe_follower else "OBSERVER")
LOG.info(f"Added FE {fe_endpoint} successfully.")
except Exception as e:
LOG.error(
@@ -661,6 +660,12 @@ class UpCommand(Command):
"Up cluster {} succ, related node num {}".format(
args.NAME, related_node_num)))
+ ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " +
cluster.name
+ LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
+ LOG.info(
+ "Master fe query address: " +
+ utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
+ "\n")
return {
"fe": {
"add_list": add_fe_ids,
@@ -1066,8 +1071,7 @@ class ListCommand(Command):
if services is None:
return COMPOSE_BAD, {}
return COMPOSE_GOOD, {
- service:
- ComposeService(
+ service: ComposeService(
service,
list(service_conf["networks"].values())[0]
["ipv4_address"], service_conf["image"])
diff --git a/docker/runtime/doris-compose/database.py
b/docker/runtime/doris-compose/database.py
index 46cdd961c9f..370f1d5ee2a 100644
--- a/docker/runtime/doris-compose/database.py
+++ b/docker/runtime/doris-compose/database.py
@@ -27,12 +27,13 @@ LOG = utils.get_logger()
class FEState(object):
- def __init__(self, id, is_master, alive, last_heartbeat, err_msg):
+ def __init__(self, id, is_master, alive, last_heartbeat, err_msg, role):
self.id = id
self.is_master = is_master
self.alive = alive
self.last_heartbeat = last_heartbeat
self.err_msg = err_msg
+ self.role = role
class BEState(object):
@@ -66,11 +67,11 @@ class DBManager(object):
self._load_fe_states()
self._load_be_states()
- def add_fe(self, fe_endpoint):
+ def add_fe(self, fe_endpoint, role):
try:
- sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'"
+ sql = f"ALTER SYSTEM ADD {role} '{fe_endpoint}'"
self._exec_query(sql)
- LOG.info(f"Added FE {fe_endpoint} via SQL successfully.")
+ LOG.info(f"Added {role} FE {fe_endpoint} via SQL successfully.")
except Exception as e:
LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}")
raise
@@ -78,8 +79,9 @@ class DBManager(object):
def drop_fe(self, fe_endpoint):
id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
try:
- self._exec_query(
- "ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
+ role = self.get_fe(id).role if self.get_fe(id) else "FOLLOWER"
+ self._exec_query("ALTER SYSTEM DROP {} '{}'".format(
+ role, fe_endpoint))
LOG.info("Drop fe {} with id {} from db succ.".format(
fe_endpoint, id))
except Exception as e:
@@ -152,7 +154,7 @@ class DBManager(object):
.format(be_endpoint, be.alive, be.decommissioned,
be.tablet_num, old_tablet_num,
int(time.time() - start_ts)))
- time.sleep(5)
+ time.sleep(1)
def create_default_storage_vault(self, cloud_store_config):
try:
@@ -194,7 +196,7 @@ class DBManager(object):
id = CLUSTER.Node.get_id_from_ip(ip)
last_heartbeat = utils.escape_null(record["LastHeartbeat"])
err_msg = record["ErrMsg"]
- fe = FEState(id, is_master, alive, last_heartbeat, err_msg)
+ fe = FEState(id, is_master, alive, last_heartbeat, err_msg, role)
fe_states[id] = fe
if is_master and alive:
alive_master_fe_ip = ip
@@ -223,13 +225,23 @@ class DBManager(object):
self.be_states = be_states
# return rows, and each row is a record map
- def _exec_query(self, sql):
+ def _exec_query(self, sql, retries=3):
self._prepare_conn()
- with self.conn.cursor() as cursor:
- cursor.execute(sql)
- fields = [field_md[0] for field_md in cursor.description
- ] if cursor.description else []
- return [dict(zip(fields, row)) for row in cursor.fetchall()]
+ for attempt in range(retries):
+ try:
+ with self.conn.cursor() as cursor:
+ cursor.execute(sql)
+ fields = [field_md[0] for field_md in cursor.description
+ ] if cursor.description else []
+ return [dict(zip(fields, row)) for row in
cursor.fetchall()]
+ except Exception as e:
+ LOG.warn(f"Error occurred: {e}")
+ if "timed out" in str(e).lower() and attempt < retries - 1:
+ LOG.warn(f"Query timed out. Retrying {attempt +
1}/{retries}...")
+ self._reset_conn()
+ else:
+ raise e
+ raise Exception("Max retries exceeded")
def _prepare_conn(self):
if self.conn:
@@ -257,19 +269,23 @@ def get_db_mgr(cluster_name, required_load_succ=True):
if not master_fe_ip:
return db_mgr
- has_alive_fe = False
+ alive_fe = None
+ cluster = CLUSTER.Cluster.load(cluster_name)
containers = utils.get_doris_containers(cluster_name).get(cluster_name, [])
for container in containers:
if utils.is_container_running(container):
- _, node_type, _ = utils.parse_service_name(container.name)
+ _, node_type, id = utils.parse_service_name(container.name)
if node_type == CLUSTER.Node.TYPE_FE:
- has_alive_fe = True
- break
-
- if not has_alive_fe:
+ node = cluster.get_node(node_type, id)
+ if not alive_fe:
+ alive_fe = node
+ if node.get_ip() == master_fe_ip:
+ alive_fe = node
+ break
+ if not alive_fe:
return db_mgr
- db_mgr.master_fe_ip = master_fe_ip
+ db_mgr.master_fe_ip = alive_fe.get_ip()
try:
db_mgr.load_states()
except Exception as e:
diff --git a/docker/runtime/doris-compose/doris-compose.py
b/docker/runtime/doris-compose/doris-compose.py
index a2d3a517553..cf3692d5321 100644
--- a/docker/runtime/doris-compose/doris-compose.py
+++ b/docker/runtime/doris-compose/doris-compose.py
@@ -16,7 +16,9 @@
# under the License.
import argparse
+import cluster as CLUSTER
import command
+import os.path
import sys
import traceback
import utils
@@ -31,12 +33,12 @@ def parse_args():
return ap.parse_args(), ap.format_help()
-def run(args, disable_log, help):
+def run(args, disable_log_stdout, help):
for cmd in command.ALL_COMMANDS:
if args.command == cmd.name:
timer = utils.Timer()
result = cmd.run(args)
- if cmd.print_use_time() and not disable_log:
+ if cmd.print_use_time() and not disable_log_stdout:
timer.show()
return result
print(help)
@@ -48,19 +50,26 @@ if __name__ == '__main__':
verbose = getattr(args, "verbose", False)
if verbose:
utils.set_log_verbose()
- disable_log = getattr(args, "output_json", False)
- if disable_log:
- utils.set_enable_log(False)
+ disable_log_stdout = getattr(args, "output_json", False)
+ if disable_log_stdout:
+ log_file_name = ""
+ cluster_name = getattr(args, "NAME", "")
+ if cluster_name:
+ if type(cluster_name) == type([]):
+ cluster_name = cluster_name[0]
+ log_file_name = os.path.join(
+ CLUSTER.get_cluster_path(cluster_name), "doris-compose.log")
+ utils.set_log_to(log_file_name, False)
code = None
try:
- data = run(args, disable_log, help)
- if disable_log:
+ data = run(args, disable_log_stdout, help)
+ if disable_log_stdout:
print(utils.pretty_json({"code": 0, "data": data}), flush=True)
code = 0
except:
err = traceback.format_exc()
- if disable_log:
+ if disable_log_stdout:
print(utils.pretty_json({"code": 1, "err": err}), flush=True)
else:
print(err, flush=True)
diff --git a/docker/runtime/doris-compose/requirements.txt
b/docker/runtime/doris-compose/requirements.txt
index 2f962ed68d8..1f32223a02e 100644
--- a/docker/runtime/doris-compose/requirements.txt
+++ b/docker/runtime/doris-compose/requirements.txt
@@ -22,5 +22,6 @@ jsonpickle
prettytable
pymysql
python-dateutil
-#pyyaml==5.4.1
+# if mac install pyyaml failed, change pyyaml version
+#pyyaml==5.3.1
requests<=2.31.0
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh
b/docker/runtime/doris-compose/resource/init_fe.sh
index b69ac3a209e..a58723db1d7 100755
--- a/docker/runtime/doris-compose/resource/init_fe.sh
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -102,9 +102,6 @@ start_cloud_fe() {
fe_daemon &
run_fe
- if [ "$MY_ID" == "1" ]; then
- echo $MY_IP >$MASTER_FE_IP_FILE
- fi
return
fi
@@ -168,10 +165,6 @@ start_cloud_fe() {
fe_daemon &
run_fe
-
- if [ "$MY_ID" == "1" ]; then
- echo $MY_IP >$MASTER_FE_IP_FILE
- fi
}
stop_frontend() {
diff --git a/docker/runtime/doris-compose/utils.py
b/docker/runtime/doris-compose/utils.py
index 4332ae6cf48..dcb821ddffd 100644
--- a/docker/runtime/doris-compose/utils.py
+++ b/docker/runtime/doris-compose/utils.py
@@ -23,6 +23,7 @@ import os
import pwd
import socket
import subprocess
+import sys
import time
import yaml
@@ -30,7 +31,7 @@ DORIS_PREFIX = "doris-"
LOG = None
-ENABLE_LOG = True
+ENALBE_LOG_STDOUT = True
class Timer(object):
@@ -48,39 +49,41 @@ class Timer(object):
self.canceled = True
-def set_enable_log(enabled):
- global ENABLE_LOG
- ENABLE_LOG = enabled
- get_logger().disabled = not enabled
-
-
-def is_enable_log():
- return ENABLE_LOG
+def is_log_stdout():
+ return ENALBE_LOG_STDOUT
def set_log_verbose():
get_logger().setLevel(logging.DEBUG)
-def get_logger(name=None):
- global LOG
- if LOG != None:
- return LOG
-
- logger = logging.getLogger(name)
- if not logger.hasHandlers():
+def set_log_to(log_file_name, is_to_stdout):
+ logger = get_logger()
+ for ch in logger.handlers:
+ logger.removeHandler(ch)
+ if log_file_name:
+ os.makedirs(os.path.dirname(log_file_name), exist_ok=True)
+ logger.addHandler(logging.FileHandler(log_file_name))
+ global ENALBE_LOG_STDOUT
+ ENALBE_LOG_STDOUT = is_to_stdout
+ if is_to_stdout:
+ logger.addHandler(logging.StreamHandler(sys.stdout))
+ for ch in logger.handlers:
formatter = logging.Formatter(
'%(asctime)s - %(filename)s - %(lineno)dL - %(levelname)s -
%(message)s'
)
- ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
- logger.addHandler(ch)
- logger.setLevel(logging.INFO)
- LOG = logger
- return logger
+def get_logger(name="doris-compose"):
+ global LOG
+ if LOG is None:
+ LOG = logging.getLogger(name)
+ LOG.setLevel(logging.INFO)
+ set_log_to(None, True)
+
+ return LOG
get_logger()
@@ -196,15 +199,17 @@ def exec_shell_command(command, ignore_errors=False,
output_real_time=False):
if output_real_time:
while p.poll() is None:
s = p.stdout.readline().decode('utf-8')
- if ENABLE_LOG and s.rstrip():
- print(s.rstrip())
+ if s.rstrip():
+ for line in s.strip().splitlines():
+ LOG.info("(docker) " + line)
out += s
exitcode = p.wait()
else:
out = p.communicate()[0].decode('utf-8')
exitcode = p.returncode
- if ENABLE_LOG and out:
- print(out)
+ if out:
+ for line in out.splitlines():
+ LOG.info("(docker) " + line)
if not ignore_errors:
assert exitcode == 0, out
return exitcode, out
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index e27339c2aac..9468c8acecd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -170,6 +170,10 @@ public class CloudClusterChecker extends MasterDaemon {
String endpoint = addr + ":" + node.getHeartbeatPort();
Cloud.NodeStatusPB status = node.getStatus();
Backend be = currentMap.get(endpoint);
+ if (be == null) {
+ LOG.warn("cant get valid be {} from fe mem, ignore it checker
will add this be at next", endpoint);
+ continue;
+ }
if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONING) {
if (!be.isDecommissioned()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index f4c6005a0d8..89338c228fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -415,7 +415,8 @@ public class CloudEnv extends Env {
Frontend frontend = checkFeExist(host, port);
if (frontend == null) {
- throw new DdlException("Frontend does not exist.");
+ throw new DdlException("frontend does not exist[" + NetUtils
+ .getHostPortInAccessibleFormat(host, port) + "]");
}
if (frontend.getRole() != role) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
index eb75bc9312a..ede8cb56258 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
@@ -126,7 +126,9 @@ public class FrontendsProcNode implements ProcNodeInterface
{
selfNode = ConnectContext.get().getCurrentConnectedFEIp();
}
- for (Frontend fe : env.getFrontends(null /* all */)) {
+ List<Frontend> envFes = env.getFrontends(null /* all */);
+ LOG.info("bdbje fes {}, env fes {}", allFe, envFes);
+ for (Frontend fe : envFes) {
List<String> info = new ArrayList<String>();
info.add(fe.getNodeName());
info.add(fe.getHost());
@@ -211,11 +213,6 @@ public class FrontendsProcNode implements
ProcNodeInterface {
if (fe.getEditLogPort() != addr.getPort()) {
continue;
}
- if (!Strings.isNullOrEmpty(addr.getHostName())) {
- if (addr.getHostName().equals(fe.getHost())) {
- return true;
- }
- }
// if hostname of InetSocketAddress is ip, addr.getHostName() may
be not equal to fe.getIp()
// so we need to compare fe.getIp() with address.getHostAddress()
InetAddress address = addr.getAddress();
@@ -227,6 +224,22 @@ public class FrontendsProcNode implements
ProcNodeInterface {
return true;
}
}
+
+ // Avoid calling getHostName multiple times, don't remove it
+ for (InetSocketAddress addr : allFeHosts) {
+ // Avoid calling getHostName multiple times, don't remove it
+ if (fe.getEditLogPort() != addr.getPort()) {
+ continue;
+ }
+ //
https://bugs.openjdk.org/browse/JDK-8143378#:~:text=getHostName()%3B%20takes%20about%205,millisecond%20on%20JDK%20update%2051
+ // getHostName sometime has bug, take 5s
+ String host = addr.getHostName();
+ if (!Strings.isNullOrEmpty(host)) {
+ if (host.equals(fe.getHost())) {
+ return true;
+ }
+ }
+ }
return false;
}
}
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 856b0e76956..e77658793fe 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -23,12 +23,14 @@ import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.NodeType
import com.google.common.collect.Maps
+import org.awaitility.Awaitility
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import groovy.json.JsonSlurper
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
+import static java.util.concurrent.TimeUnit.SECONDS
import java.util.stream.Collectors
import java.sql.Connection
@@ -333,7 +335,7 @@ class SuiteCluster {
sqlModeNodeMgr = options.sqlModeNodeMgr
- runCmd(cmd.join(' '), -1)
+ runCmd(cmd.join(' '), 180)
// wait be report disk
Thread.sleep(5000)
@@ -483,6 +485,9 @@ class SuiteCluster {
if (followerMode) {
sb.append('--fe-follower' + ' ')
}
+ if (sqlModeNodeMgr) {
+ sb.append('--sql-mode-node-mgr' + ' ')
+ }
}
if (beNum > 0) {
sb.append('--add-be-num ' + beNum + ' ')
@@ -492,7 +497,7 @@ class SuiteCluster {
}
sb.append('--wait-timeout 60')
- def data = (Map<String, Map<String, Object>>) runCmd(sb.toString(), -1)
+ def data = (Map<String, Map<String, Object>>) runCmd(sb.toString(),
180)
def newFrontends = (List<Integer>) data.get('fe').get('add_list')
def newBackends = (List<Integer>) data.get('be').get('add_list')
@@ -636,17 +641,15 @@ class SuiteCluster {
}
private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception
{
- def fullCmd = String.format('python -W ignore %s %s --output-json',
config.dorisComposePath, cmd)
+ def fullCmd = String.format('python -W ignore %s %s -v --output-json',
config.dorisComposePath, cmd)
logger.info('Run doris compose cmd: {}', fullCmd)
def proc = fullCmd.execute()
def outBuf = new StringBuilder()
def errBuf = new StringBuilder()
- proc.consumeProcessOutput(outBuf, errBuf)
- if (timeoutSecond > 0) {
- proc.waitForOrKill(timeoutSecond * 1000)
- } else {
- proc.waitFor()
- }
+ Awaitility.await().atMost(timeoutSecond, SECONDS).until({
+ proc.waitForProcessOutput(outBuf, errBuf)
+ return true
+ })
if (proc.exitValue() != 0) {
throw new Exception(String.format('Exit value: %s != 0, stdout:
%s, stderr: %s',
proc.exitValue(),
outBuf.toString(), errBuf.toString()))
diff --git
a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
index 7405cb864d8..70372f68ab8 100644
--- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
+++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
@@ -38,6 +38,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
options.sqlModeNodeMgr = true
options.waitTimeout = 0
options.feNum = 3
+ options.useFollowersMode = true
options.feConfigs += ["resource_not_ready_sleep_seconds=1",
"heartbeat_interval_second=1",]
}
@@ -121,6 +122,9 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
// Check FE number
def frontendResult = sql_return_maparray """SHOW FRONTENDS;"""
+ // Check that all frontends are alive
+ def aliveCount = frontendResult.count { it['Alive'] == 'true' }
+ assert aliveCount == expectedFeNum, "Expected all
$expectedFeNum frontends to be alive, but only ${aliveCount} are alive"
assert frontendResult.size() == expectedFeNum, "Expected
${expectedFeNum} frontends, but got ${frontendResult.size()}"
logger.info("FE number check passed: ${frontendResult.size()}
FEs found")
@@ -272,28 +276,23 @@ suite('test_sql_mode_node_mgr',
'multi_cluster,docker,p1') {
def feEditLogPort = feToDropMap['EditLogPort']
def feRole = feToDropMap['Role']
- logger.info("Dropping non-master frontend: {}:{}", feHost,
feEditLogPort)
+ def dropFeInx = cluster.getFrontends().find { it.host == feHost
}.index
+ logger.info("Dropping non-master frontend: {}:{}, index: {}",
feHost, feEditLogPort, dropFeInx)
// Drop the selected non-master frontend
sql """ ALTER SYSTEM DROP ${feRole} "${feHost}:${feEditLogPort}";
"""
-
+ // After drop feHost container will exit
+ cluster.dropFrontends(true, dropFeInx)
+ sleep(3 * 1000)
+ logger.info("Dropping frontend index: {}, remove it from docker
compose", dropFeInx)
// Wait for the frontend to be fully dropped
- maxWaitSeconds = 300
- waited = 0
- while (waited < maxWaitSeconds) {
+
+ dockerAwaitUntil(300) {
reconnectFe()
def currentFrontends = sql_return_maparray("SHOW FRONTENDS")
- if (currentFrontends.size() == frontends.size() - 1) {
- logger.info("Non-master frontend successfully dropped")
- break
- }
- sleep(10000)
- waited += 10
+ currentFrontends.size() == frontends.size() - 1
}
- if (waited >= maxWaitSeconds) {
- throw new Exception("Timeout waiting for non-master frontend
to be dropped")
- }
checkClusterStatus(2, 3, 4)
@@ -309,86 +308,72 @@ suite('test_sql_mode_node_mgr',
'multi_cluster,docker,p1') {
}
assert droppedFE != null, "Could not find the dropped frontend"
-
- feHost = droppedFE['Host']
- feEditLogPort = droppedFE['EditLogPort']
-
- logger.info("Adding back frontend: {}:{}", feHost, feEditLogPort)
-
- // Add the frontend back
- sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """
+
+ // Up a new follower fe and add to docker compose
+ // ATTN: in addFrontend, sql node mode, will execute `ALTER SYSTEM
ADD FOLLOWER "${feHost}:${feEditLogPort}";`
+ boolean fuzzyUpFollower = (getRandomBoolean() == "true") ? true :
false
+ logger.info("Want up a new role [{}] frontend", fuzzyUpFollower ?
"FOLLOWER" : "OBSERVER")
+ def addList = cluster.addFrontend(1, fuzzyUpFollower)
+ logger.info("Up a new frontend, addList: {}", addList)
+
+ def addFE = cluster.getFeByIndex(addList[0])
+ feHost = addFE['Host']
+ feEditLogPort = addFE['EditLogPort']
+ def showFes = sql """SHOW FRONTENDS"""
+ logger.info("Adding back frontend: {}", showFes)
// Wait for the frontend to be fully added back
- maxWaitSeconds = 300
- waited = 0
- while (waited < maxWaitSeconds) {
+ dockerAwaitUntil(300, 5) {
def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
- if (updatedFrontends.size() == frontends.size()) {
- logger.info("Frontend successfully added back")
- break
- }
- sleep(10000)
- waited += 10
+ updatedFrontends.size() == frontends.size()
}
-
- if (waited >= maxWaitSeconds) {
- throw new Exception("Timeout waiting for frontend to be added
back")
- }
-
- // Verify cluster status after adding the frontend back
+
checkClusterStatus(3, 3, 5)
logger.info("Frontend successfully added back and cluster status
verified")
// CASE 6. Drop frontend and add back again
logger.info("Dropping frontend and adding back again")
-
// Get the frontend to be dropped
- def frontendToDrop = frontends.find { it['Host'] == feHost &&
it['EditLogPort'] == feEditLogPort }
+ currentFrontends = sql_return_maparray("SHOW FRONTENDS")
+
+ int obServerCount = currentFrontends.count { it['Role'] ==
'OBSERVER' }
+ String fuzzyDropRole
+ if (obServerCount != 0) {
+ fuzzyDropRole = (getRandomBoolean() == "true") ? "FOLLOWER" :
"OBSERVER"
+ } else {
+ fuzzyDropRole = "FOLLOWER"
+ }
+
+ def frontendToDrop = currentFrontends.find {it['IsMaster'] ==
"false" && it['Role'] == fuzzyDropRole}
+ logger.info("Find drop again frontend: {}, drop role [{}]",
frontendToDrop, fuzzyDropRole)
assert frontendToDrop != null, "Could not find the frontend to
drop"
+ def role = frontendToDrop.Role
// Drop the frontend
- sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}";
"""
- sleep(30000)
+ sql """ ALTER SYSTEM DROP $role
"${frontendToDrop.Host}:${frontendToDrop.EditLogPort}"; """
+ dropFeInx = cluster.getFrontends().find { it.host ==
frontendToDrop.Host }.index
+ // After drop frontendToDrop.Host container will exit
+ cluster.dropFrontends(true, dropFeInx)
+ logger.info("Dropping again frontend index: {}, remove it from
docker compose", dropFeInx)
+ sleep(3 * 1000)
reconnectFe()
// Wait for the frontend to be fully dropped
- maxWaitSeconds = 300
- waited = 0
- while (waited < maxWaitSeconds) {
+ dockerAwaitUntil(300, 5) {
def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
- if (!updatedFrontends.any { it['Host'] == feHost &&
it['EditLogPort'] == feEditLogPort }) {
- logger.info("Frontend successfully dropped")
- break
- }
- sleep(10000)
- waited += 10
- }
-
- if (waited >= maxWaitSeconds) {
- throw new Exception("Timeout waiting for frontend to be
dropped")
+ !updatedFrontends.any { it['Host'] == frontendToDrop.Host &&
it['EditLogPort'] == frontendToDrop.EditLogPort }
}
- // Add the frontend back
- sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """
+ // Up a new follower fe and add to docker compose
+ // ATTN: in addFrontend, sql node mode, will execute `ALTER SYSTEM
ADD FOLLOWER "${feHost}:${feEditLogPort}";`
+ addList = cluster.addFrontend(1, true)
+ logger.info("Up a new frontend, addList: {}", addList)
- // Wait for the frontend to be fully added back
- maxWaitSeconds = 300
- waited = 0
- while (waited < maxWaitSeconds) {
+ dockerAwaitUntil(300, 5) {
def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
- if (updatedFrontends.any { it['Host'] == feHost &&
it['EditLogPort'] == feEditLogPort }) {
- logger.info("Frontend successfully added back")
- break
- }
- sleep(10000)
- waited += 10
+ updatedFrontends.size() == 3
}
-
- if (waited >= maxWaitSeconds) {
- throw new Exception("Timeout waiting for frontend to be added
back")
- }
-
// Verify cluster status after adding the frontend back
checkClusterStatus(3, 3, 6)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org