This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5d4a949840b [Improvement](executor)Routine load support workload group
#31671
5d4a949840b is described below
commit 5d4a949840bb33d3915ad2093f81935d47975389
Author: wangbo <[email protected]>
AuthorDate: Mon Mar 11 22:51:15 2024 +0800
[Improvement](executor)Routine load support workload group #31671
---
.../doris/analysis/AlterRoutineLoadStmt.java | 8 +++++
.../doris/analysis/CreateRoutineLoadStmt.java | 14 ++++++++
.../doris/load/routineload/KafkaTaskInfo.java | 27 +++++++++++++++
.../doris/load/routineload/RoutineLoadJob.java | 14 ++++++++
.../resource/workloadgroup/WorkloadGroupMgr.java | 39 ++++++++++++++++++++--
regression-test/pipeline/p0/conf/fe.conf | 1 +
.../load_p0/routine_load/test_routine_load.groovy | 8 ++++-
.../workload_manager_p0/test_curd_wlg.groovy | 12 +++----
8 files changed, 114 insertions(+), 9 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 2df891fbb3c..5a1f1ba56aa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@@ -66,6 +67,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
.add(CreateRoutineLoadStmt.PARTIAL_COLUMNS)
.add(LoadStmt.STRICT_MODE)
.add(LoadStmt.TIMEZONE)
+ .add(CreateRoutineLoadStmt.WORKLOAD_GROUP)
.build();
private final LabelName labelName;
@@ -242,6 +244,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
String.valueOf(isPartialUpdate));
}
+ if (jobProperties.containsKey(CreateRoutineLoadStmt.WORKLOAD_GROUP)) {
+ String workloadGroup =
jobProperties.get(CreateRoutineLoadStmt.WORKLOAD_GROUP);
+ long wgId = Env.getCurrentEnv().getWorkloadGroupMgr()
+
.getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), workloadGroup);
+ analyzedJobProperties.put(CreateRoutineLoadStmt.WORKLOAD_GROUP,
String.valueOf(wgId));
+ }
}
private void checkDataSourceProperties() throws UserException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index f859d7d8f05..d58b25195c7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -111,6 +111,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public static final String PARTIAL_COLUMNS = "partial_columns";
+ public static final String WORKLOAD_GROUP = "workload_group";
+
private static final String NAME_TYPE = "ROUTINE LOAD NAME";
public static final String ENDPOINT_REGEX =
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
public static final String SEND_BATCH_PARALLELISM =
"send_batch_parallelism";
@@ -138,6 +140,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(SEND_BATCH_PARALLELISM)
.add(LOAD_TO_SINGLE_TABLET)
.add(PARTIAL_COLUMNS)
+ .add(WORKLOAD_GROUP)
.build();
private final LabelName labelName;
@@ -179,6 +182,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private String escape;
+ private long workloadGroupId = -1;
+
/**
* support partial columns load(Only Unique Key Columns)
*/
@@ -330,6 +335,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return comment;
}
+ public long getWorkloadGroupId() {
+ return workloadGroupId;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
@@ -506,6 +515,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
if (escape != null && escape.length() != 1) {
throw new AnalysisException("escape must be single-char");
}
+ String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
+ if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
+ this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr()
+
.getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(),
inputWorkloadGroupStr);
+ }
if (ConnectContext.get() != null) {
timezone = ConnectContext.get().getSessionVariable().getTimeZone();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index a8d387a2f6d..d8b79d9bdce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -28,6 +28,7 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TKafkaLoadInfo;
import org.apache.doris.thrift.TLoadSourceType;
import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TUniqueId;
@@ -130,6 +131,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TExecPlanFragmentParams tExecPlanFragmentParams =
routineLoadJob.plan(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+
+ long wgId = routineLoadJob.getWorkloadId();
+ List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
+ if (wgId > 0) {
+ tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+ .getTWorkloadGroupById(wgId);
+ }
+ if (tWgList.size() == 0) {
+ tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+
.getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
+ }
+ tExecPlanFragmentParams.setWorkloadGroups(tWgList);
+
return tExecPlanFragmentParams;
}
@@ -139,6 +153,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TPipelineFragmentParams tExecPlanFragmentParams =
routineLoadJob.planForPipeline(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+
+ long wgId = routineLoadJob.getWorkloadId();
+ List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
+ if (wgId > 0) {
+ tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+ .getTWorkloadGroupById(wgId);
+ }
+ if (tWgList.size() == 0) {
+ tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+
.getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
+ }
+ tExecPlanFragmentParams.setWorkloadGroups(tWgList);
+
return tExecPlanFragmentParams;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8760dc4b71c..f9be2014e30 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -69,6 +69,7 @@ import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
+import com.aliyuncs.utils.StringUtils;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -117,6 +118,8 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
protected static final String STAR_STRING = "*";
+ public static final String WORKLOAD_GROUP = "workload_group";
+
@Getter
@Setter
private boolean isMultiTable = false;
@@ -394,6 +397,9 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
if (stmt.getEscape() != null) {
jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape());
}
+ if (stmt.getWorkloadGroupId() > 0) {
+ jobProperties.put(WORKLOAD_GROUP,
String.valueOf(stmt.getWorkloadGroupId()));
+ }
}
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
@@ -479,6 +485,14 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
return database.getTableOrMetaException(tableId).getName();
}
+ public long getWorkloadId() {
+ String workloadIdStr = jobProperties.get(WORKLOAD_GROUP);
+ if (!StringUtils.isEmpty(workloadIdStr)) {
+ return Long.parseLong(workloadIdStr);
+ }
+ return -1;
+ }
+
public JobState getState() {
return state;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index a7c26f7cec5..1bd1a357127 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -183,13 +183,48 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
return workloadGroups;
}
- public WorkloadGroup getWorkloadGroupById(long wgId) {
+ public long getWorkloadGroup(UserIdentity currentUser, String groupName)
throws UserException {
+ Long workloadId = getWorkloadGroupIdByName(groupName);
+ if (workloadId == null) {
+ throw new UserException("Workload group " + groupName + " does not
exist");
+ }
+ if (!Env.getCurrentEnv().getAccessManager()
+ .checkWorkloadGroupPriv(currentUser, groupName,
PrivPredicate.USAGE)) {
+ ErrorReport.reportAnalysisException(
+ "Access denied; you need (at least one of) the %s
privilege(s) to use workload group '%s'.",
+ ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "USAGE/ADMIN",
groupName);
+ }
+ return workloadId.longValue();
+ }
+
+ public List<TPipelineWorkloadGroup> getTWorkloadGroupById(long wgId) {
+ List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList();
+ readLock();
+ try {
+ WorkloadGroup wg = idToWorkloadGroup.get(wgId);
+ if (wg != null) {
+ tWorkloadGroups.add(wg.toThrift());
+ }
+ } finally {
+ readUnlock();
+ }
+ return tWorkloadGroups;
+ }
+
+ public List<TPipelineWorkloadGroup>
getTWorkloadGroupByUserIdentity(UserIdentity user) throws UserException {
+ String groupName =
Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser());
+ List<TPipelineWorkloadGroup> ret = new ArrayList<>();
readLock();
try {
- return idToWorkloadGroup.get(wgId);
+ WorkloadGroup wg = nameToWorkloadGroup.get(groupName);
+ if (wg == null) {
+ throw new UserException("can not find workload group " +
groupName);
+ }
+ ret.add(wg.toThrift());
} finally {
readUnlock();
}
+ return ret;
}
public List<TopicInfo> getPublishTopicInfo() {
diff --git a/regression-test/pipeline/p0/conf/fe.conf
b/regression-test/pipeline/p0/conf/fe.conf
index 22f58cb22f5..28f1972c701 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -109,6 +109,7 @@ label_keep_max_second = 300
enable_job_schedule_second_for_test = true
enable_workload_group = true
+publish_topic_info_interval_ms = 1000
master_sync_policy = WRITE_NO_SYNC
replica_sync_policy = WRITE_NO_SYNC
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index 963d6ebc110..87eae26a48a 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -22,6 +22,10 @@ import org.apache.kafka.clients.producer.ProducerConfig
suite("test_routine_load","p0") {
+ sql "create workload group if not exists create_routine_load_group
properties ( 'cpu_share'='123');"
+ sql "create workload group if not exists alter_routine_load_group
properties ( 'cpu_share'='123');"
+ Thread.sleep(5000) // wait for the workload group info to be published
+
def tables = [
"dup_tbl_basic",
"uniq_tbl_basic",
@@ -226,7 +230,8 @@ suite("test_routine_load","p0") {
"send_batch_parallelism" = "2",
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
- "max_batch_size" = "209715200"
+ "max_batch_size" = "209715200",
+ "workload_group" = "create_routine_load_group"
)
FROM KAFKA
(
@@ -1932,6 +1937,7 @@ suite("test_routine_load","p0") {
sql "ALTER ROUTINE LOAD FOR ${jobs[i]} PROPERTIES(\"timezone\"
= \"Asia/Shanghai\");"
sql "ALTER ROUTINE LOAD FOR ${jobs[i]}
PROPERTIES(\"num_as_string\" = \"true\");"
sql "ALTER ROUTINE LOAD FOR ${jobs[i]}
PROPERTIES(\"fuzzy_parse\" = \"true\");"
+ sql "ALTER ROUTINE LOAD FOR ${jobs[i]}
PROPERTIES(\"workload_group\" = \"alter_routine_load_group\");"
res = sql "show routine load for ${jobs[i]}"
log.info("routine load job properties:
${res[0][11].toString()}".toString())
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 4b4fa51486e..864d1ab5b21 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -117,7 +117,7 @@ suite("test_crud_wlg") {
");"
sql "set workload_group=test_group;"
- qt_show_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() order by name;"
+ qt_show_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() where name in ('normal','test_group') order by name;"
// test memory_limit
test {
@@ -128,7 +128,7 @@ suite("test_crud_wlg") {
sql "alter workload group test_group properties ( 'memory_limit'='11%' );"
qt_mem_limit_1 """ select count(1) from ${table_name} """
- qt_mem_limit_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() order by name;"
+ qt_mem_limit_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() where name in ('normal','test_group') order by name;"
// test enable_memory_overcommit
test {
@@ -141,7 +141,7 @@ suite("test_crud_wlg") {
qt_mem_overcommit_1 """ select count(1) from ${table_name} """
sql "alter workload group test_group properties (
'enable_memory_overcommit'='false' );"
qt_mem_overcommit_2 """ select count(1) from ${table_name} """
- qt_mem_overcommit_3 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() order by name;"
+ qt_mem_overcommit_3 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() where name in ('normal','test_group') order by name;"
// test cpu_hard_limit
test {
@@ -160,7 +160,7 @@ suite("test_crud_wlg") {
sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%'
);"
qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
- qt_cpu_hard_limit_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() order by name;"
+ qt_cpu_hard_limit_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() where name in ('normal','test_group') order by name;"
// test query queue
test {
@@ -183,7 +183,7 @@ suite("test_crud_wlg") {
sql "alter workload group test_group properties ( 'max_concurrency'='100'
);"
qt_queue_1 """ select count(1) from ${table_name} """
- qt_show_queue "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() order by name;"
+ qt_show_queue "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() where name in ('normal','test_group') order by name;"
// test create group failed
// failed for cpu_share
@@ -261,7 +261,7 @@ suite("test_crud_wlg") {
}
// test show workload groups
- qt_select_tvf_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() order by name;"
+ qt_select_tvf_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from workload_groups() where name in ('normal','test_group') order by name;"
// test auth
sql """drop user if exists test_wlg_user"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]