This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 365afc754c6 Collect partition stats (#34552)
365afc754c6 is described below
commit 365afc754c62b63c7b8a09aa174ba14f7adc9807
Author: Jibing-Li <[email protected]>
AuthorDate: Thu May 9 15:20:47 2024 +0800
Collect partition stats (#34552)
---
.../main/java/org/apache/doris/catalog/Table.java | 6 ---
.../java/org/apache/doris/catalog/TableIf.java | 4 ++
.../doris/datasource/hive/HMSExternalTable.java | 6 +++
.../java/org/apache/doris/qe/SessionVariable.java | 7 +++
.../apache/doris/statistics/BaseAnalysisTask.java | 59 ++++++++++++++++++++++
.../apache/doris/statistics/OlapAnalysisTask.java | 52 +++++++++++++++++--
.../doris/statistics/StatisticConstants.java | 3 ++
.../doris/statistics/util/StatisticsUtil.java | 10 ++++
8 files changed, 136 insertions(+), 11 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 9de006cebb5..5c25fd4aded 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -530,12 +530,6 @@ public abstract class Table extends MetaObject implements
Writable, TableIf {
this.createTime = in.readLong();
}
- // return if this table is partitioned.
- // For OlapTable, return true only if its partition type is RANGE or HASH
- public boolean isPartitionedTable() {
- return false;
- }
-
// return if this table is partitioned, for planner.
// For OlapTable, true when it is partitioned, or distributed by hash when there is no partition
public boolean isPartitionDistributed() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index c5039660e6e..22c8f7106ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -560,4 +560,8 @@ public interface TableIf {
default Set<String> getDistributionColumnNames() {
return Sets.newHashSet();
}
+
+ default boolean isPartitionedTable() {
+ return false;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index c2099a1acc8..68cd9d374a5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -1069,4 +1069,10 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
String bindBrokerName = catalog.bindBrokerName();
return cache.getFilesByPartitionsWithoutCache(hivePartitions,
bindBrokerName);
}
+
+ @Override
+ public boolean isPartitionedTable() {
+ makeSureInitialized();
+ return remoteTable.getPartitionKeysSize() > 0;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2509ddf5553..73d5d37cc68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -488,6 +488,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG =
"enable_auto_analyze_internal_catalog";
+ public static final String ENABLE_PARTITION_ANALYZE =
"enable_partition_analyze";
+
public static final String AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD =
"auto_analyze_table_width_threshold";
public static final String FASTER_FLOAT_CONVERT = "faster_float_convert";
@@ -1579,6 +1581,11 @@ public class SessionVariable implements Serializable,
Writable {
flag = VariableMgr.GLOBAL)
public boolean enableAutoAnalyzeInternalCatalog = true;
+ @VariableMgr.VarAttr(name = ENABLE_PARTITION_ANALYZE,
+ description = {"临时参数,收否收集分区级别统计信息", "Temp variable, enable to
collect partition level statistics."},
+ flag = VariableMgr.GLOBAL)
+ public boolean enablePartitionAnalyze = false;
+
@VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
"Maximum table width to enable auto analyze, "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 368112c26c9..c78415c674c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -24,7 +24,9 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
@@ -46,6 +48,7 @@ public abstract class BaseAnalysisTask {
public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
public static final double LIMIT_FACTOR = 1.2;
+ public static final int PARTITION_BATCH_SIZE = 100;
protected static final String FULL_ANALYZE_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
@@ -144,6 +147,44 @@ public abstract class BaseAnalysisTask {
+ "${data_size} AS `data_size`, "
+ "NOW() ";
+ protected static final String PARTITION_ANALYZE_TEMPLATE = " SELECT "
+ + "${catalogId} AS `catalog_id`, "
+ + "${dbId} AS `db_id`, "
+ + "${tblId} AS `tbl_id`, "
+ + "${idxId} AS `idx_id`, "
+ + "${partId} AS `part_id`, "
+ + "'${colId}' AS `col_id`, "
+ + "COUNT(1) AS `row_count`, "
+ + "HLL_UNION(HLL_HASH(`${colName}`)) as ndv, "
+ + "COUNT(1) - COUNT(`${colName}`) AS `null_count`, "
+ + "SUBSTRING(CAST(MIN(`${colName}`) AS STRING), 1, 1024) AS `min`,
"
+ + "SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) AS `max`,
"
+ + "${dataSizeFunction} AS `data_size`, "
+ + "NOW() AS `update_time` "
+ + " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index}
${partitionInfo}";
+
+ protected static final String MERGE_PARTITION_TEMPLATE =
+ "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ + "${catalogId} AS `catalog_id`, "
+ + "${dbId} AS `db_id`, "
+ + "${tblId} AS `tbl_id`, "
+ + "${idxId} AS `idx_id`, "
+ + "'${colId}' AS `col_id`, "
+ + "NULL AS `part_id`, "
+ + "SUM(count) AS `row_count`, "
+ + "HLL_CARDINALITY(HLL_UNION(ndv)) AS `ndv`, "
+ + "SUM(null_count) AS `null_count`, "
+ + "MIN(min) AS `min`, "
+ + "MAX(max) AS `max`, "
+ + "SUM(data_size_in_bytes) AS `data_size`, "
+ + "NOW() AS `update_time` FROM "
+ + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ + " WHERE `catalog_id` = ${catalogId} "
+ + " AND `db_id` = ${dbId} "
+ + " AND `tbl_id` = ${tblId} "
+ + " AND `idx_id` = ${idxId} "
+ + " AND `col_id` = '${colId}'";
+
protected AnalysisInfo info;
protected CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog;
@@ -320,4 +361,22 @@ public abstract class BaseAnalysisTask {
}
}
+ protected void runInsert(String sql) throws Exception {
+ try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext())
{
+ stmtExecutor = new StmtExecutor(r.connectContext, sql);
+ try {
+ stmtExecutor.execute();
+ QueryState queryState = stmtExecutor.getContext().getState();
+ if
(queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
+ throw new RuntimeException(
+ "Failed to insert : " +
stmtExecutor.getOriginStmt().originStmt + "Error msg: "
+ + queryState.getErrorMessage());
+ }
+ } finally {
+ AuditLogHelper.logAuditLog(stmtExecutor.getContext(),
stmtExecutor.getOriginStmt().toString(),
+ stmtExecutor.getParsedStmt(),
stmtExecutor.getQueryStatisticsForAuditLog(), true);
+ }
+ }
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 302a861ad1e..f85542511b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -33,6 +33,8 @@ import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
import org.apache.commons.text.StringSubstitutor;
import java.security.SecureRandom;
@@ -206,15 +208,56 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
return resultRow;
}
+ protected void doFull() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Will do full collection for column {}", col.getName());
+ }
+ if (StatisticsUtil.enablePartitionAnalyze() &&
tbl.isPartitionedTable()) {
+ doPartitionTable();
+ } else {
+ doNonPartitionTable();
+ }
+ }
+
/**
* 1. Get stats of each partition
* 2. insert partition in batch
* 3. calculate column stats based on partition stats
*/
- protected void doFull() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Will do full collection for column {}", col.getName());
+ protected void doPartitionTable() throws Exception {
+ Map<String, String> params = buildSqlParams();
+ Set<String> partitionNames = tbl.getPartitionNames();
+ List<String> sqls = Lists.newArrayList();
+ int count = 0;
+ for (String part : partitionNames) {
+ params.put("partId", "'" + StatisticsUtil.escapeColumnName(part) +
"'");
+ params.put("partitionInfo", "partition " + part);
+ StringSubstitutor stringSubstitutor = new
StringSubstitutor(params);
+ sqls.add(stringSubstitutor.replace(PARTITION_ANALYZE_TEMPLATE));
+ count++;
+ if (count == PARTITION_BATCH_SIZE) {
+ String sql = "INSERT INTO " +
StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ + Joiner.on(" UNION ALL ").join(sqls);
+ runInsert(sql);
+ sqls.clear();
+ count = 0;
+ }
+ }
+ if (count > 0) {
+ String sql = "INSERT INTO " +
StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ + Joiner.on(" UNION ALL ").join(sqls);
+ runInsert(sql);
}
+ StringSubstitutor stringSubstitutor = new
StringSubstitutor(buildSqlParams());
+ runQuery(stringSubstitutor.replace(MERGE_PARTITION_TEMPLATE));
+ }
+
+ protected void doNonPartitionTable() {
+ StringSubstitutor stringSubstitutor = new
StringSubstitutor(buildSqlParams());
+ runQuery(stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE));
+ }
+
+ protected Map<String, String> buildSqlParams() {
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl",
StatisticConstants.TABLE_STATISTIC_TBL_NAME);
@@ -229,8 +272,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
params.put("colName",
StatisticsUtil.escapeColumnName(String.valueOf(info.colName)));
params.put("tblName", String.valueOf(tbl.getName()));
params.put("index", getIndex());
- StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
- runQuery(stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE));
+ return params;
}
protected String getIndex() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index 8dea5d515a6..67de69c57da 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -70,6 +70,9 @@ public class StatisticConstants {
public static final String FULL_QUALIFIED_STATS_TBL_NAME =
InternalCatalog.INTERNAL_CATALOG_NAME
+ "." + FeConstants.INTERNAL_DB_NAME + "." +
TABLE_STATISTIC_TBL_NAME;
+ public static final String FULL_QUALIFIED_PARTITION_STATS_TBL_NAME =
InternalCatalog.INTERNAL_CATALOG_NAME
+ + "." + FeConstants.INTERNAL_DB_NAME + "." +
PARTITION_STATISTIC_TBL_NAME;
+
public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3;
public static final int RETRY_LOAD_QUEUE_SIZE = 1000;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 955ddea04b1..a90932941f7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -813,6 +813,16 @@ public class StatisticsUtil {
return true;
}
+ public static boolean enablePartitionAnalyze() {
+ try {
+ return findConfigFromGlobalSessionVar(
+
SessionVariable.ENABLE_PARTITION_ANALYZE).enablePartitionAnalyze;
+ } catch (Exception e) {
+ LOG.warn("Fail to get value of enable partition analyze, return
false by default", e);
+ }
+ return false;
+ }
+
public static int getInsertMergeCount() {
try {
return
findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]