This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 365afc754c6 Collect partition stats (#34552)
365afc754c6 is described below

commit 365afc754c62b63c7b8a09aa174ba14f7adc9807
Author: Jibing-Li <[email protected]>
AuthorDate: Thu May 9 15:20:47 2024 +0800

    Collect partition stats (#34552)
---
 .../main/java/org/apache/doris/catalog/Table.java  |  6 ---
 .../java/org/apache/doris/catalog/TableIf.java     |  4 ++
 .../doris/datasource/hive/HMSExternalTable.java    |  6 +++
 .../java/org/apache/doris/qe/SessionVariable.java  |  7 +++
 .../apache/doris/statistics/BaseAnalysisTask.java  | 59 ++++++++++++++++++++++
 .../apache/doris/statistics/OlapAnalysisTask.java  | 52 +++++++++++++++++--
 .../doris/statistics/StatisticConstants.java       |  3 ++
 .../doris/statistics/util/StatisticsUtil.java      | 10 ++++
 8 files changed, 136 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 9de006cebb5..5c25fd4aded 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -530,12 +530,6 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
         this.createTime = in.readLong();
     }
 
-    // return if this table is partitioned.
-    // For OlapTable, return true only if its partition type is RANGE or HASH
-    public boolean isPartitionedTable() {
-        return false;
-    }
-
     // return if this table is partitioned, for planner.
     // For OlapTable ture when is partitioned, or distributed by hash when no 
partition
     public boolean isPartitionDistributed() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index c5039660e6e..22c8f7106ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -560,4 +560,8 @@ public interface TableIf {
     default Set<String> getDistributionColumnNames() {
         return Sets.newHashSet();
     }
+
+    default boolean isPartitionedTable() {
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index c2099a1acc8..68cd9d374a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -1069,4 +1069,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         String bindBrokerName = catalog.bindBrokerName();
         return cache.getFilesByPartitionsWithoutCache(hivePartitions, 
bindBrokerName);
     }
+
+    @Override
+    public boolean isPartitionedTable() {
+        makeSureInitialized();
+        return remoteTable.getPartitionKeysSize() > 0;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2509ddf5553..73d5d37cc68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -488,6 +488,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG = 
"enable_auto_analyze_internal_catalog";
 
+    public static final String ENABLE_PARTITION_ANALYZE = 
"enable_partition_analyze";
+
     public static final String AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = 
"auto_analyze_table_width_threshold";
 
     public static final String FASTER_FLOAT_CONVERT = "faster_float_convert";
@@ -1579,6 +1581,11 @@ public class SessionVariable implements Serializable, Writable {
             flag = VariableMgr.GLOBAL)
     public boolean enableAutoAnalyzeInternalCatalog = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_PARTITION_ANALYZE,
+            description = {"临时参数,收否收集分区级别统计信息", "Temp variable, enable to 
collect partition level statistics."},
+            flag = VariableMgr.GLOBAL)
+    public boolean enablePartitionAnalyze = false;
+
     @VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
             description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
                 "Maximum table width to enable auto analyze, "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 368112c26c9..c78415c674c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -24,7 +24,9 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.qe.AuditLogHelper;
 import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
@@ -46,6 +48,7 @@ public abstract class BaseAnalysisTask {
 
     public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
     public static final double LIMIT_FACTOR = 1.2;
+    public static final int PARTITION_BATCH_SIZE = 100;
 
     protected static final String FULL_ANALYZE_TEMPLATE =
             "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
@@ -144,6 +147,44 @@ public abstract class BaseAnalysisTask {
             + "${data_size} AS `data_size`, "
             + "NOW() ";
 
+    protected static final String PARTITION_ANALYZE_TEMPLATE = " SELECT "
+            + "${catalogId} AS `catalog_id`, "
+            + "${dbId} AS `db_id`, "
+            + "${tblId} AS `tbl_id`, "
+            + "${idxId} AS `idx_id`, "
+            + "${partId} AS `part_id`, "
+            + "'${colId}' AS `col_id`, "
+            + "COUNT(1) AS `row_count`, "
+            + "HLL_UNION(HLL_HASH(`${colName}`)) as ndv, "
+            + "COUNT(1) - COUNT(`${colName}`) AS `null_count`, "
+            + "SUBSTRING(CAST(MIN(`${colName}`) AS STRING), 1, 1024) AS `min`, 
"
+            + "SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) AS `max`, 
"
+            + "${dataSizeFunction} AS `data_size`, "
+            + "NOW() AS `update_time` "
+            + " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} 
${partitionInfo}";
+
+    protected static final String MERGE_PARTITION_TEMPLATE =
+            "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+            + "${catalogId} AS `catalog_id`, "
+            + "${dbId} AS `db_id`, "
+            + "${tblId} AS `tbl_id`, "
+            + "${idxId} AS `idx_id`, "
+            + "'${colId}' AS `col_id`, "
+            + "NULL AS `part_id`, "
+            + "SUM(count) AS `row_count`, "
+            + "HLL_CARDINALITY(HLL_UNION(ndv)) AS `ndv`, "
+            + "SUM(null_count) AS `null_count`, "
+            + "MIN(min) AS `min`, "
+            + "MAX(max) AS `max`, "
+            + "SUM(data_size_in_bytes) AS `data_size`, "
+            + "NOW() AS `update_time` FROM "
+            + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+            + " WHERE `catalog_id` = ${catalogId} "
+            + " AND `db_id` = ${dbId} "
+            + " AND `tbl_id` = ${tblId} "
+            + " AND `idx_id` = ${idxId} "
+            + " AND `col_id` = '${colId}'";
+
     protected AnalysisInfo info;
 
     protected CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog;
@@ -320,4 +361,22 @@ public abstract class BaseAnalysisTask {
         }
     }
 
+    protected void runInsert(String sql) throws Exception {
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) 
{
+            stmtExecutor = new StmtExecutor(r.connectContext, sql);
+            try {
+                stmtExecutor.execute();
+                QueryState queryState = stmtExecutor.getContext().getState();
+                if 
(queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
+                    throw new RuntimeException(
+                        "Failed to insert : " + 
stmtExecutor.getOriginStmt().originStmt + "Error msg: "
+                            + queryState.getErrorMessage());
+                }
+            } finally {
+                AuditLogHelper.logAuditLog(stmtExecutor.getContext(), 
stmtExecutor.getOriginStmt().toString(),
+                        stmtExecutor.getParsedStmt(), 
stmtExecutor.getQueryStatisticsForAuditLog(), true);
+            }
+        }
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 302a861ad1e..f85542511b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -33,6 +33,8 @@ import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import org.apache.commons.text.StringSubstitutor;
 
 import java.security.SecureRandom;
@@ -206,15 +208,56 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
         return resultRow;
     }
 
+    protected void doFull() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Will do full collection for column {}", col.getName());
+        }
+        if (StatisticsUtil.enablePartitionAnalyze() && 
tbl.isPartitionedTable()) {
+            doPartitionTable();
+        } else {
+            doNonPartitionTable();
+        }
+    }
+
     /**
      * 1. Get stats of each partition
      * 2. insert partition in batch
      * 3. calculate column stats based on partition stats
      */
-    protected void doFull() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Will do full collection for column {}", col.getName());
+    protected void doPartitionTable() throws Exception {
+        Map<String, String> params = buildSqlParams();
+        Set<String> partitionNames = tbl.getPartitionNames();
+        List<String> sqls = Lists.newArrayList();
+        int count = 0;
+        for (String part : partitionNames) {
+            params.put("partId", "'" + StatisticsUtil.escapeColumnName(part) + 
"'");
+            params.put("partitionInfo", "partition " + part);
+            StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
+            sqls.add(stringSubstitutor.replace(PARTITION_ANALYZE_TEMPLATE));
+            count++;
+            if (count == PARTITION_BATCH_SIZE) {
+                String sql = "INSERT INTO " + 
StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+                        + Joiner.on(" UNION ALL ").join(sqls);
+                runInsert(sql);
+                sqls.clear();
+                count = 0;
+            }
+        }
+        if (count > 0) {
+            String sql = "INSERT INTO " + 
StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+                    + Joiner.on(" UNION ALL ").join(sqls);
+            runInsert(sql);
         }
+        StringSubstitutor stringSubstitutor = new 
StringSubstitutor(buildSqlParams());
+        runQuery(stringSubstitutor.replace(MERGE_PARTITION_TEMPLATE));
+    }
+
+    protected void doNonPartitionTable() {
+        StringSubstitutor stringSubstitutor = new 
StringSubstitutor(buildSqlParams());
+        runQuery(stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE));
+    }
+
+    protected Map<String, String> buildSqlParams() {
         Map<String, String> params = new HashMap<>();
         params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
         params.put("columnStatTbl", 
StatisticConstants.TABLE_STATISTIC_TBL_NAME);
@@ -229,8 +272,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
         params.put("colName", 
StatisticsUtil.escapeColumnName(String.valueOf(info.colName)));
         params.put("tblName", String.valueOf(tbl.getName()));
         params.put("index", getIndex());
-        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-        runQuery(stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE));
+        return params;
     }
 
     protected String getIndex() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index 8dea5d515a6..67de69c57da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -70,6 +70,9 @@ public class StatisticConstants {
     public static final String FULL_QUALIFIED_STATS_TBL_NAME = 
InternalCatalog.INTERNAL_CATALOG_NAME
             + "." + FeConstants.INTERNAL_DB_NAME + "." + 
TABLE_STATISTIC_TBL_NAME;
 
+    public static final String FULL_QUALIFIED_PARTITION_STATS_TBL_NAME = 
InternalCatalog.INTERNAL_CATALOG_NAME
+            + "." + FeConstants.INTERNAL_DB_NAME + "." + 
PARTITION_STATISTIC_TBL_NAME;
+
     public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3;
 
     public static final int RETRY_LOAD_QUEUE_SIZE = 1000;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 955ddea04b1..a90932941f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -813,6 +813,16 @@ public class StatisticsUtil {
         return true;
     }
 
+    public static boolean enablePartitionAnalyze() {
+        try {
+            return findConfigFromGlobalSessionVar(
+                
SessionVariable.ENABLE_PARTITION_ANALYZE).enablePartitionAnalyze;
+        } catch (Exception e) {
+            LOG.warn("Fail to get value of enable partition analyze, return 
false by default", e);
+        }
+        return false;
+    }
+
     public static int getInsertMergeCount() {
         try {
             return 
findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to