This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new da6a90cceb [IOTDB-3682] Add DriverScheduler configuration into
iotdb-datanode.properties (#6482)
da6a90cceb is described below
commit da6a90cceb7a9acf832b037abc5ae995a32d25bd
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 28 19:06:52 2022 +0800
[IOTDB-3682] Add DriverScheduler configuration into
iotdb-datanode.properties (#6482)
---
server/src/assembly/resources/conf/iotdb-datanode.properties | 4 ++++
.../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++++++++
.../main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 ++++++++++
.../iotdb/db/mpp/execution/schedule/DriverScheduler.java | 12 ++++++++----
4 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 3d378902de..427bff0438 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -598,6 +598,10 @@ timestamp_precision=ms
# Datatype: int
# query_timeout_threshold=60000
+# The maximum allowed concurrently executing queries
+# Datatype: int
+# max_allowed_concurrent_queries=1000
+
# The number of sub compaction threads to be set up to perform compaction.
# Currently only works for nonAligned data in cross space compaction and unseq
inner space compaction.
# Set to 1 when less than or equal to 0.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fe176dd568..7d898bf2c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -296,6 +296,9 @@ public class IoTDBConfig {
/** How many threads can concurrently execute query statement. When <= 0,
use CPU core number. */
private int concurrentQueryThread = 16;
+ /** How many queries can be concurrently executed. When <= 0, use 1000. */
+ private int maxAllowedConcurrentQueries = 1000;
+
/**
* How many threads can concurrently read data for raw data query. When <=
0, use CPU core number.
*/
@@ -1324,6 +1327,14 @@ public class IoTDBConfig {
this.concurrentQueryThread = concurrentQueryThread;
}
+ public int getMaxAllowedConcurrentQueries() {
+ return maxAllowedConcurrentQueries;
+ }
+
+ public void setMaxAllowedConcurrentQueries(int maxAllowedConcurrentQueries) {
+ this.maxAllowedConcurrentQueries = maxAllowedConcurrentQueries;
+ }
+
public int getConcurrentSubRawQueryThread() {
return concurrentSubRawQueryThread;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 66ed7c4e9d..b50aead65c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -476,6 +476,16 @@ public class IoTDBDescriptor {
conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
}
+ conf.setMaxAllowedConcurrentQueries(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_allowed_concurrent_queries",
+ Integer.toString(conf.getConcurrentQueryThread()))));
+
+ if (conf.getMaxAllowedConcurrentQueries() <= 0) {
+ conf.setMaxAllowedConcurrentQueries(1000);
+ }
+
conf.setConcurrentSubRawQueryThread(
Integer.parseInt(
properties.getProperty(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index a4bea2c91f..ce079f4f3c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
@@ -63,11 +64,14 @@ public class DriverScheduler implements IDriverScheduler,
IService {
private final Set<DriverTask> blockedTasks;
private final Map<QueryId, Set<DriverTask>> queryMap;
private final ITaskScheduler scheduler;
- private IMPPDataExchangeManager blockManager; // TODO: init with real
IMPPDataExchangeManager
+ private IMPPDataExchangeManager blockManager;
- private static final int MAX_CAPACITY = 1000; // TODO: load from config files
- private static final int WORKER_THREAD_NUM = 4; // TODO: load from config
files
- private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from
config files or requests
+ private static final int MAX_CAPACITY =
+
IoTDBDescriptor.getInstance().getConfig().getMaxAllowedConcurrentQueries();
+ private static final int WORKER_THREAD_NUM =
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread();
+ private static final int QUERY_TIMEOUT_MS =
+ IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
private final ThreadGroup workerGroups;
private final List<AbstractDriverThread> threads;