This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 086236ed245 [Improment]publish workload to BE by tag (#38486)
086236ed245 is described below
commit 086236ed245a403cb8a7b0c2af9ef24d6801c3c6
Author: wangbo <[email protected]>
AuthorDate: Wed Jul 31 18:29:49 2024 +0800
[Improment]publish workload to BE by tag (#38486)
## Proposed changes
A workload group's tag property falls into one of three cases:
1 an empty value (null or ''): the workload group is published to all BEs.
2 a value that matches some BE's location: the workload group is published
only to the BEs with the same tag.
3 a non-empty but invalid value that cannot match any BE's location: the
workload group is not published to any BE.
---
.../doris/common/publish/TopicPublisherThread.java | 28 +++++++++++++++-
.../main/java/org/apache/doris/resource/Tag.java | 2 ++
.../resource/workloadgroup/WorkloadGroup.java | 24 +++++++++++---
.../resource/workloadgroup/WorkloadGroupMgr.java | 3 ++
.../main/java/org/apache/doris/system/Backend.java | 37 ++++++++++++++++++++++
.../workloadgroup/WorkloadGroupMgrTest.java | 13 ++++++--
gensrc/thrift/BackendService.thrift | 1 +
7 files changed, 101 insertions(+), 7 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
index f9fdc808498..74cefeca4d9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -27,6 +27,7 @@ import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TWorkloadGroupInfo;
import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
@@ -35,8 +36,10 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
public class TopicPublisherThread extends MasterDaemon {
@@ -126,7 +129,30 @@ public class TopicPublisherThread extends MasterDaemon {
try {
address = new TNetworkAddress(be.getHost(), be.getBePort());
client = ClientPool.backendPool.borrowObject(address);
- client.publishTopicInfo(request);
+ // check whether workload group tag math current be
+ TPublishTopicRequest copiedRequest = request.deepCopy();
+ if (copiedRequest.isSetTopicMap()) {
+ Map<TTopicInfoType, List<TopicInfo>> topicMap =
copiedRequest.getTopicMap();
+ List<TopicInfo> topicInfoList =
topicMap.get(TTopicInfoType.WORKLOAD_GROUP);
+ if (topicInfoList != null) {
+ Set<String> beTagSet = be.getBeWorkloadGroupTagSet();
+ Iterator<TopicInfo> topicIter =
topicInfoList.iterator();
+ while (topicIter.hasNext()) {
+ TopicInfo topicInfo = topicIter.next();
+ if (topicInfo.isSetWorkloadGroupInfo()) {
+ TWorkloadGroupInfo tWgInfo =
topicInfo.getWorkloadGroupInfo();
+ if (tWgInfo.isSetTag() &&
!Backend.isMatchWorkloadGroupTag(
+ tWgInfo.getTag(), beTagSet)) {
+ // currently TopicInfo could not contain
both policy and workload group,
+ // so we can remove TopicInfo directly.
+ topicIter.remove();
+ }
+ }
+ }
+ }
+ }
+
+ client.publishTopicInfo(copiedRequest);
ok = true;
LOG.info("[topic_publish]publish topic info to be {} success,
time cost={} ms, details:{}",
be.getHost(), (System.currentTimeMillis() -
beginTime), logStr);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
index 9353140b9d4..a51755412b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
@@ -73,6 +73,8 @@ public class Tag implements Writable {
public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT =
"cloud_cluster_private_endpoint";
public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";
+ public static final String WORKLOAD_GROUP = "workload_group";
+
public static final ImmutableSet<String> RESERVED_TAG_TYPE =
ImmutableSet.of(
TYPE_ROLE, TYPE_FUNCTION, TYPE_LOCATION);
public static final ImmutableSet<String> RESERVED_TAG_VALUES =
ImmutableSet.of(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index cae1b25a41f..c6d7aa923b0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -18,8 +18,10 @@
package org.apache.doris.resource.workloadgroup;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -30,7 +32,6 @@ import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TWorkloadGroupInfo;
import org.apache.doris.thrift.TopicInfo;
-import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
@@ -189,9 +190,7 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
throws DdlException {
Map<String, String> newProperties = new
HashMap<>(currentWorkloadGroup.getProperties());
for (Map.Entry<String, String> kv : updateProperties.entrySet()) {
- if (!Strings.isNullOrEmpty(kv.getValue())) {
- newProperties.put(kv.getKey(), kv.getValue());
- }
+ newProperties.put(kv.getKey(), kv.getValue());
}
checkProperties(newProperties);
@@ -416,6 +415,18 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
}
}
+ String tagStr = properties.get(TAG);
+ if (!StringUtils.isEmpty(tagStr)) {
+ String[] tagArr = tagStr.split(",");
+ for (String tag : tagArr) {
+ try {
+ FeNameFormat.checkCommonName("workload group tag name",
tag);
+ } catch (AnalysisException e) {
+ throw new DdlException("workload group tag name format is
illegal, " + tagStr);
+ }
+ }
+ }
+
}
public long getId() {
@@ -605,6 +616,11 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
tWorkloadGroupInfo.setRemoteReadBytesPerSecond(Long.valueOf(remoteReadBytesPerSecStr));
}
+ String tagStr = properties.get(TAG);
+ if (!StringUtils.isEmpty(tagStr)) {
+ tWorkloadGroupInfo.setTag(tagStr);
+ }
+
TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
return topicInfo;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 323ecffc2f2..5ddc5fb68f8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -394,6 +394,9 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws
DdlException {
String workloadGroupName = stmt.getWorkloadGroupName();
Map<String, String> properties = stmt.getProperties();
+ if (properties.size() == 0) {
+ throw new DdlException("alter workload group should contain at
least one property");
+ }
WorkloadGroup newWorkloadGroup;
writeLock();
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index a366aca5d6b..876e6ca40b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -38,7 +38,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,9 +48,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -932,4 +937,36 @@ public class Backend implements Writable {
this.lastPublishTaskAccumulatedNum = accumulatedNum;
}
+ public Set<String> getBeWorkloadGroupTagSet() {
+ Set<String> beTagSet = Sets.newHashSet();
+ String beTagStr = this.tagMap.get(Tag.WORKLOAD_GROUP);
+ if (StringUtils.isEmpty(beTagStr)) {
+ return beTagSet;
+ }
+
+ String[] beTagArr = beTagStr.split(",");
+ for (String beTag : beTagArr) {
+ beTagSet.add(beTag.trim());
+ }
+
+ return beTagSet;
+ }
+
+ public static boolean isMatchWorkloadGroupTag(String wgTagStr, Set<String>
beTagSet) {
+ if (StringUtils.isEmpty(wgTagStr)) {
+ return true;
+ }
+ if (beTagSet.isEmpty()) {
+ return false;
+ }
+
+ String[] wgTagArr = wgTagStr.split(",");
+ Set<String> wgTagSet = new HashSet<>();
+ for (String wgTag : wgTagArr) {
+ wgTagSet.add(wgTag.trim());
+ }
+
+ return !Collections.disjoint(wgTagSet, beTagSet);
+ }
+
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 1e73dc79510..5f1e3565966 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -194,15 +194,24 @@ public class WorkloadGroupMgrTest {
Config.enable_workload_group = true;
ConnectContext context = new ConnectContext();
WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
- Map<String, String> properties = Maps.newHashMap();
+ Map<String, String> p0 = Maps.newHashMap();
String name = "g1";
try {
- AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name,
properties);
+ AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name,
p0);
+ workloadGroupMgr.alterWorkloadGroup(stmt1);
+ } catch (DdlException e) {
+ Assert.assertTrue(e.getMessage().contains("alter workload group
should contain at least one property"));
+ }
+
+ p0.put(WorkloadGroup.CPU_SHARE, "10");
+ try {
+ AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name,
p0);
workloadGroupMgr.alterWorkloadGroup(stmt1);
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().contains("does not exist"));
}
+ Map<String, String> properties = Maps.newHashMap();
properties.put(WorkloadGroup.CPU_SHARE, "10");
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
CreateWorkloadGroupStmt createStmt = new
CreateWorkloadGroupStmt(false, name, properties);
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index b469242ee63..1e52d94f7bb 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -268,6 +268,7 @@ struct TWorkloadGroupInfo {
13: optional i32 spill_threshold_high_watermark
14: optional i64 read_bytes_per_second
15: optional i64 remote_read_bytes_per_second
+ 16: optional string tag
}
enum TWorkloadMetricType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]