This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7ed116be9cd [improvement](decommission be) decommission check replica
num #32748 (#34038)
7ed116be9cd is described below
commit 7ed116be9cdb43bdf0dd1e54dcee2da53d72da45
Author: yujun <[email protected]>
AuthorDate: Fri May 10 21:27:48 2024 +0800
[improvement](decommission be) decommission check replica num #32748
(#34038)
---
.../java/org/apache/doris/alter/SystemHandler.java | 81 +++++++++++++++++++++-
.../org/apache/doris/regression/suite/Suite.groovy | 3 +
regression-test/pipeline/p0/conf/fe.conf | 2 +
regression-test/pipeline/p1/conf/be.conf | 1 +
regression-test/pipeline/p1/conf/fe.conf | 2 +
.../test_decommission_with_replica_num_fail.groovy | 55 +++++++++++++++
regression-test/suites/node_p0/test_backend.groovy | 40 +++++++++++
7 files changed, 183 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index d4aae2d7dc0..8be8d4a2153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -33,24 +33,34 @@ import org.apache.doris.analysis.ModifyBrokerClause;
import org.apache.doris.analysis.ModifyFrontendHostNameClause;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MysqlCompatibleDatabase;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
/*
* SystemHandler is for
@@ -220,12 +230,81 @@ public class SystemHandler extends AlterHandler {
decommissionBackends.add(backend);
}
- // TODO(cmy): check if replication num can be met
+ checkDecommissionWithReplicaAllocation(decommissionBackends);
+
// TODO(cmy): check remaining space
return decommissionBackends;
}
+ private static void checkDecommissionWithReplicaAllocation(List<Backend> decommissionBackends)
+ throws DdlException {
+ if (decommissionBackends.isEmpty()
+ || DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) {
+ return;
+ }
+
+ Set<Tag> decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag())
+ .collect(Collectors.toSet());
+ Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
+ for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+ long beId = backend.getId();
+ if (!backend.isScheduleAvailable()
+ || decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) {
+ continue;
+ }
+
+ Tag tag = backend.getLocationTag();
+ if (tag != null) {
+ tagAvailBackendNums.put(tag, tagAvailBackendNums.getOrDefault(tag, 0) + 1);
+ }
+ }
+
+ Env env = Env.getCurrentEnv();
+ List<Long> dbIds = env.getInternalCatalog().getDbIds();
+ for (Long dbId : dbIds) {
+ Database db = env.getInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ continue;
+ }
+
+ if (db instanceof MysqlCompatibleDatabase) {
+ continue;
+ }
+
+ for (Table table : db.getTables()) {
+ table.readLock();
+ try {
+ if (!table.isManagedTable()) {
+ continue;
+ }
+
+ OlapTable tbl = (OlapTable) table;
+ for (Partition partition : tbl.getAllPartitions()) {
+ ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
+ for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
+ Tag tag = entry.getKey();
+ if (!decommissionTags.contains(tag)) {
+ continue;
+ }
+ int replicaNum = (int) entry.getValue();
+ int backendNum = tagAvailBackendNums.getOrDefault(tag, 0);
+ if (replicaNum > backendNum) {
+ throw new DdlException("After decommission, partition " + partition.getName()
+ + " of table " + db.getFullName() + "." + tbl.getName()
+ + " 's replication allocation { " + replicaAlloc
+ + " } > available backend num " + backendNum + " on tag " + tag
+ + ", otherwise need to decrease the partition's replication num.");
+ }
+ }
+ }
+ } finally {
+ table.readUnlock();
+ }
+ }
+ }
+ }
+
@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt;
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 7575c185d54..f0ca33486da 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -570,6 +570,9 @@ class Suite implements GroovyInterceptable {
assert p.exitValue() == 0
}
+ List<String> getFrontendIpHttpPort() {
+ return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
+ }
void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
List<List<Object>> backends = sql("show backends");
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index b301d04a88e..1ec05be9fd9 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -78,6 +78,8 @@ enable_map_type=true
enable_struct_type=true
enable_feature_binlog=true
+enable_debug_points=true
+
# enable mtmv
enable_mtmv = true
diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf
index a97e528a6b8..ae95e4f65a8 100644
--- a/regression-test/pipeline/p1/conf/be.conf
+++ b/regression-test/pipeline/p1/conf/be.conf
@@ -71,3 +71,4 @@ enable_set_in_bitmap_value=true
enable_feature_binlog=true
max_sys_mem_available_low_water_mark_bytes=69206016
enable_merge_on_write_correctness_check=false
+enable_debug_points=true
diff --git a/regression-test/pipeline/p1/conf/fe.conf b/regression-test/pipeline/p1/conf/fe.conf
index adc042357ca..10b13131e49 100644
--- a/regression-test/pipeline/p1/conf/fe.conf
+++ b/regression-test/pipeline/p1/conf/fe.conf
@@ -75,6 +75,8 @@ remote_fragment_exec_timeout_ms=60000
fuzzy_test_type=p1
use_fuzzy_session_variable=true
+enable_debug_points=true
+
# enable mtmv
enable_mtmv = true
diff --git a/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
new file mode 100644
index 00000000000..d7941591eec
--- /dev/null
+++ b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('test_decommission_with_replica_num_fail') {
+ def tbl = 'test_decommission_with_replica_num_fail'
+ def backends = sql_return_maparray('show backends')
+ def replicaNum = 0
+ def targetBackend = null
+ for (def be : backends) {
+ def alive = be.Alive.toBoolean()
+ def decommissioned = be.SystemDecommissioned.toBoolean()
+ if (alive && !decommissioned) {
+ replicaNum++
+ targetBackend = be
+ }
+ }
+ assertTrue(replicaNum > 0)
+
+ sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+ sql """
+ CREATE TABLE ${tbl}
+ (
+ k1 int,
+ k2 int
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 6
+ PROPERTIES
+ (
+ "replication_num" = "${replicaNum}"
+ );
+ """
+ try {
+ test {
+ sql "ALTER SYSTEM DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
+ exception "otherwise need to decrease the partition's replication num"
+ }
+ } finally {
+ sql "CANCEL DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
+ }
+ sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+}
diff --git a/regression-test/suites/node_p0/test_backend.groovy b/regression-test/suites/node_p0/test_backend.groovy
index 5de31b1f964..4c85ff1b54a 100644
--- a/regression-test/suites/node_p0/test_backend.groovy
+++ b/regression-test/suites/node_p0/test_backend.groovy
@@ -39,4 +39,44 @@ suite("test_backend") {
result = sql """SHOW BACKENDS;"""
logger.info("result:${result}")
}
+
+ if (context.config.jdbcUser.equals("root")) {
+ def decommissionBe = null
+ try {
+ GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num");
+ try_sql """admin set frontend config("drop_backend_after_decommission" = "false")"""
+ def result = sql_return_maparray """SHOW BACKENDS;"""
+ logger.info("show backends result:${result}")
+ for (def res : result) {
+ decommissionBe = res
+ break
+ }
+ sql """CANCEL DECOMMISSION BACKEND "${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
+ result = sql """ALTER SYSTEM DECOMMISSION BACKEND "${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
+ logger.info("ALTER SYSTEM DECOMMISSION BACKEND ${result}")
+ result = sql_return_maparray """SHOW BACKENDS;"""
+ for (def res : result) {
+ if (res.BackendId == "${decommissionBe.BackendId}") {
+ assertTrue(res.SystemDecommissioned.toBoolean())
+ }
+ }
+ } finally {
+ try {
+ if (decommissionBe != null) {
+ def result = sql """CANCEL DECOMMISSION BACKEND "${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
+ logger.info("CANCEL DECOMMISSION BACKEND ${result}")
+
+ result = sql_return_maparray """SHOW BACKENDS;"""
+ for (def res : result) {
+ if (res.BackendId == "${decommissionBe.BackendId}") {
+ assertFalse(res.SystemDecommissioned.toBoolean())
+ }
+ }
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num');
+ try_sql """admin set frontend config("drop_backend_after_decommission" = "true")"""
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]