This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 28b714c371 [feature](executor) using fe version to set instance_num
(#22047)
28b714c371 is described below
commit 28b714c3715054137b20e718e8b22cdbd225984f
Author: Mryange <[email protected]>
AuthorDate: Tue Jul 25 14:37:42 2023 +0800
[feature](executor) using fe version to set instance_num (#22047)
---
be/src/exprs/runtime_filter.cpp | 2 +-
.../pipeline-execution-engine.md | 1 +
.../pipeline-execution-engine.md | 1 +
.../main/java/org/apache/doris/catalog/Env.java | 75 +++++++++++++++++++++-
.../java/org/apache/doris/qe/ConnectContext.java | 6 +-
.../main/java/org/apache/doris/qe/VariableMgr.java | 19 ++++++
6 files changed, 101 insertions(+), 3 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 27f5ef843a..e3dbd7e445 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1155,13 +1155,13 @@ Status IRuntimeFilter::publish() {
Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>*
push_exprs,
bool is_late_arrival) {
DCHECK(is_consumer());
- _profile->add_info_string("Info", _format_status());
if (_is_ignored) {
return Status::OK();
}
if (!is_late_arrival) {
_set_push_down();
}
+ _profile->add_info_string("Info", _format_status());
return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx);
}
diff --git a/docs/en/docs/query-acceleration/pipeline-execution-engine.md
b/docs/en/docs/query-acceleration/pipeline-execution-engine.md
index 5dbcecf2c5..d32ca42217 100644
--- a/docs/en/docs/query-acceleration/pipeline-execution-engine.md
+++ b/docs/en/docs/query-acceleration/pipeline-execution-engine.md
@@ -72,6 +72,7 @@ set enable_pipeline_engine = true;
#### parallel_pipeline_task_num
`parallel_pipeline_task_num` represents the concurrency of pipeline tasks of a
query. Default value is `0` (e.g. half number of CPU cores). Users can adjust
this value according to their own workloads.
+If the user upgrades from a lower version, the default value will be the
parallel_fragment_exec_instance_num before the upgrade.
```
set parallel_pipeline_task_num = 0;
diff --git a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
index 04fd788e3e..0ce0f65448 100644
--- a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
+++ b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
@@ -72,6 +72,7 @@ set enable_pipeline_engine = true;
#### parallel_pipeline_task_num
`parallel_pipeline_task_num`代表了 SQL 查询进行查询并发的 Pipeline Task
数目。Doris默认的配置为`0`,即CPU核数的一半。用户也可以实际根据自己的实际情况进行调整。
+如果用户从较低的版本升级过来,则默认值为升级前的`parallel_fragment_exec_instance_num`。
```
set parallel_pipeline_task_num = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 8434270fcf..967e129728 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -99,6 +99,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.ConfigException;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.EnvUtils;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
@@ -107,6 +108,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.Version;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.Daemon;
@@ -290,10 +292,13 @@ public class Env {
public static final String CLIENT_NODE_HOST_KEY = "CLIENT_NODE_HOST";
public static final String CLIENT_NODE_PORT_KEY = "CLIENT_NODE_PORT";
+ private static final String VERSION_DIR = "/VERSION";
+ private String latestFeVersion;
+ private String previousFeVersion;
private String metaDir;
private String bdbDir;
private String imageDir;
-
+ private String versionDir;
private MetaContext metaContext;
private long epoch = 0;
@@ -852,6 +857,7 @@ public class Env {
this.metaDir = Config.meta_dir;
this.bdbDir = this.metaDir + BDB_DIR;
this.imageDir = this.metaDir + IMAGE_DIR;
+ this.versionDir = EnvUtils.getDorisHome() + VERSION_DIR;
// 0. get local node and helper node info
getSelfHostPort();
@@ -871,12 +877,21 @@ public class Env {
bdbDir.mkdirs();
}
}
+
File imageDir = new File(this.imageDir);
+
if (!imageDir.exists()) {
imageDir.mkdirs();
}
+ File verDir = new File(this.versionDir);
+
+ if (!verDir.exists()) {
+ verDir.mkdirs();
+ }
+
// init plugin manager
+ initVersionInfo();
pluginMgr.init();
auditEventProcessor.start();
@@ -5330,6 +5345,64 @@ public class Env {
}
}
+ public void writeVersionFile(String version, int seq) {
+ String versionName = versionDir + "/" + version + "-commitid-" + seq +
"-version";
+ File versionFile = new File(versionName);
+ try {
+ versionFile.createNewFile();
+ } catch (Exception e) {
+ LOG.error(e.toString());
+ }
+ }
+
+ public boolean isMajorVersionUpgrade() {
+ if (previousFeVersion == null) {
+ // There are two possible scenarios when there is no
'previousFeVersion':
+ // If 'image' is empty, it indicates a completely new FE.
+ // If 'image' is not empty, it means an upgrade from a lower
version.
+ File imageDir = new File(this.imageDir);
+ File[] files = imageDir.listFiles();
+ if (files == null || files.length == 0) {
+ return false;
+ }
+ return true;
+ }
+ return previousFeVersion.charAt(0) != latestFeVersion.charAt(0);
+ }
+
+ private void initVersionInfo() {
+ latestFeVersion = Version.DORIS_BUILD_VERSION_MAJOR + "_" +
Version.DORIS_BUILD_VERSION_MINOR + "_"
+ + Version.DORIS_BUILD_VERSION_PATCH;
+ File folder = new File(versionDir);
+ File[] files = folder.listFiles();
+ int previousSeq = 0;
+ if (files != null) {
+ // Every part meaning (2_0_0-commitid-1-version)
+ // [version] - [commitid] - [seq]
+ // 'VersionFile' can be transformed like this.
+ // 2_0_0-commitid-1-version -> 2_1_0-commitid-2-version ->
+ // 2_3_0-commitid-3-version -> 2_0_0-commitid-4-version
+ // You can observe the process of FE upgrades through these files.
+ for (File file : files) {
+ String[] splitArr = file.getName().split("-");
+ String version = splitArr[0];
+ int seq = Integer.parseInt(splitArr[2]);
+ if (seq > previousSeq) {
+ previousSeq = seq;
+ previousFeVersion = version;
+ }
+ }
+ }
+ if (previousFeVersion == null) {
+ writeVersionFile(latestFeVersion, 1);
+ } else if (!previousFeVersion.equals(latestFeVersion)) {
+ writeVersionFile(latestFeVersion, previousSeq + 1);
+ }
+ if (isMajorVersionUpgrade()) {
+ ConnectContext.isMajorVersionUpgrade = true;
+ }
+ }
+
public int getFollowerCount() {
int count = 0;
for (Frontend fe : frontends.values()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 3387b702ed..02aec996bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -68,7 +68,7 @@ import java.util.Set;
public class ConnectContext {
private static final Logger LOG =
LogManager.getLogger(ConnectContext.class);
protected static ThreadLocal<ConnectContext> threadLocalInfo = new
ThreadLocal<>();
-
+ public static boolean isMajorVersionUpgrade = false;
private static final String SSL_PROTOCOL = "TLS";
// set this id before analyze
@@ -263,6 +263,10 @@ public class ConnectContext {
mysqlChannel = new DummyMysqlChannel();
}
sessionVariable = VariableMgr.newSessionVariable();
+ if (isMajorVersionUpgrade) {
+
VariableMgr.setGlobalPipelineTask(sessionVariable.parallelExecInstanceNum);
+ sessionVariable = VariableMgr.newSessionVariable();
+ }
command = MysqlCommand.COM_SLEEP;
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index 24b7468e77..2d41eb821f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -367,6 +367,25 @@ public class VariableMgr {
}
}
+ public static void setGlobalPipelineTask(int instance) {
+ wlock.lock();
+ try {
+ String name = "parallel_pipeline_task_num";
+ String value = instance + "";
+ VarContext ctx = ctxByVarName.get(name);
+ try {
+ setValue(ctx.getObj(), ctx.getField(), value);
+ } catch (DdlException e) {
+ LOG.error(e.toString());
+ }
+ // write edit log
+ GlobalVarPersistInfo info = new
GlobalVarPersistInfo(defaultSessionVariable, Lists.newArrayList(name));
+ Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
+ } finally {
+ wlock.unlock();
+ }
+ }
+
public static void setLowerCaseTableNames(int mode) throws DdlException {
VarContext ctx =
ctxByVarName.get(GlobalVariable.LOWER_CASE_TABLE_NAMES);
setGlobalVarAndWriteEditLog(ctx,
GlobalVariable.LOWER_CASE_TABLE_NAMES, "" + mode);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]