This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 700d78ae2c [api-change](http) change kill query http api by using
query id (#12125)
700d78ae2c is described below
commit 700d78ae2c385b0960180a25f975f510256c81fb
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Aug 29 09:03:44 2022 +0800
[api-change](http) change kill query http api by using query id (#12125)
Users can now cancel a query over HTTP by its query id, in two steps:
1. Get the query id by trace id.
2. Cancel the query by query id.
The modified API has not been released yet.
---
.../fe/manager/query-profile-action.md | 16 +++--
.../fe/manager/query-profile-action.md | 11 ++--
.../httpv2/rest/manager/QueryProfileAction.java | 57 +++++++++++------
.../java/org/apache/doris/qe/ConnectContext.java | 13 ++++
.../java/org/apache/doris/qe/ConnectScheduler.java | 24 +++++++
.../main/java/org/apache/doris/qe/VariableMgr.java | 10 +--
.../org/apache/doris/qe/VariableVarCallbackI.java | 24 +++++++
.../org/apache/doris/qe/VariableVarCallbacks.java | 73 ++++++++++++++++++++++
.../java/org/apache/doris/qe/VariableMgrTest.java | 8 +++
9 files changed, 200 insertions(+), 36 deletions(-)
diff --git
a/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
b/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
index 50d59191a8..0b86949483 100644
---
a/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
+++
b/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
@@ -34,9 +34,15 @@ under the License.
`GET /rest/v2/manager/query/profile/text/{query_id}`
+`GET /rest/v2/manager/query/profile/graph/{query_id}`
+
+`GET /rest/v2/manager/query/profile/json/{query_id}`
+
`GET /rest/v2/manager/query/profile/fragments/{query_id}`
-`GET /rest/v2/manager/query/profile/graph/{query_id}`
+`GET /rest/v2/manager/query/current_queries`
+
+`POST /rest/v2/manager/query/kill/{query_id}`
## Get the query information
@@ -342,7 +348,7 @@ Same as `show proc "/current_query_stmts"`, return current
running queries.
## Cancel query
-`POST /rest/v2/manager/query/kill/{connection_id}`
+`POST /rest/v2/manager/query/kill/{query_id}`
### Description
@@ -350,9 +356,9 @@ Cancel query of specified connection.
### Path parameters
-* `{connection_id}`
+* `{query_id}`
- connection id
+ query id. You can get the query id via the `trace_id` API.
### Query parameters
@@ -362,7 +368,7 @@ Cancel query of specified connection.
{
"msg": "success",
"code": 0,
- "data": "",
+ "data": null,
"count": 0
}
```
diff --git
a/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
b/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
index 1917237807..5283107279 100644
---
a/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
+++
b/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
@@ -42,8 +42,7 @@ under the License.
`GET /rest/v2/manager/query/current_queries`
-`GET /rest/v2/manager/query/kill/{connection_id}`
-
+`POST /rest/v2/manager/query/kill/{query_id}`
## 获取查询信息
@@ -349,7 +348,7 @@ GET /rest/v2/manager/query/query_info
## 取消query
-`POST /rest/v2/manager/query/kill/{connection_id}`
+`POST /rest/v2/manager/query/kill/{query_id}`
### Description
@@ -357,9 +356,9 @@ GET /rest/v2/manager/query/query_info
### Path parameters
-* `{connection_id}`
+* `{query_id}`
- connection id
+ query id. 你可以通过 trace_id 接口,获取 query id。
### Query parameters
@@ -369,7 +368,7 @@ GET /rest/v2/manager/query/query_info
{
"msg": "success",
"code": 0,
- "data": "",
+ "data": null,
"count": 0
}
```
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index e3f83b9e3c..0b68c65184 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -45,6 +45,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.json.simple.JSONObject;
+import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -70,7 +71,7 @@ import javax.servlet.http.HttpServletResponse;
* 4. /trace_id/{trace_id}
* 5. /profile/fragments/{query_id}
* 6. /current_queries
- * 7. /kill/{connection_id}
+ * 7. /kill/{query_id}
*/
@RestController
@RequestMapping("/rest/v2/manager/query")
@@ -100,7 +101,8 @@ public class QueryProfileAction extends RestBaseController {
.add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT).add(QUERY_TYPE).add(START_TIME).add(END_TIME)
.add(TOTAL).add(QUERY_STATE).build();
- private List<String> requestAllFe(String httpPath, Map<String, String>
arguments, String authorization) {
+ private List<String> requestAllFe(String httpPath, Map<String, String>
arguments, String authorization,
+ HttpMethod method) {
List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
ImmutableMap<String, String> header = ImmutableMap.<String,
String>builder()
.put(NodeAction.AUTHORIZATION, authorization).build();
@@ -108,7 +110,12 @@ public class QueryProfileAction extends RestBaseController
{
for (Pair<String, Integer> ipPort : frontends) {
String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
try {
- String data = HttpUtils.parseResponse(HttpUtils.doGet(url,
header));
+ String data = null;
+ if (method == HttpMethod.GET) {
+ data = HttpUtils.parseResponse(HttpUtils.doGet(url,
header));
+ } else if (method == HttpMethod.POST) {
+ data = HttpUtils.parseResponse(HttpUtils.doPost(url,
header, null));
+ }
if (!Strings.isNullOrEmpty(data) && !data.equals("{}")) {
dataList.add(data);
}
@@ -145,12 +152,12 @@ public class QueryProfileAction extends
RestBaseController {
arguments.put(SEARCH_PARA, search);
arguments.put(IS_ALL_NODE_PARA, "false");
- List<String> dataList = requestAllFe(httpPath, arguments,
request.getHeader(NodeAction.AUTHORIZATION));
+ List<String> dataList = requestAllFe(httpPath, arguments,
request.getHeader(NodeAction.AUTHORIZATION),
+ HttpMethod.GET);
for (String data : dataList) {
try {
- NodeAction.NodeInfo nodeInfo =
GsonUtils.GSON.fromJson(data,
- new TypeToken<NodeAction.NodeInfo>() {
- }.getType());
+ NodeAction.NodeInfo nodeInfo =
GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
+ }.getType());
queries.addAll(nodeInfo.getRows());
} catch (Exception e) {
LOG.warn("parse query info error: {}", data, e);
@@ -200,7 +207,8 @@ public class QueryProfileAction extends RestBaseController {
String httpPath = "/rest/v2/manager/query/sql/" + queryId;
ImmutableMap<String, String> arguments = ImmutableMap.<String,
String>builder()
.put(IS_ALL_NODE_PARA, "false").build();
- List<String> dataList = requestAllFe(httpPath, arguments,
request.getHeader(NodeAction.AUTHORIZATION));
+ List<String> dataList = requestAllFe(httpPath, arguments,
request.getHeader(NodeAction.AUTHORIZATION),
+ HttpMethod.GET);
if (!dataList.isEmpty()) {
try {
String sql =
JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("sql").getAsString();
@@ -291,7 +299,8 @@ public class QueryProfileAction extends RestBaseController {
}
}
} else {
- String queryId =
ProfileManager.getInstance().getQueryIdByTraceId(traceId);
+ ExecuteEnv env = ExecuteEnv.getInstance();
+ String queryId = env.getScheduler().getQueryIdByTraceId(traceId);
if (Strings.isNullOrEmpty(queryId)) {
return ResponseEntityBuilder.badRequest("Not found");
}
@@ -424,7 +433,8 @@ public class QueryProfileAction extends RestBaseController {
if (!Strings.isNullOrEmpty(instanceId)) {
builder.put(INSTANCE_ID, instanceId);
}
- List<String> dataList = requestAllFe(httpPath, builder.build(),
request.getHeader(NodeAction.AUTHORIZATION));
+ List<String> dataList = requestAllFe(httpPath, builder.build(),
request.getHeader(NodeAction.AUTHORIZATION),
+ HttpMethod.GET);
Map<String, String> result = Maps.newHashMap();
if (!dataList.isEmpty()) {
try {
@@ -457,7 +467,8 @@ public class QueryProfileAction extends RestBaseController {
Map<String, String> arguments = Maps.newHashMap();
arguments.put(IS_ALL_NODE_PARA, "false");
List<List<String>> queries = Lists.newArrayList();
- List<String> dataList = requestAllFe(httpPath, arguments,
request.getHeader(NodeAction.AUTHORIZATION));
+ List<String> dataList = requestAllFe(httpPath, arguments,
request.getHeader(NodeAction.AUTHORIZATION),
+ HttpMethod.GET);
for (String data : dataList) {
try {
NodeAction.NodeInfo nodeInfo =
GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
@@ -490,25 +501,31 @@ public class QueryProfileAction extends
RestBaseController {
}
/**
- * kill queries with specified connection id
+ * kill queries with specific query id
*
* @param request
* @param response
- * @param connectionId
+ * @param queryId
* @return
*/
- @RequestMapping(path = "/kill/{connection_id}", method =
RequestMethod.POST)
+ @RequestMapping(path = "/kill/{query_id}", method = RequestMethod.POST)
public Object killQuery(HttpServletRequest request, HttpServletResponse
response,
- @PathVariable("connection_id") int connectionId) {
+ @PathVariable("query_id") String queryId,
+ @RequestParam(value = IS_ALL_NODE_PARA, required = false,
defaultValue = "true") boolean isAllNode) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
- ExecuteEnv env = ExecuteEnv.getInstance();
- ConnectContext ctx = env.getScheduler().getContext(connectionId);
- if (ctx == null) {
- return ResponseEntityBuilder.notFound("connection not found");
+ if (isAllNode) {
+ // Forward the kill request to every FE; each FE then cancels locally (is_all_node=false)
+ String httpPath = "/rest/v2/manager/query/kill/" + queryId;
+ Map<String, String> arguments = Maps.newHashMap();
+ arguments.put(IS_ALL_NODE_PARA, "false");
+ requestAllFe(httpPath, arguments,
request.getHeader(NodeAction.AUTHORIZATION), HttpMethod.POST);
+ return ResponseEntityBuilder.ok();
}
- ctx.cancelQuery();
+
+ ExecuteEnv env = ExecuteEnv.getInstance();
+ env.getScheduler().cancelQuery(queryId);
return ResponseEntityBuilder.ok();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index ce9a13b705..e1803844f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -35,6 +35,7 @@ import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionStatus;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -58,6 +59,7 @@ public class ConnectContext {
protected volatile long forwardedStmtId;
protected volatile TUniqueId queryId;
+ protected volatile String traceId;
// id for this connection
protected volatile int connectionId;
// mysql net
@@ -438,6 +440,17 @@ public class ConnectContext {
public void setQueryId(TUniqueId queryId) {
this.queryId = queryId;
+ if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) {
+ connectScheduler.putTraceId2QueryId(traceId, queryId);
+ }
+ }
+
+ public void setTraceId(String traceId) {
+ this.traceId = traceId;
+ }
+
+ public String traceId() {
+ return traceId;
}
public TUniqueId queryId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index ec48687330..be4f13c9f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -22,9 +22,11 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.ldap.LdapAuthenticate;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.nio.NConnectContext;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -52,6 +54,9 @@ public class ConnectScheduler {
private Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap();
private ExecutorService executor =
ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num,
"connect-scheduler-pool", true);
+ // valid trace id -> query id
+ private final Map<String, TUniqueId> traceId2QueryId =
Maps.newConcurrentMap();
+
// Use a thread to check whether connection is timeout. Because
// 1. If use a scheduler, the task maybe a huge number when query is messy.
// Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks
in scheduler.
@@ -132,6 +137,16 @@ public class ConnectScheduler {
return connectionMap.get(connectionId);
}
+ public void cancelQuery(String queryId) { // queryId is compared against DebugUtil.printId(...) of each connection's current query id
+ for (ConnectContext ctx : connectionMap.values()) {
+ TUniqueId qid = ctx.queryId();
+ if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
+ ctx.cancelQuery();
+ break; // only the first matching connection is cancelled
+ }
+ }
+ } // when nothing matches, the method returns without signalling an error
+
public int getConnectionNum() {
return numberConnection.get();
}
@@ -151,6 +166,15 @@ public class ConnectScheduler {
return infos;
}
+ public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
+ traceId2QueryId.put(traceId, queryId); // NOTE(review): no removal of entries is visible in this patch — confirm the map is pruned elsewhere
+ }
+
+ public String getQueryIdByTraceId(String traceId) {
+ TUniqueId queryId = traceId2QueryId.get(traceId);
+ return queryId == null ? "" : DebugUtil.printId(queryId); // unknown trace id yields "" rather than null
+ }
+
private class LoopHandler implements Runnable {
ConnectContext context;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index 1016bc0a87..9f30ee2d4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -187,6 +187,10 @@ public class VariableMgr {
ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR,
attr.name(), value);
}
+ if (VariableVarCallbacks.hasCallback(attr.name())) {
+ VariableVarCallbacks.call(attr.name(), value);
+ }
+
return true;
}
@@ -516,9 +520,6 @@ public class VariableMgr {
// Set to true if the variables need to be forwarded along with
forward statement.
boolean needForward() default false;
-
- // Set to true if the variables need to be set in TQueryOptions
- boolean isQueryOption() default false;
}
private static class VarContext {
@@ -586,8 +587,7 @@ public class VariableMgr {
}
field.setAccessible(true);
- builder.put(attr.name(),
- new VarContext(field, null, GLOBAL | attr.flag(),
getValue(null, field)));
+ builder.put(attr.name(), new VarContext(field, null, GLOBAL |
attr.flag(), getValue(null, field)));
}
return builder;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java
new file mode 100644
index 0000000000..4aefa4fd9d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.DdlException;
+
+public interface VariableVarCallbackI {
+ public void call(String value) throws DdlException;
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java
new file mode 100644
index 0000000000..f1dc5e69c3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * Registry of callbacks that run after a session variable has been set.
+ * Callbacks are keyed by variable name; the static initializer registers
+ * one callback for SessionVariable.SESSION_CONTEXT.
+ */
+public class VariableVarCallbacks {
+
+ public static final Map<String, VariableVarCallbackI> callbacks =
Maps.newHashMap();
+
+ static {
+ SessionContextCallback sessionContextCallback = new
SessionContextCallback();
+ callbacks.put(SessionVariable.SESSION_CONTEXT, sessionContextCallback);
+ }
+
+ public static Boolean hasCallback(String varName) {
+ return callbacks.containsKey(varName);
+ }
+
+ public static void call(String varName, String value) throws DdlException {
+ if (hasCallback(varName)) {
+ callbacks.get(varName).call(value);
+ }
+ }
+
+ // Callback for SessionVariable.SESSION_CONTEXT: extracts the "trace_id" entry and stores it on the current ConnectContext
+ public static class SessionContextCallback implements VariableVarCallbackI
{
+ public void call(String value) throws DdlException {
+ if (Strings.isNullOrEmpty(value)) {
+ return;
+ }
+ /**
+ * The session context has the form "k1:v1;k2:v2;...".
+ * Only the first well-formed entry whose key is "trace_id" is used.
+ * Entries that do not split into exactly two parts on ':' are skipped,
+ * so a trace id that itself contains ':' is ignored.
+ * NOTE(review): ConnectContext.get() is dereferenced without a null
+ * check - confirm a context is always bound when this runs.
+ */
+ String[] parts = value.split(";");
+ for (String part : parts) {
+ String[] innerParts = part.split(":");
+ if (innerParts.length != 2) {
+ continue;
+ }
+ if (innerParts[0].equals("trace_id")) {
+ ConnectContext.get().setTraceId(innerParts[1]);
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index 560f2a70d2..162dbe6f6f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -235,5 +235,13 @@ public class VariableMgrTest {
VariableMgr.setVar(null, setVar);
Assert.fail("No exception throws.");
}
+
+ @Test
+ public void testVariableCallback() throws Exception {
+ SetStmt stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set
session_context='trace_id:123'", ctx);
+ SetExecutor executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals("123", ctx.traceId());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]