This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6f68ec9de0 support query queue (#20048)
6f68ec9de0 is described below
commit 6f68ec9de0d67846ccf60f0686a3f155d92ea63e
Author: wangbo <[email protected]>
AuthorDate: Tue May 30 19:52:27 2023 +0800
support query queue (#20048)
support query queue (#20048)
---
.../main/java/org/apache/doris/common/Config.java | 3 +
.../java/org/apache/doris/qe/StmtExecutor.java | 28 ++++-
.../doris/resource/resourcegroup/QueryQueue.java | 117 +++++++++++++++++++++
.../resource/resourcegroup/QueueOfferToken.java | 46 ++++++++
.../resource/resourcegroup/ResourceGroup.java | 107 +++++++++++++++++--
.../resource/resourcegroup/ResourceGroupMgr.java | 13 +++
.../resource/resourcegroup/ResourceGroupTest.java | 4 +-
7 files changed, 309 insertions(+), 9 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 04d0d22e6b..23ecd187ab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1492,6 +1492,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true, expType =
ExperimentalType.EXPERIMENTAL)
public static boolean enable_resource_group = false;
+ @ConfField(mutable = true)
+ public static boolean enable_query_queue = true;
+
@ConfField(mutable = false, masterOnly = true)
public static int backend_rpc_timeout_ms = 60000; // 1 min
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 79a69ac3d3..458102dc6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -135,6 +135,8 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
+import org.apache.doris.resource.resourcegroup.QueryQueue;
+import org.apache.doris.resource.resourcegroup.QueueOfferToken;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.RpcException;
@@ -204,6 +206,9 @@ public class StmtExecutor {
private OriginStatement originStmt;
private StatementBase parsedStmt;
private Analyzer analyzer;
+ private QueryQueue queryQueue = null;
+ // by default, false means no query queued, then no need to poll when
query finish
+ private QueueOfferToken offerRet = new QueueOfferToken(false);
private ProfileType profileType = ProfileType.QUERY;
private volatile Coordinator coord = null;
private MasterOpExecutor masterOpExecutor = null;
@@ -552,6 +557,24 @@ public class StmtExecutor {
}
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
+ // queue query here
+ if (!parsedStmt.isExplain() && Config.enable_resource_group &&
Config.enable_query_queue) {
+ this.queryQueue = analyzer.getEnv().getResourceGroupMgr()
+
.getResourceGroupQueryQueue(context.sessionVariable.resourceGroup);
+ try {
+ this.offerRet = queryQueue.offer();
+ } catch (InterruptedException e) {
+ // this Exception means try lock/await failed, so no need to
handle offer result
+ LOG.error("error happens when offer queue, query id=" +
DebugUtil.printId(queryId) + " ", e);
+ throw new RuntimeException("interrupted Exception happens when
queue query");
+ }
+ if (!offerRet.isOfferSuccess()) {
+ String retMsg = "queue failed, reason=" +
offerRet.getOfferResultDetail();
+ LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " +
retMsg);
+ throw new UserException(retMsg);
+ }
+ }
+
int retryTime = Config.max_query_retry_time;
for (int i = 0; i < retryTime; i++) {
try {
@@ -621,6 +644,9 @@ public class StmtExecutor {
throw e;
} finally {
queryAnalysisSpan.end();
+ if (offerRet.isOfferSuccess()) {
+ queryQueue.poll();
+ }
}
if (isForwardToMaster()) {
if (isProxy) {
@@ -801,7 +827,7 @@ public class StmtExecutor {
}
// Analyze one statement to structure in memory.
- public void analyze(TQueryOptions tQueryOptions) throws UserException {
+ public void analyze(TQueryOptions tQueryOptions) throws UserException,
InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}",
context.getStmtId(),
context.getForwardedStmtId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java
new file mode 100644
index 0000000000..4b464bd94d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.resourcegroup;
+
+import com.google.common.base.Preconditions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+// note(wb) refer java BlockingQueue, but support altering capacity
+// todo(wb) add wait time to profile
+public class QueryQueue {
+
+ private static final Logger LOG = LogManager.getLogger(QueryQueue.class);
+ // note(wb) used unfair by default, need more test later
+ private final ReentrantLock queueLock = new ReentrantLock();
+ private final Condition queueLockCond = queueLock.newCondition();
+ // resource group property
+ private int maxConcurrency;
+ private int maxQueueSize;
+ private int queueTimeout; // ms
+ // running property
+ private int currentRunningQueryNum;
+ private int currentWaitingQueryNum;
+
+ public QueryQueue(int maxConcurrency, int maxQueueSize, int queueTimeout) {
+ this.maxConcurrency = maxConcurrency;
+ this.maxQueueSize = maxQueueSize;
+ this.queueTimeout = queueTimeout;
+ }
+
+ public String debugString() {
+ return "maxConcurrency=" + maxConcurrency + ", maxQueueSize=" +
maxQueueSize + ", queueTimeout=" + queueTimeout
+ + ", currentRunningQueryNum=" + currentRunningQueryNum + ",
currentWaitingQueryNum="
+ + currentWaitingQueryNum;
+ }
+
+ public QueueOfferToken offer() throws InterruptedException {
+ // to prevent hang
+ // the lock shouldn't be hold for too long
+ // we should catch the case when it happens
+ queueLock.tryLock(5, TimeUnit.SECONDS);
+ try {
+ // currentRunningQueryNum may bigger than maxRunningQueryNum
+ // because maxRunningQueryNum can be altered
+ if (currentRunningQueryNum >= maxConcurrency) {
+ if (currentWaitingQueryNum >= maxQueueSize) {
+ LOG.debug(this.debugString());
+ return new QueueOfferToken(false, "query waiting queue is
full, queue length=" + maxQueueSize);
+ }
+
+ currentWaitingQueryNum++;
+ boolean ret;
+ try {
+ ret = queueLockCond.await(queueTimeout,
TimeUnit.MILLISECONDS);
+ } finally {
+ currentWaitingQueryNum--;
+ }
+ if (!ret) {
+ LOG.debug(this.debugString());
+ return new QueueOfferToken(false, "query wait timeout " +
queueTimeout + " ms");
+ }
+ }
+ currentRunningQueryNum++;
+ return new QueueOfferToken(true, "offer success");
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ public void poll() throws InterruptedException {
+ queueLock.tryLock(5, TimeUnit.SECONDS);
+ try {
+ currentRunningQueryNum--;
+ Preconditions.checkArgument(currentRunningQueryNum >= 0);
+ // maybe only when currentWaitingQueryNum != 0 need to signal
+ queueLockCond.signal();
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int
queryWaitTimeout) {
+ try {
+ queueLock.tryLock(5, TimeUnit.SECONDS);
+ try {
+ this.maxConcurrency = maxConcurrency;
+ this.maxQueueSize = maxQueueSize;
+ this.queueTimeout = queryWaitTimeout;
+ } finally {
+ queueLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ LOG.error("reset queue property failed, ", e);
+ throw new RuntimeException("reset queue property failed, reason="
+ e.getMessage());
+ }
+ }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java
new file mode 100644
index 0000000000..4096a1095d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.resourcegroup;
+
// used to mark QueryQueue offer result
// if offer failed, then need to cancel query
// and return failed reason to user client

/**
 * Immutable result of a {@code QueryQueue} offer: whether the query was admitted and, optionally,
 * a human-readable detail. When the query was not admitted, the detail is the failure reason.
 */
public class QueueOfferToken {

    private final Boolean offerResult;

    // null when the single-argument constructor is used
    private final String offerResultDetail;

    /**
     * Creates a token without a detail message.
     *
     * @param offerResult whether the offer succeeded; must not be null
     */
    public QueueOfferToken(Boolean offerResult) {
        this(offerResult, null);
    }

    /**
     * @param offerResult whether the offer succeeded; must not be null
     * @param offerResultDetail description of the outcome, may be null
     * @throws NullPointerException if {@code offerResult} is null
     */
    public QueueOfferToken(Boolean offerResult, String offerResultDetail) {
        // Reject null eagerly. Callers unbox isOfferSuccess() directly in conditions, so a null result
        // would otherwise surface later as an NPE far from its origin.
        this.offerResult = java.util.Objects.requireNonNull(offerResult, "offerResult");
        this.offerResultDetail = offerResultDetail;
    }

    /** Returns whether the offer succeeded; never null. */
    public Boolean isOfferSuccess() {
        return offerResult;
    }

    /** Returns the detail message, or null if none was given. */
    public String getOfferResultDetail() {
        return offerResultDetail;
    }

}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
index 6d34cb7595..039209df44 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineResourceGroup;
@@ -39,7 +40,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-public class ResourceGroup implements Writable {
+public class ResourceGroup implements Writable, GsonPostProcessable {
private static final Logger LOG =
LogManager.getLogger(ResourceGroup.class);
public static final String CPU_SHARE = "cpu_share";
@@ -48,11 +49,18 @@ public class ResourceGroup implements Writable {
public static final String ENABLE_MEMORY_OVERCOMMIT =
"enable_memory_overcommit";
+ public static final String MAX_CONCURRENCY = "max_concurrency";
+
+ public static final String MAX_QUEUE_SIZE = "max_queue_size";
+
+ public static final String QUEUE_TIMEOUT = "queue_timeout";
+
private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new
ImmutableSet.Builder<String>().add(
CPU_SHARE).add(MEMORY_LIMIT).build();
- private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new
ImmutableSet.Builder<String>().add(
- CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).build();
+ private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new
ImmutableSet.Builder<String>()
+
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
+ .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).build();
@SerializedName(value = "id")
private long id;
@@ -69,6 +77,11 @@ public class ResourceGroup implements Writable {
private double memoryLimitPercent;
+ private QueryQueue queryQueue;
+ private int maxConcurrency = Integer.MAX_VALUE;
+ private int maxQueueSize = 0;
+ private int queueTimeout = 0;
+
private ResourceGroup(long id, String name, Map<String, String>
properties) {
this(id, name, properties, 0);
}
@@ -85,11 +98,54 @@ public class ResourceGroup implements Writable {
}
}
+ // called when first create a resource group, load from image or user new
create a group
+ public void initQueryQueue() {
+ resetQueueProperty(properties);
+ // if query queue property is not set, when use default value
+ this.queryQueue = new QueryQueue(maxConcurrency, maxQueueSize,
queueTimeout);
+ }
+
+ void resetQueryQueue(QueryQueue queryQueue) {
+ resetQueueProperty(properties);
+ this.queryQueue = queryQueue;
+ this.queryQueue.resetQueueProperty(this.maxConcurrency,
this.maxQueueSize, this.queueTimeout);
+
+ }
+
+ private void resetQueueProperty(Map<String, String> properties) {
+ if (properties.containsKey(MAX_CONCURRENCY)) {
+ this.maxConcurrency =
Integer.parseInt(properties.get(MAX_CONCURRENCY));
+ } else {
+ this.maxConcurrency = Integer.MAX_VALUE;
+ properties.put(MAX_CONCURRENCY,
String.valueOf(this.maxConcurrency));
+ }
+ if (properties.containsKey(MAX_QUEUE_SIZE)) {
+ this.maxQueueSize =
Integer.parseInt(properties.get(MAX_QUEUE_SIZE));
+ } else {
+ this.maxQueueSize = 0;
+ properties.put(MAX_QUEUE_SIZE, String.valueOf(maxQueueSize));
+ }
+ if (properties.containsKey(QUEUE_TIMEOUT)) {
+ this.queueTimeout =
Integer.parseInt(properties.get(QUEUE_TIMEOUT));
+ } else {
+ this.queueTimeout = 0;
+ properties.put(QUEUE_TIMEOUT, String.valueOf(queueTimeout));
+ }
+ }
+
+ public QueryQueue getQueryQueue() {
+ return this.queryQueue;
+ }
+
+ // new resource group
public static ResourceGroup create(String name, Map<String, String>
properties) throws DdlException {
checkProperties(properties);
- return new ResourceGroup(Env.getCurrentEnv().getNextId(), name,
properties);
+ ResourceGroup newResourceGroup = new
ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties);
+ newResourceGroup.initQueryQueue();
+ return newResourceGroup;
}
+ // alter resource group
public static ResourceGroup copyAndUpdate(ResourceGroup resourceGroup,
Map<String, String> updateProperties)
throws DdlException {
Map<String, String> newProperties = new
HashMap<>(resourceGroup.getProperties());
@@ -100,8 +156,13 @@ public class ResourceGroup implements Writable {
}
checkProperties(newProperties);
- return new ResourceGroup(
- resourceGroup.getId(), resourceGroup.getName(), newProperties,
resourceGroup.getVersion() + 1);
+ ResourceGroup newResourceGroup = new ResourceGroup(
+ resourceGroup.getId(), resourceGroup.getName(), newProperties,
resourceGroup.getVersion() + 1);
+
+ // note(wb) query queue should be unique and can not be copy
+ newResourceGroup.resetQueryQueue(resourceGroup.getQueryQueue());
+
+ return newResourceGroup;
}
private static void checkProperties(Map<String, String> properties) throws
DdlException {
@@ -141,6 +202,35 @@ public class ResourceGroup implements Writable {
throw new DdlException("The value of '" +
ENABLE_MEMORY_OVERCOMMIT + "' must be true or false.");
}
}
+
+ // check queue property
+ if (properties.containsKey(MAX_CONCURRENCY)) {
+ try {
+ if (Integer.parseInt(properties.get(MAX_CONCURRENCY)) < 0) {
+ throw new DdlException(MAX_CONCURRENCY + " requires a
positive integer");
+ }
+ } catch (NumberFormatException e) {
+ throw new DdlException(MAX_CONCURRENCY + " requires a positive
integer");
+ }
+ }
+ if (properties.containsKey(MAX_QUEUE_SIZE)) {
+ try {
+ if (Integer.parseInt(properties.get(MAX_QUEUE_SIZE)) < 0) {
+ throw new DdlException(MAX_QUEUE_SIZE + " requires a
positive integer");
+ }
+ } catch (NumberFormatException e) {
+ throw new DdlException(MAX_QUEUE_SIZE + " requires a positive
integer");
+ }
+ }
+ if (properties.containsKey(QUEUE_TIMEOUT)) {
+ try {
+ if (Integer.parseInt(properties.get(QUEUE_TIMEOUT)) < 0) {
+ throw new DdlException(QUEUE_TIMEOUT + " requires a
positive integer");
+ }
+ } catch (NumberFormatException e) {
+ throw new DdlException(QUEUE_TIMEOUT + " requires a positive
integer");
+ }
+ }
}
public long getId() {
@@ -188,4 +278,9 @@ public class ResourceGroup implements Writable {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ResourceGroup.class);
}
+
+ @Override
+ public void gsonPostProcess() throws IOException {
+ this.initQueryQueue();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
index f83a9a1678..a11907f34c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
@@ -114,6 +114,19 @@ public class ResourceGroupMgr implements Writable,
GsonPostProcessable {
return resourceGroups;
}
+ public QueryQueue getResourceGroupQueryQueue(String groupName) throws
UserException {
+ readLock();
+ try {
+ ResourceGroup resourceGroup = nameToResourceGroup.get(groupName);
+ if (resourceGroup == null) {
+ throw new UserException("Resource group " + groupName + " does
not exist");
+ }
+ return resourceGroup.getQueryQueue();
+ } finally {
+ readUnlock();
+ }
+ }
+
private void checkAndCreateDefaultGroup() {
ResourceGroup defaultResourceGroup = null;
writeLock();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
index 9f174e201c..ff7199aa67 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
@@ -37,7 +37,7 @@ public class ResourceGroupTest {
String name1 = "g1";
ResourceGroup group1 = ResourceGroup.create(name1, properties1);
Assert.assertEquals(name1, group1.getName());
- Assert.assertEquals(2, group1.getProperties().size());
+ Assert.assertEquals(5, group1.getProperties().size());
Assert.assertTrue(group1.getProperties().containsKey(ResourceGroup.CPU_SHARE));
Assert.assertTrue(Math.abs(group1.getMemoryLimitPercent() - 30) <
1e-6);
}
@@ -92,6 +92,6 @@ public class ResourceGroupTest {
BaseProcResult result = new BaseProcResult();
group1.getProcNodeData(result);
List<List<String>> rows = result.getRows();
- Assert.assertEquals(2, rows.size());
+ Assert.assertEquals(5, rows.size());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]