This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new abdeaec91dd [feature](alter colocate group) Support alter colocate
group replica allocation #23320 (#25387)
abdeaec91dd is described below
commit abdeaec91ddc1250014198b7691a64bff4965ecb
Author: yujun <[email protected]>
AuthorDate: Fri Oct 13 16:22:43 2023 +0800
[feature](alter colocate group) Support alter colocate group replica
allocation #23320 (#25387)
---
.../Alter/ALTER-COLOCATE-GROUP.md | 84 +++++++++
docs/sidebars.json | 1 +
.../Alter/ALTER-COLOCATE-GROUP.md | 88 ++++++++++
.../Alter/ALTER-WORKLOAD-GROUP.md | 4 +-
fe/fe-core/src/main/cup/sql_parser.cup | 17 ++
.../doris/analysis/AlterColocateGroupStmt.java | 81 +++++++++
.../apache/doris/analysis/ColocateGroupName.java | 70 ++++++++
.../apache/doris/catalog/ColocateGroupSchema.java | 4 +
.../apache/doris/catalog/ColocateTableIndex.java | 191 ++++++++++++++++++++-
.../org/apache/doris/catalog/PartitionInfo.java | 4 +
.../clone/ColocateTableCheckerAndBalancer.java | 133 +++++++++++++-
.../org/apache/doris/clone/TabletScheduler.java | 18 +-
.../doris/common/proc/TabletHealthProcDir.java | 5 +
.../doris/httpv2/meta/ColocateMetaService.java | 2 +-
.../org/apache/doris/journal/JournalEntity.java | 1 +
.../org/apache/doris/master/ReportHandler.java | 5 +
.../apache/doris/persist/ColocatePersistInfo.java | 27 ++-
.../java/org/apache/doris/persist/EditLog.java | 9 +
.../org/apache/doris/persist/OperationType.java | 4 +
.../main/java/org/apache/doris/qe/DdlExecutor.java | 3 +
fe/fe-core/src/main/jflex/sql_scanner.flex | 1 +
.../java/org/apache/doris/alter/AlterTest.java | 181 +++++++++++++++++--
.../org/apache/doris/utframe/UtFrameUtils.java | 7 +-
.../alter_p2/test_alter_colocate_group.groovy | 170 ++++++++++++++++++
24 files changed, 1076 insertions(+), 34 deletions(-)
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
new file mode 100644
index 00000000000..54c87c05e67
--- /dev/null
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
@@ -0,0 +1,84 @@
+---
+{
+"title": "ALTER-COLOCATE-GROUP",
+"language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## ALTER-COLOCATE-GROUP
+
+### Name
+
+ALTER COLOCATE GROUP
+
+<version since="dev"></version>
+
+### Description
+
+This statement is used to modify the colocation group.
+
+Syntax:
+
+```sql
+ALTER COLOCATE GROUP [database.]group
+SET (
+ property_list
+);
+```
+
+NOTE:
+
+1. If the colocate group is global, that is, its name starts with
`__global__`, then it does not belong to any database;
+
+2. property_list specifies the colocation group properties; currently only
`replication_num` and `replication_allocation` can be modified. After either of
these two properties of the colocation group is modified, the
`default.replication_allocation` property and the
`dynamic.replication_allocation` property of every table in the group, as well
as the `replication_allocation` of their existing partitions, are changed to the same value.
+
+### Example
+
+1. Modify the replication number of a global group
+
+ ```sql
+ # Set "colocate_with" = "__global__foo" when creating the table
+
+ ALTER COLOCATE GROUP __global__foo
+ SET (
+ "replication_num"="1"
+ );
+ ```
+
+2. Modify the replication number of a non-global group
+
+ ```sql
+ # Set "colocate_with" = "bar" when creating the table, and the Database
is "example_db"
+
+ ALTER COLOCATE GROUP example_db.bar
+ SET (
+ "replication_num"="1"
+ );
+ ```
+
+### Keywords
+
+```sql
+ALTER, COLOCATE, GROUP
+```
+
+### Best Practice
diff --git a/docs/sidebars.json b/docs/sidebars.json
index 744260a9a31..3b58f867dc7 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -795,6 +795,7 @@
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-PARTITION",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-RESOURCE",
+
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/CANCEL-ALTER-TABLE",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COMMENT",
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
new file mode 100644
index 00000000000..2b5ca2cc727
--- /dev/null
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
@@ -0,0 +1,88 @@
+---
+{
+"title": "ALTER-COLOCATE-GROUP",
+"language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## ALTER-COLOCATE-GROUP
+
+### Name
+
+ALTER COLOCATE GROUP
+
+<version since="dev"></version>
+
+### Description
+
+该语句用于修改 Colocation Group 的属性。
+
+语法:
+
+```sql
+ALTER COLOCATE GROUP [database.]group
+SET (
+ property_list
+);
+```
+
+注意:
+
+1. 如果colocate group是全局的,即它的名称是以 `__global__` 开头的,那它不属于任何一个Database;
+
+2. property_list 是colocation group属性,目前只支持修改`replication_num` 和
`replication_allocation`。
+ 修改colocation
group的这两个属性修改之后,同时把该group的表的属性`default.replication_allocation` 、
+ 属性`dynamic.replication_allocation `、以及已有分区的`replication_allocation`改成跟它一样。
+
+
+
+### Example
+
+1. 修改一个全局group的副本数
+
+ ```sql
+ # 建表时设置 "colocate_with" = "__global__foo"
+
+ ALTER COLOCATE GROUP __global__foo
+ SET (
+ "replication_num"="1"
+ );
+ ```
+
+2. 修改一个非全局group的副本数
+
+ ```sql
+ # 建表时设置 "colocate_with" = "bar",且表属于Database example_db
+
+ ALTER COLOCATE GROUP example_db.bar
+ SET (
+ "replication_num"="1"
+ );
+ ```
+
+### Keywords
+
+```sql
+ALTER, COLOCATE, GROUP
+```
+
+### Best Practice
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
index 1bc19780f6c..e3c7c17b660 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
@@ -1,6 +1,6 @@
---
{
-"title": "ALTER-WORKLOAD -GROUP",
+"title": "ALTER-WORKLOAD-GROUP",
"language": "zh-CN"
}
---
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
under the License.
-->
-## ALTER-WORKLOAD -GROUP
+## ALTER-WORKLOAD-GROUP
### Name
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 5950548c427..74896554b7e 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -296,6 +296,7 @@ terminal String
KW_CLUSTERS,
KW_COLLATE,
KW_COLLATION,
+ KW_COLOCATE,
KW_COLUMN,
KW_COLUMNS,
KW_COMMENT,
@@ -798,6 +799,7 @@ nonterminal Qualifier opt_set_qualifier;
nonterminal Operation set_op;
nonterminal ArrayList<String> opt_common_hints;
nonterminal String optional_on_ident;
+nonterminal ColocateGroupName colocate_group_name;
nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;
@@ -1327,6 +1329,10 @@ alter_stmt ::=
{:
RESULT = new AlterResourceStmt(resourceName, properties);
:}
+ | KW_ALTER KW_COLOCATE KW_GROUP colocate_group_name:colocateGroupName
KW_SET LPAREN key_value_map:properties RPAREN
+ {:
+ RESULT = new AlterColocateGroupStmt(colocateGroupName, properties);
+ :}
| KW_ALTER KW_WORKLOAD KW_GROUP ident_or_text:workloadGroupName
opt_properties:properties
{:
RESULT = new AlterWorkloadGroupStmt(workloadGroupName, properties);
@@ -5521,6 +5527,17 @@ table_name ::=
{: RESULT = new TableName(ctl, db, tbl); :}
;
+colocate_group_name ::=
+ ident:group
+ {:
+ RESULT = new ColocateGroupName(null, group);
+ :}
+ | ident:db DOT ident:group
+ {:
+ RESULT = new ColocateGroupName(db, group);
+ :}
+ ;
+
encryptkey_name ::=
ident:name
{:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java
new file mode 100644
index 00000000000..e268322dcc8
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+
+import java.util.Map;
+
+public class AlterColocateGroupStmt extends DdlStmt {
+ private final ColocateGroupName colocateGroupName;
+ private final Map<String, String> properties;
+
+ public AlterColocateGroupStmt(ColocateGroupName colocateGroupName,
Map<String, String> properties) {
+ this.colocateGroupName = colocateGroupName;
+ this.properties = properties;
+ }
+
+ public ColocateGroupName getColocateGroupName() {
+ return colocateGroupName;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ colocateGroupName.analyze(analyzer);
+
+ String dbName = colocateGroupName.getDb();
+ if (Strings.isNullOrEmpty(dbName)) {
+ if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(
+ ConnectContext.get(), PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
+ }
+ } else {
+ if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(
+ ConnectContext.get(), dbName, PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR,
+ ConnectContext.get().getQualifiedUser(), dbName);
+ }
+ }
+
+ if (properties == null || properties.isEmpty()) {
+ throw new AnalysisException("Colocate group properties can't be
null");
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ALTER COLOCATE GROUP
").append(colocateGroupName.toSql()).append(" ");
+ sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ",
true, false)).append(")");
+ return sb.toString();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java
new file mode 100644
index 00000000000..b7f0c0afd34
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+
+import com.google.common.base.Strings;
+
+public class ColocateGroupName {
+ private String db;
+ private String group;
+
+ public ColocateGroupName(String db, String group) {
+ this.db = db;
+ this.group = group;
+ }
+
+ public String getDb() {
+ return db;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ if (GroupId.isGlobalGroupName(group)) {
+ if (!Strings.isNullOrEmpty(db)) {
+ throw new AnalysisException("group that name starts with `" +
GroupId.GLOBAL_COLOCATE_PREFIX + "`"
+ + " is a global group, it doesn't belong to any
specific database");
+ }
+ } else {
+ if (Strings.isNullOrEmpty(db)) {
+ if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+ }
+ db = analyzer.getDefaultDb();
+ }
+ db = ClusterNamespace.getFullName(analyzer.getClusterName(), db);
+ }
+ }
+
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ if (!Strings.isNullOrEmpty(db)) {
+ sb.append("`").append(db).append("`.");
+ }
+ sb.append("`").append(group).append("`");
+ return sb.toString();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
index b5004973c37..57d512b9789 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
@@ -66,6 +66,10 @@ public class ColocateGroupSchema implements Writable {
return replicaAlloc;
}
+ public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
+ this.replicaAlloc = replicaAlloc;
+ }
+
public List<Type> getDistributionColTypes() {
return distributionColTypes;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 23703278fd8..fcefcff132a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -17,10 +17,16 @@
package org.apache.doris.catalog;
+import org.apache.doris.analysis.AlterColocateGroupStmt;
+import org.apache.doris.clone.ColocateTableCheckerAndBalancer;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DynamicPartitionUtil;
+import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -249,10 +255,34 @@ public class ColocateTableIndex implements Writable {
}
}
- public void addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag,
List<List<Long>> backendsPerBucketSeq) {
+ public void setBackendsPerBucketSeq(GroupId groupId, Map<Tag,
List<List<Long>>> backendsPerBucketSeq) {
writeLock();
try {
+ Map<Tag, List<List<Long>>> backendsPerBucketSeqMap =
group2BackendsPerBucketSeq.row(groupId);
+ if (backendsPerBucketSeqMap != null) {
+ backendsPerBucketSeqMap.clear();
+ }
+ for (Map.Entry<Tag, List<List<Long>>> entry :
backendsPerBucketSeq.entrySet()) {
+ group2BackendsPerBucketSeq.put(groupId, entry.getKey(),
entry.getValue());
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public boolean addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag,
List<List<Long>> backendsPerBucketSeq,
+ ReplicaAllocation originReplicaAlloc) {
+ writeLock();
+ try {
+ ColocateGroupSchema groupSchema = group2Schema.get(groupId);
+ // replica allocation has outdate
+ if (groupSchema != null &&
!originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) {
+ LOG.info("replica allocation has outdate for group {}, old
replica alloc {}, new replica alloc {}",
+ groupId, originReplicaAlloc.getAllocMap(),
groupSchema.getReplicaAlloc());
+ return false;
+ }
group2BackendsPerBucketSeq.put(groupId, tag, backendsPerBucketSeq);
+ return true;
} finally {
writeUnlock();
}
@@ -277,12 +307,20 @@ public class ColocateTableIndex implements Writable {
}
}
- public void markGroupStable(GroupId groupId, boolean needEditLog) {
+ public void markGroupStable(GroupId groupId, boolean needEditLog,
ReplicaAllocation originReplicaAlloc) {
writeLock();
try {
if (!group2Tables.containsKey(groupId)) {
return;
}
+ // replica allocation is outdate
+ ColocateGroupSchema groupSchema = group2Schema.get(groupId);
+ if (groupSchema != null && originReplicaAlloc != null
+ &&
!originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) {
+ LOG.warn("mark group {} failed, replica alloc has outdate, old
replica alloc {}, new replica alloc {}",
+ groupId, originReplicaAlloc.getAllocMap(),
groupSchema.getReplicaAlloc());
+ return;
+ }
if (unstableGroups.remove(groupId)) {
group2ErrMsgs.put(groupId, "");
if (needEditLog) {
@@ -604,13 +642,23 @@ public class ColocateTableIndex implements Writable {
}
public void replayMarkGroupStable(ColocatePersistInfo info) {
- markGroupStable(info.getGroupId(), false);
+ markGroupStable(info.getGroupId(), false, null);
}
public void replayRemoveTable(ColocatePersistInfo info) {
removeTable(info.getTableId());
}
+ public void replayModifyReplicaAlloc(ColocatePersistInfo info) throws
UserException {
+ writeLock();
+ try {
+ modifyColocateGroupReplicaAllocation(info.getGroupId(),
info.getReplicaAlloc(),
+ info.getBackendsPerBucketSeq(), false);
+ } finally {
+ writeUnlock();
+ }
+ }
+
// only for test
public void clear() {
writeLock();
@@ -633,7 +681,22 @@ public class ColocateTableIndex implements Writable {
List<String> info = Lists.newArrayList();
GroupId groupId = entry.getValue();
info.add(groupId.toString());
- info.add(entry.getKey());
+ String dbName = "";
+ if (groupId.dbId != 0) {
+ Database db =
Env.getCurrentInternalCatalog().getDbNullable(groupId.dbId);
+ if (db != null) {
+ dbName = db.getFullName();
+ int index = dbName.indexOf(":");
+ if (index > 0) {
+ dbName = dbName.substring(index + 1); //use short
db name
+ }
+ }
+ }
+ String groupName = entry.getKey();
+ if (!GroupId.isGlobalGroupName(groupName)) {
+ groupName = dbName + "." +
groupName.substring(groupName.indexOf("_") + 1);
+ }
+ info.add(groupName);
info.add(Joiner.on(", ").join(group2Tables.get(groupId)));
ColocateGroupSchema groupSchema = group2Schema.get(groupId);
info.add(String.valueOf(groupSchema.getBucketsNum()));
@@ -756,4 +819,124 @@ public class ColocateTableIndex implements Writable {
public Map<Long, GroupId> getTable2Group() {
return table2Group;
}
+
+ public void alterColocateGroup(AlterColocateGroupStmt stmt) throws
UserException {
+ writeLock();
+ try {
+ Map<String, String> properties = stmt.getProperties();
+ String dbName = stmt.getColocateGroupName().getDb();
+ String groupName = stmt.getColocateGroupName().getGroup();
+ long dbId = 0;
+ if (!GroupId.isGlobalGroupName(groupName)) {
+ Database db = (Database)
Env.getCurrentInternalCatalog().getDbOrMetaException(dbName);
+ dbId = db.getId();
+ }
+ String fullGroupName = GroupId.getFullGroupName(dbId, groupName);
+ ColocateGroupSchema groupSchema = getGroupSchema(fullGroupName);
+ if (groupSchema == null) {
+ throw new DdlException("Not found colocate group " +
stmt.getColocateGroupName().toSql());
+ }
+
+ GroupId groupId = groupSchema.getGroupId();
+
+ if (properties.size() > 1) {
+ throw new DdlException("Can only set one colocate group
property at a time");
+ }
+
+ if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
+ ReplicaAllocation replicaAlloc =
PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
+ Preconditions.checkState(!replicaAlloc.isNotSet());
+
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
+ Map<Tag, List<List<Long>>> backendsPerBucketSeq =
getBackendsPerBucketSeq(groupId);
+ Map<Tag, List<List<Long>>> newBackendsPerBucketSeq =
Maps.newHashMap();
+ for (Map.Entry<Tag, List<List<Long>>> entry :
backendsPerBucketSeq.entrySet()) {
+ List<List<Long>> newList = Lists.newArrayList();
+ for (List<Long> backends : entry.getValue()) {
+ newList.add(Lists.newArrayList(backends));
+ }
+ newBackendsPerBucketSeq.put(entry.getKey(), newList);
+ }
+ try {
+
ColocateTableCheckerAndBalancer.modifyGroupReplicaAllocation(replicaAlloc,
+ newBackendsPerBucketSeq,
groupSchema.getBucketsNum());
+ } catch (Exception e) {
+ LOG.warn("modify group [{}, {}] to replication allocation
{} failed, bucket seq {}",
+ fullGroupName, groupId, replicaAlloc,
backendsPerBucketSeq, e);
+ throw new DdlException(e.getMessage());
+ }
+ backendsPerBucketSeq = newBackendsPerBucketSeq;
+ Preconditions.checkState(backendsPerBucketSeq.size() ==
replicaAlloc.getAllocMap().size());
+ modifyColocateGroupReplicaAllocation(groupSchema.getGroupId(),
replicaAlloc,
+ backendsPerBucketSeq, true);
+ } else {
+ throw new DdlException("Unknown colocate group property: " +
properties.keySet());
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void modifyColocateGroupReplicaAllocation(GroupId groupId,
ReplicaAllocation replicaAlloc,
+ Map<Tag, List<List<Long>>> backendsPerBucketSeq, boolean
needEditLog) throws UserException {
+ ColocateGroupSchema groupSchema = getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.warn("not found group {}", groupId);
+ return;
+ }
+
+ List<Long> tableIds = getAllTableIds(groupId);
+ for (Long tableId : tableIds) {
+ long dbId = groupId.dbId;
+ if (dbId == 0) {
+ dbId = groupId.getDbIdByTblId(tableId);
+ }
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ continue;
+ }
+ OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ if (table == null || !isColocateTable(table.getId())) {
+ continue;
+ }
+ table.writeLock();
+ try {
+ Map<String, String> tblProperties = Maps.newHashMap();
+ tblProperties.put("default." +
PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
+ replicaAlloc.toCreateStmt());
+ table.setReplicaAllocation(tblProperties);
+ if (table.dynamicPartitionExists()) {
+ TableProperty tableProperty = table.getTableProperty();
+ // Merge the new properties with origin properties, and
then analyze them
+ Map<String, String> origDynamicProperties =
tableProperty.getOriginDynamicPartitionProperty();
+
origDynamicProperties.put(DynamicPartitionProperty.REPLICATION_ALLOCATION,
+ replicaAlloc.toCreateStmt());
+ Map<String, String> analyzedDynamicPartition =
DynamicPartitionUtil.analyzeDynamicPartition(
+ origDynamicProperties, table, db);
+
tableProperty.modifyTableProperties(analyzedDynamicPartition);
+ tableProperty.buildDynamicProperty();
+ }
+ for (ReplicaAllocation alloc :
table.getPartitionInfo().getPartitionReplicaAllocations().values()) {
+ Map<Tag, Short> allocMap = alloc.getAllocMap();
+ allocMap.clear();
+ allocMap.putAll(replicaAlloc.getAllocMap());
+ }
+ } finally {
+ table.writeUnlock();
+ }
+ }
+
+ if
(!backendsPerBucketSeq.equals(group2BackendsPerBucketSeq.row(groupId))) {
+ markGroupUnstable(groupId, "change replica allocation", false);
+ }
+ groupSchema.setReplicaAlloc(replicaAlloc);
+ setBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+
+ if (needEditLog) {
+ ColocatePersistInfo info =
ColocatePersistInfo.createForModifyReplicaAlloc(groupId,
+ replicaAlloc, backendsPerBucketSeq);
+
Env.getCurrentEnv().getEditLog().logColocateModifyRepliaAlloc(info);
+ }
+ LOG.info("modify group {} replication allocation to {}, is replay {}",
groupId, replicaAlloc, !needEditLog);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index 6bd4604471a..235a8cc8db5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -240,6 +240,10 @@ public class PartitionInfo implements Writable {
idToStoragePolicy.put(partitionId, storagePolicy);
}
+ public Map<Long, ReplicaAllocation> getPartitionReplicaAllocations() {
+ return idToReplicaAllocation;
+ }
+
public ReplicaAllocation getReplicaAllocation(long partitionId) {
if (!idToReplicaAllocation.containsKey(partitionId)) {
LOG.debug("failed to get replica allocation for partition: {}",
partitionId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 5c18c2bd468..4ec8993be0d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -35,6 +35,7 @@ import org.apache.doris.clone.TabletScheduler.AddResult;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.resource.Tag;
@@ -183,7 +184,12 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
List<List<Long>> balancedBackendsPerBucketSeq =
Lists.newArrayList();
if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup,
availableBeIds, colocateIndex,
infoService, statistic, balancedBackendsPerBucketSeq))
{
- colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag,
balancedBackendsPerBucketSeq);
+ if (!colocateIndex.addBackendsPerBucketSeqByTag(groupId,
tag, balancedBackendsPerBucketSeq,
+ replicaAlloc)) {
+ LOG.warn("relocate group {} succ, but replica
allocation has change, old replica alloc {}",
+ groupId, replicaAlloc);
+ continue;
+ }
Map<Tag, List<List<Long>>> balancedBackendsPerBucketSeqMap
= Maps.newHashMap();
balancedBackendsPerBucketSeqMap.put(tag,
balancedBackendsPerBucketSeq);
ColocatePersistInfo info = ColocatePersistInfo
@@ -219,6 +225,8 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
continue;
}
+ ColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
String unstableReason = null;
OUT:
for (Long tableId : tableIds) {
@@ -237,8 +245,6 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
olapTable.readLock();
try {
for (Partition partition : olapTable.getPartitions()) {
- ReplicaAllocation replicaAlloc
- =
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
short replicationNum =
replicaAlloc.getTotalReplicaNum();
long visibleVersion = partition.getVisibleVersion();
// Here we only get VISIBLE indexes. All other indexes
are not queryable.
@@ -269,8 +275,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
TabletSchedCtx tabletCtx = new
TabletSchedCtx(
TabletSchedCtx.Type.REPAIR,
db.getId(), tableId,
partition.getId(), index.getId(), tablet.getId(),
-
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()),
- System.currentTimeMillis());
+ replicaAlloc,
System.currentTimeMillis());
// the tablet status will be set again
when being scheduled
tabletCtx.setTabletStatus(st);
tabletCtx.setPriority(Priority.NORMAL);
@@ -299,7 +304,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
// mark group as stable or unstable
if (Strings.isNullOrEmpty(unstableReason)) {
- colocateIndex.markGroupStable(groupId, true);
+ colocateIndex.markGroupStable(groupId, true, replicaAlloc);
} else {
colocateIndex.markGroupUnstable(groupId, unstableReason, true);
}
@@ -521,6 +526,122 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
return hostsPerBucketSeq;
}
+ public static void modifyGroupReplicaAllocation(ReplicaAllocation
replicaAlloc,
+ Map<Tag, List<List<Long>>> backendBucketsSeq, int bucketNum)
throws Exception {
+ Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
+ List<Tag> deleteTags = Lists.newArrayList();
+ for (Tag tag : backendBucketsSeq.keySet()) {
+ if (!allocMap.containsKey(tag)) {
+ deleteTags.add(tag);
+ }
+ Preconditions.checkState(bucketNum ==
backendBucketsSeq.get(tag).size(),
+ bucketNum + " vs " + backendBucketsSeq.get(tag).size());
+ }
+ deleteTags.forEach(tag -> backendBucketsSeq.remove(tag));
+
+ for (Tag tag : replicaAlloc.getAllocMap().keySet()) {
+ if (!backendBucketsSeq.containsKey(tag)) {
+ List<List<Long>> tagBackendBucketsSeq = Lists.newArrayList();
+ for (int i = 0; i < bucketNum; i++) {
+ tagBackendBucketsSeq.add(Lists.newArrayList());
+ }
+ backendBucketsSeq.put(tag, tagBackendBucketsSeq);
+ }
+ }
+
+ Map<Long, Integer> backendToBucketNum = Maps.newHashMap();
+ backendBucketsSeq.values().forEach(tagBackendIds ->
+ tagBackendIds.forEach(backendIds ->
+ backendIds.forEach(backendId -> backendToBucketNum.put(
+ backendId,
backendToBucketNum.getOrDefault(backendId, 0) + 1))));
+
+ for (Tag tag : backendBucketsSeq.keySet()) {
+ List<List<Long>> tagBackendBucketsSeq = backendBucketsSeq.get(tag);
+ int oldReplicaNum = tagBackendBucketsSeq.get(0).size();
+ for (List<Long> backendIdsOneBucket : tagBackendBucketsSeq) {
+ Preconditions.checkState(backendIdsOneBucket.size() ==
oldReplicaNum,
+ backendIdsOneBucket.size() + " vs " + oldReplicaNum);
+ }
+
+ int newReplicaNum = allocMap.get(tag);
+ if (newReplicaNum == oldReplicaNum) {
+ continue;
+ }
+
+ List<Backend> backends =
Env.getCurrentSystemInfo().getBackendsByTag(tag);
+ Set<Long> availableBeIds = backends.stream().filter(be ->
be.isScheduleAvailable())
+ .map(be -> be.getId()).collect(Collectors.toSet());
+
+ for (Long backendId : availableBeIds) {
+ if (!backendToBucketNum.containsKey(backendId)) {
+ backendToBucketNum.put(backendId, 0);
+ }
+ }
+
+ for (int i = 0; i < tagBackendBucketsSeq.size(); i++) {
+ modifyGroupBucketReplicas(tag, newReplicaNum,
tagBackendBucketsSeq.get(i),
+ availableBeIds, backendToBucketNum);
+ }
+ }
+ }
+
+ private static void modifyGroupBucketReplicas(Tag tag, int newReplicaNum,
List<Long> backendIds,
+ Set<Long> availableBeIds, Map<Long, Integer> backendToBucketNum)
throws Exception {
+ final boolean smallIdFirst = Math.random() < 0.5;
+ if (backendIds.size() > newReplicaNum) {
+ backendIds.sort((id1, id2) -> {
+ boolean alive1 = availableBeIds.contains(id1);
+ boolean alive2 = availableBeIds.contains(id2);
+ if (alive1 != alive2) {
+ return alive1 ? -1 : 1;
+ }
+ int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0);
+ int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0);
+ if (bucketNum1 != bucketNum2) {
+ return Integer.compare(bucketNum1, bucketNum2);
+ }
+
+ return smallIdFirst ? Long.compare(id1, id2) :
Long.compare(id2, id1);
+ });
+
+ for (int i = backendIds.size() - 1; i >= newReplicaNum; i--) {
+ long backendId = backendIds.get(i);
+ backendIds.remove(i);
+ backendToBucketNum.put(backendId,
backendToBucketNum.getOrDefault(backendId, 0) - 1);
+ }
+ }
+
+ if (backendIds.size() < newReplicaNum) {
+ Set<Long> candBackendSet = Sets.newHashSet();
+ candBackendSet.addAll(availableBeIds);
+ candBackendSet.removeAll(backendIds);
+ if (backendIds.size() + candBackendSet.size() < newReplicaNum) {
+ throw new UserException("Can not add backend for tag: " + tag);
+ }
+
+ List<Long> candBackendList = Lists.newArrayList(candBackendSet);
+ candBackendList.sort((id1, id2) -> {
+ int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0);
+ int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0);
+ if (bucketNum1 != bucketNum2) {
+ return Integer.compare(bucketNum1, bucketNum2);
+ }
+
+ return smallIdFirst ? Long.compare(id1, id2) :
Long.compare(id2, id1);
+ });
+
+ int addNum = newReplicaNum - backendIds.size();
+ for (int i = 0; i < addNum; i++) {
+ long backendId = candBackendList.get(i);
+ backendIds.add(backendId);
+ backendToBucketNum.put(backendId,
backendToBucketNum.getOrDefault(backendId, 0) + 1);
+ }
+ }
+
+ Preconditions.checkState(newReplicaNum == backendIds.size(),
+ newReplicaNum + " vs " + backendIds.size());
+ }
+
private List<Map.Entry<Long, Long>>
getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
Set<Long> unavailBackendIds, LoadStatisticForTag statistic,
List<Long> flatBackendsPerBucketSeq) {
// backend id -> replica num, and sorted by replica num, descending.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index d6a8e1efa0e..ee9da3ac100 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -19,6 +19,7 @@ package org.apache.doris.clone;
import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
import org.apache.doris.analysis.AdminRebalanceDiskStmt;
+import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Database;
@@ -490,15 +491,20 @@ public class TabletScheduler extends MasterDaemon {
throw new SchedException(Status.UNRECOVERABLE, "index does not
exist");
}
+ ReplicaAllocation replicaAlloc = null;
Tablet tablet = idx.getTablet(tabletId);
Preconditions.checkNotNull(tablet);
- ReplicaAllocation replicaAlloc =
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
-
if (isColocateTable) {
GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
if (groupId == null) {
throw new SchedException(Status.UNRECOVERABLE, "colocate
group does not exist");
}
+ ColocateGroupSchema groupSchema =
colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ throw new SchedException(Status.UNRECOVERABLE,
+ "colocate group schema " + groupId + " does not
exist");
+ }
+ replicaAlloc = groupSchema.getReplicaAlloc();
int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
if (tabletOrderIdx == -1) {
@@ -512,6 +518,7 @@ public class TabletScheduler extends MasterDaemon {
statusPair = Pair.of(st, Priority.HIGH);
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
+ replicaAlloc =
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
statusPair = tablet.getHealthStatusWithPriority(
infoService, partition.getVisibleVersion(),
replicaAlloc, aliveBeIds);
@@ -1484,14 +1491,18 @@ public class TabletScheduler extends MasterDaemon {
return;
}
- replicaAlloc =
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
boolean isColocateTable =
colocateTableIndex.isColocateTable(tbl.getId());
if (isColocateTable) {
GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
if (groupId == null) {
return;
}
+ ColocateGroupSchema groupSchema =
colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ return;
+ }
+ replicaAlloc = groupSchema.getReplicaAlloc();
int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
if (tabletOrderIdx == -1) {
tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
@@ -1504,6 +1515,7 @@ public class TabletScheduler extends MasterDaemon {
statusPair = Pair.of(st, Priority.HIGH);
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
+ replicaAlloc =
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
statusPair = tablet.getHealthStatusWithPriority(
infoService, partition.getVisibleVersion(),
replicaAlloc, aliveBeIds);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index 93f54483cbf..3ce3ff74c7a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -17,6 +17,7 @@
package org.apache.doris.common.proc;
+import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@@ -185,6 +186,10 @@ public class TabletHealthProcDir implements
ProcDirInterface {
++tabletNum;
Tablet.TabletStatus res = null;
if (groupId != null) {
+ ColocateGroupSchema groupSchema =
colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema != null) {
+ replicaAlloc =
groupSchema.getReplicaAlloc();
+ }
Set<Long> backendsSet =
colocateTableIndex.getTabletBackendsByGroup(groupId, i);
res =
tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc,
backendsSet);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
index 9e51d38de5c..b7c2a615aac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
@@ -114,7 +114,7 @@ public class ColocateMetaService extends RestBaseController
{
if ("POST".equalsIgnoreCase(method)) {
colocateIndex.markGroupUnstable(groupId, "mark unstable via http
api", true);
} else if ("DELETE".equalsIgnoreCase(method)) {
- colocateIndex.markGroupStable(groupId, true);
+ colocateIndex.markGroupStable(groupId, true, null);
}
return ResponseEntityBuilder.ok();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 0e0b8ac7df6..d3d0fe18d90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -456,6 +456,7 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC:
case OperationType.OP_COLOCATE_ADD_TABLE:
case OperationType.OP_COLOCATE_REMOVE_TABLE:
case OperationType.OP_COLOCATE_BACKENDS_PER_BUCKETSEQ:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index b78cbddb381..bfe1bb0a9e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -19,6 +19,7 @@ package org.apache.doris.master;
import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -1172,6 +1173,10 @@ public class ReportHandler extends Daemon {
int tabletOrderIdx =
materializedIndex.getTabletOrderIdx(tabletId);
Preconditions.checkState(tabletOrderIdx != -1, "get tablet
materializedIndex for %s fail", tabletId);
Set<Long> backendsSet =
colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
+ ColocateGroupSchema groupSchema =
colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema != null) {
+ replicaAlloc = groupSchema.getReplicaAlloc();
+ }
TabletStatus status =
tablet.getColocateHealthStatus(visibleVersion,
replicaAlloc, backendsSet);
if (status == TabletStatus.HEALTHY) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
index 459be646052..429d4e0e1a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
@@ -18,6 +18,7 @@
package org.apache.doris.persist;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -45,29 +46,38 @@ public class ColocatePersistInfo implements Writable {
private long tableId;
@SerializedName(value = "backendsPerBucketSeq")
private Map<Tag, List<List<Long>>> backendsPerBucketSeq =
Maps.newHashMap();
+ @SerializedName(value = "replicaAlloc")
+ private ReplicaAllocation replicaAlloc = new ReplicaAllocation();
- private ColocatePersistInfo(GroupId groupId, long tableId, Map<Tag,
List<List<Long>>> backendsPerBucketSeq) {
+ private ColocatePersistInfo(GroupId groupId, long tableId, Map<Tag,
List<List<Long>>> backendsPerBucketSeq,
+ ReplicaAllocation replicaAlloc) {
this.groupId = groupId;
this.tableId = tableId;
this.backendsPerBucketSeq = backendsPerBucketSeq;
+ this.replicaAlloc = replicaAlloc;
}
public static ColocatePersistInfo createForAddTable(GroupId groupId, long
tableId,
Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
- return new ColocatePersistInfo(groupId, tableId, backendsPerBucketSeq);
+ return new ColocatePersistInfo(groupId, tableId, backendsPerBucketSeq,
new ReplicaAllocation());
}
public static ColocatePersistInfo createForBackendsPerBucketSeq(GroupId
groupId,
Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
- return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq);
+ return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq, new
ReplicaAllocation());
}
public static ColocatePersistInfo createForMarkUnstable(GroupId groupId) {
- return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap());
+ return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap(), new
ReplicaAllocation());
}
public static ColocatePersistInfo createForMarkStable(GroupId groupId) {
- return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap());
+ return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap(), new
ReplicaAllocation());
+ }
+
+ public static ColocatePersistInfo createForModifyReplicaAlloc(GroupId
groupId, ReplicaAllocation replicaAlloc,
+ Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
+ return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq,
replicaAlloc);
}
public static ColocatePersistInfo read(DataInput in) throws IOException {
@@ -87,6 +97,10 @@ public class ColocatePersistInfo implements Writable {
return backendsPerBucketSeq;
}
+ public ReplicaAllocation getReplicaAlloc() {
+ return replicaAlloc;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -129,7 +143,7 @@ public class ColocatePersistInfo implements Writable {
ColocatePersistInfo info = (ColocatePersistInfo) obj;
return tableId == info.tableId && groupId.equals(info.groupId) &&
backendsPerBucketSeq.equals(
- info.backendsPerBucketSeq);
+ info.backendsPerBucketSeq) &&
replicaAlloc.equals(info.replicaAlloc);
}
@Override
@@ -138,6 +152,7 @@ public class ColocatePersistInfo implements Writable {
sb.append("table id: ").append(tableId);
sb.append(" group id: ").append(groupId);
sb.append(" backendsPerBucketSeq: ").append(backendsPerBucketSeq);
+ sb.append(" replicaAlloc: ").append(replicaAlloc);
return sb.toString();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 97dd6719e9b..b0540abf9ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -594,6 +594,11 @@ public class EditLog {
env.getColocateTableIndex().replayMarkGroupStable(info);
break;
}
+ case OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC: {
+ final ColocatePersistInfo info = (ColocatePersistInfo)
journal.getData();
+ env.getColocateTableIndex().replayModifyReplicaAlloc(info);
+ break;
+ }
case OperationType.OP_MODIFY_TABLE_COLOCATE: {
final TablePropertyInfo info = (TablePropertyInfo)
journal.getData();
env.replayModifyTableColocate(info);
@@ -1497,6 +1502,10 @@ public class EditLog {
Env.getCurrentEnv().getBinlogManager().addTruncateTable(info, logId);
}
+ public void logColocateModifyRepliaAlloc(ColocatePersistInfo info) {
+ logEdit(OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC, info);
+ }
+
public void logColocateAddTable(ColocatePersistInfo info) {
logEdit(OperationType.OP_COLOCATE_ADD_TABLE, info);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 3cf30df2428..b2f3ffb7e33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -186,6 +186,10 @@ public class OperationType {
public static final short OP_ADD_GLOBAL_FUNCTION = 132;
public static final short OP_DROP_GLOBAL_FUNCTION = 133;
+ // modify database/table/tablet/replica meta
+ public static final short OP_SET_REPLICA_VERSION = 141;
+ public static final short OP_COLOCATE_MOD_REPLICA_ALLOC = 142;
+
// routine load 200
public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200;
public static final short OP_CHANGE_ROUTINE_LOAD_JOB = 201;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 29e29a9a7dd..f50d8d1ae02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -29,6 +29,7 @@ import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
import org.apache.doris.analysis.AlterCatalogCommentStmt;
import org.apache.doris.analysis.AlterCatalogNameStmt;
import org.apache.doris.analysis.AlterCatalogPropertyStmt;
+import org.apache.doris.analysis.AlterColocateGroupStmt;
import org.apache.doris.analysis.AlterColumnStatsStmt;
import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
@@ -298,6 +299,8 @@ public class DdlExecutor {
env.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt);
} else if (ddlStmt instanceof AlterResourceStmt) {
env.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
+ } else if (ddlStmt instanceof AlterColocateGroupStmt) {
+
env.getColocateTableIndex().alterColocateGroup((AlterColocateGroupStmt)
ddlStmt);
} else if (ddlStmt instanceof AlterWorkloadGroupStmt) {
env.getWorkloadGroupMgr().alterWorkloadGroup((AlterWorkloadGroupStmt) ddlStmt);
} else if (ddlStmt instanceof CreatePolicyStmt) {
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 4f4f3392dc2..c4791093b48 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -146,6 +146,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("clusters", new Integer(SqlParserSymbols.KW_CLUSTERS));
keywordMap.put("collate", new Integer(SqlParserSymbols.KW_COLLATE));
keywordMap.put("collation", new
Integer(SqlParserSymbols.KW_COLLATION));
+ keywordMap.put("colocate", new Integer(SqlParserSymbols.KW_COLOCATE));
keywordMap.put("column", new Integer(SqlParserSymbols.KW_COLUMN));
keywordMap.put("columns", new Integer(SqlParserSymbols.KW_COLUMNS));
keywordMap.put("comment", new Integer(SqlParserSymbols.KW_COMMENT));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
index 924fcd53b7d..4a78ef4de28 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.alter;
+import org.apache.doris.analysis.AlterColocateGroupStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
@@ -26,6 +27,8 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DropResourceStmt;
import org.apache.doris.analysis.ShowCreateMaterializedViewStmt;
+import org.apache.doris.catalog.ColocateGroupSchema;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
@@ -36,10 +39,13 @@ import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.Type;
+import org.apache.doris.clone.RebalancerTestUtil;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -47,6 +53,7 @@ import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.ShowExecutor;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
@@ -70,18 +77,36 @@ public class AlterTest {
private static String runningDir = "fe/mocked/AlterTest/" +
UUID.randomUUID().toString() + "/";
private static ConnectContext connectContext;
- private static Backend be;
+
+ private static Map<Long, Tag> backendTags;
@BeforeClass
public static void beforeClass() throws Exception {
FeConstants.runningUnitTest = true;
FeConstants.default_scheduler_interval_millisecond = 100;
+ FeConstants.tablet_checker_interval_ms = 100;
+ FeConstants.tablet_checker_interval_ms = 100;
Config.dynamic_partition_check_interval_seconds = 1;
Config.disable_storage_medium_check = true;
Config.enable_storage_policy = true;
- UtFrameUtils.createDorisCluster(runningDir);
+ Config.disable_balance = true;
+ Config.schedule_batch_size = 400;
+ Config.schedule_slot_num_per_hdd_path = 100;
+ UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 5);
+
+ List<Backend> backends =
Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+
+ Map<String, String> tagMap = Maps.newHashMap();
+ tagMap.put(Tag.TYPE_LOCATION, "group_a");
+ backends.get(2).setTagMap(tagMap);
+ backends.get(3).setTagMap(tagMap);
+
+ tagMap = Maps.newHashMap();
+ tagMap.put(Tag.TYPE_LOCATION, "group_b");
+ backends.get(4).setTagMap(tagMap);
- be =
Env.getCurrentSystemInfo().getIdToBackend().values().asList().get(0);
+ backendTags = Maps.newHashMap();
+ backends.forEach(be -> backendTags.put(be.getId(),
be.getLocationTag()));
// create connect context
connectContext = UtFrameUtils.createDefaultCtx();
@@ -443,21 +468,16 @@ public class AlterTest {
// set un-partitioned table's real replication num
// first we need to change be's tag
- Map<String, String> originTagMap = be.getTagMap();
- Map<String, String> tagMap = Maps.newHashMap();
- tagMap.put(Tag.TYPE_LOCATION, "group1");
- be.setTagMap(tagMap);
OlapTable tbl2 = (OlapTable) db.getTableOrMetaException("tbl2");
Partition partition = tbl2.getPartition(tbl2.getName());
Assert.assertEquals(Short.valueOf("1"),
Short.valueOf(tbl2.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()));
- stmt = "alter table test.tbl2 set ('replication_allocation' =
'tag.location.group1:1');";
+ stmt = "alter table test.tbl2 set ('replication_allocation' =
'tag.location.group_a:1');";
alterTable(stmt, false);
Assert.assertEquals((short) 1, (short)
tbl2.getPartitionInfo().getReplicaAllocation(partition.getId())
- .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION,
"group1")));
+ .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION,
"group_a")));
Assert.assertEquals((short) 1, (short)
tbl2.getTableProperty().getReplicaAllocation()
- .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION,
"group1")));
- be.setTagMap(originTagMap);
+ .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION,
"group_a")));
Thread.sleep(5000); // sleep to wait dynamic partition scheduler run
// add partition without set replication num, and default num is 3.
@@ -1239,6 +1259,145 @@ public class AlterTest {
Env.getCurrentEnv().getResourceMgr().dropResource(stmt);
}
+ @Test
+ public void testModifyColocateGroupReplicaAlloc() throws Exception {
+ Config.enable_round_robin_create_tablet = true;
+
+ createTable("CREATE TABLE test.col_tbl0\n" + "(\n" + " k1 date,\n"
+ " k2 int,\n" + " v1 int \n"
+ + ") ENGINE=OLAP\n" + "UNIQUE KEY (k1,k2)\n"
+ + "DISTRIBUTED BY HASH(k2) BUCKETS 4\n"
+ + "PROPERTIES('replication_num' = '2', 'colocate_with' =
'mod_group_0');");
+
+ createTable("CREATE TABLE test.col_tbl1\n" + "(\n" + " k1 date,\n"
+ " k2 int,\n" + " v1 int \n"
+ + ") ENGINE=OLAP\n" + "UNIQUE KEY (k1,k2)\n" + "PARTITION BY
RANGE(k1)\n" + "(\n"
+ + " PARTITION p1 values less than('2020-02-01'),\n"
+ + " PARTITION p2 values less than('2020-03-01')\n" + ")\n"
+ "DISTRIBUTED BY HASH(k2) BUCKETS 4\n"
+ + "PROPERTIES('replication_num' = '2', 'colocate_with' =
'mod_group_1');");
+
+ createTable("CREATE TABLE test.col_tbl2 (\n"
+ + "`uuid` varchar(255) NULL,\n"
+ + "`action_datetime` date NULL\n"
+ + ")\n"
+ + "DUPLICATE KEY(uuid)\n"
+ + "PARTITION BY RANGE(action_datetime)()\n"
+ + "DISTRIBUTED BY HASH(uuid) BUCKETS 4\n"
+ + "PROPERTIES\n"
+ + "(\n"
+ + "\"replication_num\" = \"2\",\n"
+ + "\"colocate_with\" = \"mod_group_2\",\n"
+ + "\"dynamic_partition.enable\" = \"true\",\n"
+ + "\"dynamic_partition.time_unit\" = \"DAY\",\n"
+ + "\"dynamic_partition.end\" = \"2\",\n"
+ + "\"dynamic_partition.prefix\" = \"p\",\n"
+ + "\"dynamic_partition.buckets\" = \"4\",\n"
+ + "\"dynamic_partition.replication_num\" = \"2\"\n"
+ + ");\n");
+
+ Env env = Env.getCurrentEnv();
+ Database db =
env.getInternalCatalog().getDbOrMetaException("default_cluster:test");
+ OlapTable tbl2 = (OlapTable) db.getTableOrMetaException("col_tbl2");
+ for (int j = 0; true; j++) {
+ Thread.sleep(2000);
+ if (tbl2.getAllPartitions().size() > 0) {
+ break;
+ }
+ if (j >= 5) {
+ Assert.assertTrue("dynamic table not create partition", false);
+ }
+ }
+
+ RebalancerTestUtil.updateReplicaPathHash();
+
+ ReplicaAllocation newReplicaAlloc = new ReplicaAllocation();
+ newReplicaAlloc.put(Tag.DEFAULT_BACKEND_TAG, (short) 1);
+ newReplicaAlloc.put(Tag.create(Tag.TYPE_LOCATION, "group_a"), (short)
1);
+ newReplicaAlloc.put(Tag.create(Tag.TYPE_LOCATION, "group_b"), (short)
1);
+
+ for (int i = 0; i < 3; i++) {
+ String groupName = "mod_group_" + i;
+ String sql = "alter colocate group test." + groupName
+ + " set ( 'replication_allocation' = '" +
newReplicaAlloc.toCreateStmt() + "')";
+ String fullGroupName = GroupId.getFullGroupName(db.getId(),
groupName);
+ AlterColocateGroupStmt stmt = (AlterColocateGroupStmt)
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+ DdlExecutor.execute(env, stmt);
+
+ ColocateGroupSchema groupSchema =
env.getColocateTableIndex().getGroupSchema(fullGroupName);
+ Assert.assertNotNull(groupSchema);
+ Assert.assertEquals(newReplicaAlloc,
groupSchema.getReplicaAlloc());
+
+ OlapTable tbl = (OlapTable) db.getTableOrMetaException("col_tbl" +
i);
+ Assert.assertEquals(newReplicaAlloc,
tbl.getDefaultReplicaAllocation());
+ for (Partition partition : tbl.getAllPartitions()) {
+ Assert.assertEquals(newReplicaAlloc,
+
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()));
+ }
+
+ if (i == 2) {
+ Assert.assertEquals(newReplicaAlloc,
+
tbl.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation());
+ }
+ }
+
+ Config.enable_round_robin_create_tablet = false;
+
+ for (int k = 0; true; k++) {
+ Thread.sleep(1000); // sleep to wait dynamic partition scheduler
run
+ boolean allStable = true;
+ for (int i = 0; i < 3; i++) {
+ String fullGroupName = GroupId.getFullGroupName(db.getId(),
"mod_group_" + i);
+ ColocateGroupSchema groupSchema =
env.getColocateTableIndex().getGroupSchema(fullGroupName);
+ Assert.assertNotNull(groupSchema);
+
+ if
(env.getColocateTableIndex().isGroupUnstable(groupSchema.getGroupId())) {
+ allStable = false;
+ if (k >= 120) {
+ Assert.assertTrue(fullGroupName + " is unstable",
false);
+ }
+ continue;
+ }
+
+ Map<Long, Integer> backendReplicaNum = Maps.newHashMap();
+ OlapTable tbl = (OlapTable)
db.getTableOrMetaException("col_tbl" + i);
+ int tabletNum = 0;
+ for (Partition partition : tbl.getAllPartitions()) {
+ for (MaterializedIndex idx :
partition.getMaterializedIndices(
+ MaterializedIndex.IndexExtState.VISIBLE)) {
+ for (Tablet tablet : idx.getTablets()) {
+ Map<Tag, Short> allocMap = Maps.newHashMap();
+ tabletNum++;
+ for (Replica replica : tablet.getReplicas()) {
+ long backendId = replica.getBackendId();
+ Tag tag = backendTags.get(backendId);
+ Assert.assertNotNull(tag);
+ short oldNum = allocMap.getOrDefault(tag,
(short) 0);
+ allocMap.put(tag, (short) (oldNum + 1));
+ backendReplicaNum.put(backendId,
backendReplicaNum.getOrDefault(backendId, 0) + 1);
+ }
+ Assert.assertEquals(newReplicaAlloc.getAllocMap(),
allocMap);
+ }
+ }
+ }
+
+ Assert.assertTrue(tabletNum > 0);
+
+ for (Map.Entry<Long, Integer> entry :
backendReplicaNum.entrySet()) {
+ long backendId = entry.getKey();
+ int replicaNum = entry.getValue();
+ Tag tag = backendTags.get(backendId);
+ int sameTagReplicaNum = tabletNum *
newReplicaAlloc.getAllocMap().getOrDefault(tag, (short) 0);
+ int sameTagBeNum = (int)
(backendTags.values().stream().filter(t -> t.equals(tag)).count());
+ Assert.assertEquals("backend " + backendId + " failed: " +
" all backend replica num: "
+ + backendReplicaNum + ", all backend tag: " +
backendTags,
+ sameTagReplicaNum / sameTagBeNum, replicaNum);
+ }
+ }
+
+ if (allStable) {
+ break;
+ }
+ }
+ }
+
@Test
public void testShowMV() throws Exception {
createMV("CREATE MATERIALIZED VIEW test_mv as select k1 from
test.show_test group by k1;", false);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index 2e2d53edb7a..701cd114cdb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -253,12 +253,16 @@ public class UtFrameUtils {
FeConstants.runningUnitTest = true;
FeConstants.enableInternalSchemaDb = false;
int feRpcPort = startFEServer(runningDir);
+ List<Backend> bes = Lists.newArrayList();
for (int i = 0; i < backendNum; i++) {
String host = "127.0.0." + (i + 1);
createBackend(host, feRpcPort);
}
+ System.out.println("after create backend");
+ checkBEHeartbeat(bes);
// sleep to wait first heartbeat
- Thread.sleep(6000);
+ // Thread.sleep(6000);
+ System.out.println("after create backend2");
}
public static Backend createBackend(String beHost, int feRpcPort) throws
IOException, InterruptedException {
@@ -293,6 +297,7 @@ public class UtFrameUtils {
diskInfo1.setTotalCapacityB(1000000);
diskInfo1.setAvailableCapacityB(500000);
diskInfo1.setDataUsedCapacityB(480000);
+ diskInfo1.setPathHash(be.getId());
disks.put(diskInfo1.getRootPath(), diskInfo1);
be.setDisks(ImmutableMap.copyOf(disks));
be.setAlive(true);
diff --git a/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
b/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
new file mode 100644
index 00000000000..1f5b8496630
--- /dev/null
+++ b/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite ("test_alter_colocate_group") {
+ // Clean slate: the first negative case below depends on this DB not existing.
+ sql "DROP DATABASE IF EXISTS test_alter_colocate_group_db FORCE"
+ // Negative case 1: group qualified by a database that does not exist.
+ test {
+ sql """
+ ALTER COLOCATE GROUP test_alter_colocate_group_db.bad_group_1
 SET ( "replication_num" = "1" );
+ """
+
+ exception "unknown databases"
+ }
+ // Negative case 2: unqualified group name that does not exist in the
+ // current database (regression_test_alter_p2).
+ test {
+ sql """
+ ALTER COLOCATE GROUP bad_group_2
+ SET ( "replication_num" = "1" );
+ """
+
+ exception "Not found colocate group
 `default_cluster:regression_test_alter_p2`.`bad_group_2`"
+ }
+ // Negative case 3: a `__global__`-prefixed group is global and therefore
+ // must not be qualified with a database name.
+ test {
+ sql """
+ ALTER COLOCATE GROUP bad_db.__global__bad_group_3
+ SET ( "replication_num" = "1" );
+ """
+
+ exception "group that name starts with `__global__` is a global group,
 it doesn't belong to any specific database"
+ }
+ // Negative case 4: non-existent global group.
+ test {
+ sql """
+ ALTER COLOCATE GROUP __global__bad_group_4
+ SET ( "replication_num" = "1" );
+ """
+
+ exception "Not found colocate group `__global__bad_group_4`"
+ }
+
+ sql " DROP TABLE IF EXISTS tbl1 FORCE; "
+ sql " DROP TABLE IF EXISTS tbl2 FORCE; "
+ sql " DROP TABLE IF EXISTS tbl3 FORCE; "
+
+ // tbl1: unpartitioned table in colocate group_1, replication_num = 1.
+ sql """
+ CREATE TABLE tbl1
+ (
+ k1 int,
+ k2 int
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 6
+ PROPERTIES
+ (
+ "colocate_with" = "group_1",
+ "replication_num" = "1"
+ );
+ """
+
+ // tbl2: range-partitioned table in colocate group_2, replication_num = 1.
+ sql """
+ CREATE TABLE tbl2
+ (
+ k1 date,
+ k2 int
+ )
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 values less than('2020-02-01'),
+ PARTITION p2 values less than('2020-03-01')
+ )
+ DISTRIBUTED BY HASH(k2) BUCKETS 5
+ PROPERTIES
+ (
+ "colocate_with" = "group_2",
+ "replication_num" = "1"
+ );
+ """
+
+ // tbl3: dynamic-partition table in colocate group_3; both the table and
+ // its dynamic_partition settings start at replication_num = 1, so the
+ // ALTER below must update both (checked via hasDynamicPart).
+ sql """
+ CREATE TABLE tbl3
+ (
+ `uuid` varchar(255) NULL,
+ `action_datetime` date NULL
+ )
+ DUPLICATE KEY(uuid)
+ PARTITION BY RANGE(action_datetime)()
+ DISTRIBUTED BY HASH(uuid) BUCKETS 4
+ PROPERTIES
+ (
+ "colocate_with" = "group_3",
+ "replication_num" = "1",
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.end" = "2",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "4",
+ "dynamic_partition.replication_num" = "1"
+ );
+ """
+
+ // Asserts that the named colocate group's replica allocation, as reported
+ // by `show proc "/colocation_group"` (name in column 1, allocation in
+ // column 4), equals "tag.location.default: <replicaNum>".
+ def checkGroupsReplicaAlloc = { groupName, replicaNum ->
+ // groupName -> replicaAlloc
+ def allocMap = [:]
+ def groups = sql """ show proc "/colocation_group" """
+ for (def group : groups) {
+ allocMap[group[1]] = group[4]
+ }
+
+ assertEquals("tag.location.default: ${replicaNum}".toString(),
 allocMap[groupName])
+ }
+
+ // Asserts the table-level replication_allocation in SHOW CREATE TABLE,
+ // the dynamic_partition allocation when hasDynamicPart is set, and the
+ // per-partition replica count (column 9 of SHOW PARTITIONS).
+ def checkTableReplicaAlloc = { tableName, hasDynamicPart, replicaNum ->
+ def result = sql """ show create table ${tableName} """
+ def createTbl = result[0][1].toString()
+ assertTrue(createTbl.indexOf("\"replication_allocation\" =
 \"tag.location.default: ${replicaNum}\"") > 0)
+ if (hasDynamicPart) {
+ assertTrue(createTbl.indexOf(
+ "\"dynamic_partition.replication_allocation\" =
 \"tag.location.default: ${replicaNum}\"") > 0)
+ }
+
+ result = sql """ show partitions from ${tableName} """
+ assertTrue(result.size() > 0)
+ for (int i = 0; i < result.size(); i++) {
+ assertEquals("${replicaNum}".toString(), result[i][9].toString())
+ }
+ }
+
+ // For each group: verify the initial allocation of 1, confirm that asking
+ // for more replicas than there are hosts fails, then raise the allocation
+ // to 3 and verify it propagated to the group, the table, and (for tbl3)
+ // the dynamic-partition settings.
+ for (int i = 1; i <= 3; i++) {
+ def groupName = "regression_test_alter_p2.group_${i}"
+ checkGroupsReplicaAlloc(groupName, 1)
+
+ def tableName = "tbl${i}"
+ def hasDynamicPart = i == 3
+ checkTableReplicaAlloc(tableName, hasDynamicPart, 1)
+
+ // Over-provisioning beyond available backends must be rejected.
+ test {
+ sql """
+ ALTER COLOCATE GROUP ${groupName}
+ SET ( "replication_num" = "100" );
+ """
+
+ exception "Failed to find enough host"
+ }
+
+ // Valid change: 1 -> 3 replicas.
+ test {
+ sql """
+ ALTER COLOCATE GROUP ${groupName}
+ SET ( "replication_num" = "3" );
+ """
+ }
+
+ checkGroupsReplicaAlloc(groupName, 3)
+ checkTableReplicaAlloc(tableName, hasDynamicPart, 3)
+ }
+
+ sql " DROP TABLE IF EXISTS tbl1 FORCE; "
+ sql " DROP TABLE IF EXISTS tbl2 FORCE; "
+ sql " DROP TABLE IF EXISTS tbl3 FORCE; "
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]