This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5cecbfc6eaf [cherry-pick]Add workload metric query_be_memory (#35911)
5cecbfc6eaf is described below
commit 5cecbfc6eaf1224bf4c8f466c0399a51574c2071
Author: wangbo <[email protected]>
AuthorDate: Thu Jun 6 14:33:30 2024 +0800
[cherry-pick]Add workload metric query_be_memory (#35911)
---
be/src/runtime/runtime_query_statistics_mgr.cpp | 2 +
.../workload_management/workload_action.cpp | 10 +++--
.../workload_management/workload_condition.cpp | 13 ++++++
.../workload_management/workload_condition.h | 17 +++++++-
.../workload_management/workload_query_info.h | 2 +
.../workload_management/workload_sched_policy.cpp | 2 +
.../workloadschedpolicy/WorkloadActionMeta.java | 12 +++---
.../workloadschedpolicy/WorkloadCondition.java | 2 +
.../WorkloadConditionBeScanBytes.java | 11 ++++--
.../WorkloadConditionBeScanRows.java | 11 ++++--
.../workloadschedpolicy/WorkloadConditionMeta.java | 14 +++----
...ws.java => WorkloadConditionQueryBeMemory.java} | 32 ++++++++-------
.../WorkloadConditionQueryTime.java | 11 ++++--
.../workloadschedpolicy/WorkloadMetricType.java | 2 +-
.../workloadschedpolicy/WorkloadSchedPolicy.java | 27 ++-----------
.../WorkloadSchedPolicyMgr.java | 46 +++++++++++++++++++++-
gensrc/thrift/BackendService.thrift | 1 +
.../test_workload_sched_policy.groovy | 30 ++++++++++++++
18 files changed, 176 insertions(+), 69 deletions(-)
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 9764b0f0507..955d1b9a7e8 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -190,6 +190,8 @@ void RuntimeQueryStatiticsMgr::get_metric_map(
metric_map.emplace(WorkloadMetricType::QUERY_TIME,
std::to_string(query_time_ms));
metric_map.emplace(WorkloadMetricType::SCAN_ROWS,
std::to_string(ret_qs.get_scan_rows()));
metric_map.emplace(WorkloadMetricType::SCAN_BYTES,
std::to_string(ret_qs.get_scan_bytes()));
+ metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
+ std::to_string(ret_qs.get_current_used_memory_bytes()));
}
void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id,
int64_t wg_id) {
diff --git a/be/src/runtime/workload_management/workload_action.cpp
b/be/src/runtime/workload_management/workload_action.cpp
index 39916bc7cc1..b36895594dc 100644
--- a/be/src/runtime/workload_management/workload_action.cpp
+++ b/be/src/runtime/workload_management/workload_action.cpp
@@ -22,10 +22,14 @@
namespace doris {
void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
- LOG(INFO) << "[workload_schedule]workload scheduler cancel query " <<
query_info->query_id;
+ std::stringstream msg;
+ msg << "query " << query_info->query_id
+ << " cancelled by workload policy: " << query_info->policy_name
+ << ", id:" << query_info->policy_id;
+ std::string msg_str = msg.str();
+ LOG(INFO) << "[workload_schedule]" << msg_str;
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR,
- std::string("query canceled by workload scheduler"));
+ query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR,
msg_str);
}
void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
diff --git a/be/src/runtime/workload_management/workload_condition.cpp
b/be/src/runtime/workload_management/workload_condition.cpp
index dff6f2adc24..62c6072a60c 100644
--- a/be/src/runtime/workload_management/workload_condition.cpp
+++ b/be/src/runtime/workload_management/workload_condition.cpp
@@ -56,4 +56,17 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) {
return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args,
_scan_bytes);
}
+// query memory
+WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator
op,
+ std::string
str_val) {
+ _op = op;
+ _query_memory_bytes = std::stol(str_val);
+}
+
+bool WorkloadConditionQueryMemory::eval(std::string str_val) {
+ int64_t query_memory_bytes = std::stol(str_val);
+ return WorkloadCompareUtils::compare_signed_integer(_op,
query_memory_bytes,
+ _query_memory_bytes);
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_condition.h
b/be/src/runtime/workload_management/workload_condition.h
index 96387a2af41..a85268a8dc3 100644
--- a/be/src/runtime/workload_management/workload_condition.h
+++ b/be/src/runtime/workload_management/workload_condition.h
@@ -23,7 +23,7 @@
namespace doris {
-enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES };
+enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES,
QUERY_MEMORY_BYTES };
class WorkloadCondition {
public:
@@ -74,6 +74,19 @@ private:
WorkloadCompareOperator _op;
};
+class WorkloadConditionQueryMemory : public WorkloadCondition {
+public:
+ WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string
str_val);
+ bool eval(std::string str_val) override;
+ WorkloadMetricType get_workload_metric_type() override {
+ return WorkloadMetricType::QUERY_MEMORY_BYTES;
+ }
+
+private:
+ int64_t _query_memory_bytes;
+ WorkloadCompareOperator _op;
+};
+
class WorkloadConditionFactory {
public:
static std::unique_ptr<WorkloadCondition> create_workload_condition(
@@ -88,6 +101,8 @@ public:
return std::make_unique<WorkloadConditionScanRows>(op, str_val);
} else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) {
return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
+ } else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES ==
metric_name) {
+ return std::make_unique<WorkloadConditionQueryMemory>(op, str_val);
}
LOG(ERROR) << "not find a metric name " << metric_name;
return nullptr;
diff --git a/be/src/runtime/workload_management/workload_query_info.h
b/be/src/runtime/workload_management/workload_query_info.h
index f2da31b6196..e544668e103 100644
--- a/be/src/runtime/workload_management/workload_query_info.h
+++ b/be/src/runtime/workload_management/workload_query_info.h
@@ -29,6 +29,8 @@ public:
TUniqueId tquery_id;
std::string query_id;
int64_t wg_id;
+ int64_t policy_id;
+ std::string policy_name;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp
b/be/src/runtime/workload_management/workload_sched_policy.cpp
index b97eb85c068..efa8965dd77 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -75,6 +75,8 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo*
query_info_ptr) {
void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) {
for (int i = 0; i < _action_list.size(); i++) {
+ query_info->policy_id = this->_id;
+ query_info->policy_name = this->_name;
_action_list[i]->exec(query_info);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
index 57f6ba37993..2ce05412844 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
@@ -37,14 +37,12 @@ public class WorkloadActionMeta {
}
static WorkloadActionType getWorkloadActionType(String strType) throws
UserException {
- if
(WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) {
- return WorkloadActionType.CANCEL_QUERY;
- } else if
(WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) {
- return WorkloadActionType.MOVE_QUERY_TO_GROUP;
- } else if
(WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) {
- return WorkloadActionType.SET_SESSION_VARIABLE;
+ WorkloadActionType workloadActionType =
WorkloadSchedPolicyMgr.STRING_ACTION_MAP.get(strType.toUpperCase());
+ if (workloadActionType == null) {
+ throw new UserException("invalid action type " + strType);
+ } else {
+ return workloadActionType;
}
- throw new UserException("invalid action type " + strType);
}
public String toString() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
index 5d89d2afae9..c790a401308 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
@@ -37,6 +37,8 @@ public interface WorkloadCondition {
return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op,
cm.value);
} else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) {
return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op,
cm.value);
+ } else if
(WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) {
+ return
WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value);
}
throw new UserException("invalid metric name:" + cm.metricName);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
index 7431f2e0c4f..bd914baf54e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
@@ -38,9 +38,14 @@ public class WorkloadConditionBeScanBytes implements
WorkloadCondition {
public static WorkloadConditionBeScanBytes
createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
- long longValue = Long.parseLong(value);
- if (longValue < 0) {
- throw new UserException("invalid scan bytes value, " + longValue +
", it requires >= 0");
+ long longValue = -1;
+ try {
+ longValue = Long.parseLong(value);
+ if (longValue < 0) {
+ throw new NumberFormatException();
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("invalid scan bytes value: " + value + ",
it requires >= 0");
}
return new WorkloadConditionBeScanBytes(op, longValue);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
index c2fb638e082..8b99e40d04d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
@@ -38,9 +38,14 @@ public class WorkloadConditionBeScanRows implements
WorkloadCondition {
public static WorkloadConditionBeScanRows
createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
- long longValue = Long.parseLong(value);
- if (longValue < 0) {
- throw new UserException("invalid scan rows value, " + longValue +
", it requires >= 0");
+ long longValue = -1;
+ try {
+ longValue = Long.parseLong(value);
+ if (longValue < 0) {
+ throw new NumberFormatException();
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("invalid scan rows value: " + value + ",
it requires >= 0");
}
return new WorkloadConditionBeScanRows(op, longValue);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
index 52f50f924fc..81e0f6c2188 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
@@ -40,16 +40,12 @@ public class WorkloadConditionMeta {
}
private static WorkloadMetricType getMetricType(String metricStr) throws
UserException {
- if
(WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) {
- return WorkloadMetricType.USERNAME;
- } else if
(WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) {
- return WorkloadMetricType.QUERY_TIME;
- } else if
(WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) {
- return WorkloadMetricType.BE_SCAN_ROWS;
- } else if
(WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) {
- return WorkloadMetricType.BE_SCAN_BYTES;
+ WorkloadMetricType metricType =
WorkloadSchedPolicyMgr.STRING_METRIC_MAP.get(metricStr.toUpperCase());
+ if (metricType == null) {
+ throw new UserException("invalid metric name:" + metricStr);
+ } else {
+ return metricType;
}
- throw new UserException("invalid metric name:" + metricStr);
}
public String toString() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java
similarity index 60%
copy from
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
copy to
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java
index c2fb638e082..2274b35ca51 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java
@@ -19,34 +19,38 @@ package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
-public class WorkloadConditionBeScanRows implements WorkloadCondition {
+public class WorkloadConditionQueryBeMemory implements WorkloadCondition {
private long value;
private WorkloadConditionOperator op;
- public WorkloadConditionBeScanRows(WorkloadConditionOperator op, long
value) {
- this.op = op;
+ public WorkloadConditionQueryBeMemory(WorkloadConditionOperator op, long
value) {
this.value = value;
+ this.op = op;
}
@Override
public boolean eval(String strValue) {
- // currently not support run in fe, so this condition never match
return false;
}
- public static WorkloadConditionBeScanRows
createWorkloadCondition(WorkloadConditionOperator op, String value)
- throws UserException {
- long longValue = Long.parseLong(value);
- if (longValue < 0) {
- throw new UserException("invalid scan rows value, " + longValue +
", it requires >= 0");
- }
- return new WorkloadConditionBeScanRows(op, longValue);
- }
-
@Override
public WorkloadMetricType getMetricType() {
- return WorkloadMetricType.BE_SCAN_ROWS;
+ return WorkloadMetricType.QUERY_BE_MEMORY_BYTES;
+ }
+
+ public static WorkloadConditionQueryBeMemory
createWorkloadCondition(WorkloadConditionOperator op,
+ String value) throws UserException {
+ long longValue = -1;
+ try {
+ longValue = Long.parseLong(value);
+ if (longValue < 0) {
+ throw new NumberFormatException();
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("invalid query be memory value: " + value
+ ", it requires >= 0");
+ }
+ return new WorkloadConditionQueryBeMemory(op, longValue);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java
index e61484508df..6c3a5c653aa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java
@@ -37,9 +37,14 @@ public class WorkloadConditionQueryTime implements
WorkloadCondition {
public static WorkloadConditionQueryTime
createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
- long longValue = Long.parseLong(value);
- if (longValue < 0) {
- throw new UserException("invalid query time value, " + longValue +
", it requires >= 0");
+ long longValue = -1;
+ try {
+ longValue = Long.parseLong(value);
+ if (longValue < 0) {
+ throw new NumberFormatException();
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("invalid query time value: " + value + ",
it requires >= 0");
}
return new WorkloadConditionQueryTime(op, longValue);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
index ed17414ec45..93e612a85c2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
@@ -18,5 +18,5 @@
package org.apache.doris.resource.workloadschedpolicy;
public enum WorkloadMetricType {
- USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES
+ USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 55759e90972..ff27a08706b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -22,7 +22,6 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.thrift.TCompareOperator;
import org.apache.doris.thrift.TWorkloadAction;
import org.apache.doris.thrift.TWorkloadActionType;
import org.apache.doris.thrift.TWorkloadCondition;
@@ -31,7 +30,6 @@ import org.apache.doris.thrift.TWorkloadSchedPolicy;
import org.apache.doris.thrift.TopicInfo;
import com.esotericsoftware.minlog.Log;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
@@ -51,25 +49,6 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
public static final ImmutableSet<String> POLICY_PROPERTIES = new
ImmutableSet.Builder<String>()
.add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build();
- // used for convert fe type to thrift type
- private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType>
METRIC_MAP
- = new ImmutableMap.Builder<WorkloadMetricType,
TWorkloadMetricType>()
- .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
- .put(WorkloadMetricType.BE_SCAN_ROWS,
TWorkloadMetricType.BE_SCAN_ROWS)
- .put(WorkloadMetricType.BE_SCAN_BYTES,
TWorkloadMetricType.BE_SCAN_BYTES).build();
- private static ImmutableMap<WorkloadActionType, TWorkloadActionType>
ACTION_MAP
- = new ImmutableMap.Builder<WorkloadActionType,
TWorkloadActionType>()
- .put(WorkloadActionType.MOVE_QUERY_TO_GROUP,
TWorkloadActionType.MOVE_QUERY_TO_GROUP)
- .put(WorkloadActionType.CANCEL_QUERY,
TWorkloadActionType.CANCEL_QUERY).build();
-
- private static ImmutableMap<WorkloadConditionOperator, TCompareOperator>
OP_MAP
- = new ImmutableMap.Builder<WorkloadConditionOperator,
TCompareOperator>()
- .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
- .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
- .put(WorkloadConditionOperator.GREATER_EQUAL,
TCompareOperator.GREATER_EQUAL)
- .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
- .put(WorkloadConditionOperator.LESS_EQUAl,
TCompareOperator.LESS_EQUAL).build();
-
@SerializedName(value = "id")
long id;
@SerializedName(value = "name")
@@ -255,12 +234,12 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
List<TWorkloadCondition> condList = new ArrayList();
for (WorkloadConditionMeta cond : conditionMetaList) {
TWorkloadCondition tCond = new TWorkloadCondition();
- TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName);
+ TWorkloadMetricType metricType =
WorkloadSchedPolicyMgr.METRIC_MAP.get(cond.metricName);
if (metricType == null) {
return null;
}
tCond.setMetricName(metricType);
- tCond.setOp(OP_MAP.get(cond.op));
+ tCond.setOp(WorkloadSchedPolicyMgr.OP_MAP.get(cond.op));
tCond.setValue(cond.value);
condList.add(tCond);
}
@@ -268,7 +247,7 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
List<TWorkloadAction> actionList = new ArrayList();
for (WorkloadActionMeta action : actionMetaList) {
TWorkloadAction tAction = new TWorkloadAction();
- TWorkloadActionType tActionType = ACTION_MAP.get(action.action);
+ TWorkloadActionType tActionType =
WorkloadSchedPolicyMgr.ACTION_MAP.get(action.action);
if (tActionType == null) {
return null;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 4aa7563f8d7..3879dd83b9a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -35,11 +35,15 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.thrift.TCompareOperator;
import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TWorkloadActionType;
+import org.apache.doris.thrift.TWorkloadMetricType;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@@ -80,6 +84,14 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon
implements Writable, Gs
.add("WorkloadGroup")
.build();
+ public static final ImmutableMap<WorkloadConditionOperator,
TCompareOperator> OP_MAP
+ = new ImmutableMap.Builder<WorkloadConditionOperator,
TCompareOperator>()
+ .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
+ .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
+ .put(WorkloadConditionOperator.GREATER_EQUAL,
TCompareOperator.GREATER_EQUAL)
+ .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
+ .put(WorkloadConditionOperator.LESS_EQUAl,
TCompareOperator.LESS_EQUAL).build();
+
public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET
= new
ImmutableSet.Builder<WorkloadActionType>().add(WorkloadActionType.SET_SESSION_VARIABLE).build();
@@ -93,7 +105,39 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon
implements Writable, Gs
public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
= new
ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
-
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build();
+
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
+ .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
+
+ // used for convert fe type to thrift type
+ public static final ImmutableMap<WorkloadMetricType, TWorkloadMetricType>
METRIC_MAP
+ = new ImmutableMap.Builder<WorkloadMetricType,
TWorkloadMetricType>()
+ .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
+ .put(WorkloadMetricType.BE_SCAN_ROWS,
TWorkloadMetricType.BE_SCAN_ROWS)
+ .put(WorkloadMetricType.BE_SCAN_BYTES,
TWorkloadMetricType.BE_SCAN_BYTES)
+ .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES,
TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
+ public static final ImmutableMap<WorkloadActionType, TWorkloadActionType>
ACTION_MAP
+ = new ImmutableMap.Builder<WorkloadActionType,
TWorkloadActionType>()
+ .put(WorkloadActionType.MOVE_QUERY_TO_GROUP,
TWorkloadActionType.MOVE_QUERY_TO_GROUP)
+ .put(WorkloadActionType.CANCEL_QUERY,
TWorkloadActionType.CANCEL_QUERY).build();
+
+ public static final Map<String, WorkloadMetricType> STRING_METRIC_MAP =
new HashMap<>();
+ public static final Map<String, WorkloadActionType> STRING_ACTION_MAP =
new HashMap<>();
+
+ static {
+ for (WorkloadMetricType metricType : FE_METRIC_SET) {
+ STRING_METRIC_MAP.put(metricType.toString(), metricType);
+ }
+ for (WorkloadMetricType metricType : BE_METRIC_SET) {
+ STRING_METRIC_MAP.put(metricType.toString(), metricType);
+ }
+
+ for (WorkloadActionType actionType : FE_ACTION_SET) {
+ STRING_ACTION_MAP.put(actionType.toString(), actionType);
+ }
+ for (WorkloadActionType actionType : BE_ACTION_SET) {
+ STRING_ACTION_MAP.put(actionType.toString(), actionType);
+ }
+ }
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index d93520206c5..0a2edb8ccbf 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -187,6 +187,7 @@ enum TWorkloadMetricType {
QUERY_TIME
BE_SCAN_ROWS
BE_SCAN_BYTES
+ QUERY_BE_MEMORY_BYTES
}
enum TCompareOperator {
diff --git
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
index d3f9b35426a..2536b06ce7a 100644
---
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
+++
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -23,6 +23,9 @@ suite("test_workload_sched_policy") {
sql "drop workload policy if exists set_action_policy;"
sql "drop workload policy if exists fe_policy;"
sql "drop workload policy if exists be_policy;"
+ sql "drop workload policy if exists be_scan_row_policy;"
+ sql "drop workload policy if exists be_scan_bytes_policy;"
+ sql "drop workload policy if exists query_be_memory_used;"
// 1 create cancel policy
sql "create workload policy test_cancel_policy " +
@@ -106,11 +109,38 @@ suite("test_workload_sched_policy") {
exception "duplicate set_session_variable action args one policy"
}
+ test {
+ sql "create workload policy invalid_metric_value_policy
conditions(query_be_memory_bytes > '-1') actions(cancel_query);"
+ exception "invalid"
+ }
+
+ test {
+ sql "create workload policy invalid_metric_value_policy
conditions(query_time > '-1') actions(cancel_query);"
+ exception "invalid"
+ }
+
+ test {
+ sql "create workload policy invalid_metric_value_policy
conditions(be_scan_rows > '-1') actions(cancel_query);"
+ exception "invalid"
+ }
+
+ test {
+ sql "create workload policy invalid_metric_value_policy
conditions(be_scan_bytes > '-1') actions(cancel_query);"
+ exception "invalid"
+ }
+
+ sql "create workload policy be_scan_row_policy conditions(be_scan_rows >
1) actions(cancel_query) properties('enabled'='false');"
+ sql "create workload policy be_scan_bytes_policy conditions(be_scan_bytes
> 1) actions(cancel_query) properties('enabled'='false');"
+ sql "create workload policy query_be_memory_used
conditions(query_be_memory_bytes > 1) actions(cancel_query)
properties('enabled'='false');"
+
// drop
sql "drop workload policy test_cancel_policy;"
sql "drop workload policy set_action_policy;"
sql "drop workload policy fe_policy;"
sql "drop workload policy be_policy;"
+ sql "drop workload policy be_scan_row_policy;"
+ sql "drop workload policy be_scan_bytes_policy;"
+ sql "drop workload policy query_be_memory_used;"
qt_select_policy_tvf_after_drop "select
name,condition,action,priority,enabled,version from
information_schema.workload_policy where name
in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by
name;"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]