This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 290a2ac3d53 [improvement](statistics) Analyze all columns when a
partition is first loaded. (#38601)
290a2ac3d53 is described below
commit 290a2ac3d539698259469188b5d55cceb13de915
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Jul 31 23:47:46 2024 +0800
[improvement](statistics) Analyze all columns when a partition is first loaded.
(#38601)
backport: https://github.com/apache/doris/pull/38540
---
.../apache/doris/analysis/AnalyzeProperties.java | 2 +
.../java/org/apache/doris/catalog/OlapTable.java | 4 +
.../apache/doris/statistics/AnalysisManager.java | 16 +-
.../doris/statistics/StatisticsAutoCollector.java | 15 --
.../apache/doris/statistics/TableStatsMeta.java | 18 +-
.../doris/statistics/AnalysisManagerTest.java | 8 +-
.../suites/statistics/analyze_stats.groovy | 238 ++++++++++++---------
7 files changed, 171 insertions(+), 130 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java
index 4b5f161d2be..11197cfe62e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java
@@ -44,6 +44,7 @@ public class AnalyzeProperties {
public static final String PROPERTY_PERIOD_SECONDS = "period.seconds";
public static final String PROPERTY_FORCE_FULL = "force.full";
public static final String PROPERTY_PARTITION_COLUMN_FROM_SQL =
"partition.column.from.sql";
+ public static final String PROPERTY_USE_AUTO_ANALYZER =
"use.auto.analyzer";
public static final AnalyzeProperties DEFAULT_PROP = new
AnalyzeProperties(new HashMap<String, String>() {
{
@@ -72,6 +73,7 @@ public class AnalyzeProperties {
.add(PROPERTY_PERIOD_CRON)
.add(PROPERTY_FORCE_FULL)
.add(PROPERTY_PARTITION_COLUMN_FROM_SQL)
+ .add(PROPERTY_USE_AUTO_ANALYZER)
.build();
public AnalyzeProperties(Map<String, String> properties) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 1b77bf25056..3b4d22f0e5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1205,6 +1205,10 @@ public class OlapTable extends Table {
.collect(Collectors.toSet()))) {
return true;
}
+ // Check new partition first loaded.
+ if (tblStats.newPartitionLoaded != null &&
tblStats.newPartitionLoaded.get()) {
+ return true;
+ }
// 1 Check row count.
long currentRowCount = getRowCount();
long lastAnalyzeRowCount = tblStats.rowCount;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 36cc57ee381..6dd67283774 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -208,7 +208,21 @@ public class AnalysisManager implements Writable {
}
// Each analyze stmt corresponding to an analysis job.
- public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws
DdlException {
+ public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws
DdlException, AnalysisException {
+ // Using auto analyzer if user specifies.
+ if
(stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
+ StatisticsAutoCollector autoCollector =
Env.getCurrentEnv().getStatisticsAutoCollector();
+ if (autoCollector.skip(stmt.getTable())) {
+ return;
+ }
+ List<AnalysisInfo> jobs = new ArrayList<>();
+ autoCollector.createAnalyzeJobForTbl(stmt.getDb(), jobs,
stmt.getTable());
+ AnalysisInfo job =
autoCollector.getReAnalyzeRequiredPart(jobs.get(0));
+ if (job != null) {
+
Env.getCurrentEnv().getStatisticsAutoCollector().createSystemAnalysisJob(job);
+ }
+ return;
+ }
AnalysisInfo jobInfo = buildAndAssignJob(stmt);
if (jobInfo == null) {
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 85913f5fd48..a04f428aa66 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -32,14 +32,12 @@ import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;
-import com.google.common.collect.Sets;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.LocalTime;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -224,19 +222,6 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
String colNames = jobInfo.colName;
if (table.needReAnalyzeTable(tblStats)) {
needRunPartitions = table.findReAnalyzeNeededPartitions();
- } else if (table instanceof OlapTable &&
tblStats.newPartitionLoaded.get()) {
- OlapTable olapTable = (OlapTable) table;
- needRunPartitions = new HashMap<>();
- Set<String> partitionColumnNames =
olapTable.getPartitionInfo().getPartitionColumns().stream()
- .map(Column::getName).collect(Collectors.toSet());
- colNames =
partitionColumnNames.stream().collect(Collectors.joining(","));
- Set<String> partitions = Sets.newHashSet();
- // No need to filter unchanged partitions, because it may bring
unexpected behavior.
- // Use dummy partition to skip it.
- partitions.add("Dummy Partition");
- for (String column : partitionColumnNames) {
- needRunPartitions.put(column, partitions);
- }
}
if (needRunPartitions == null || needRunPartitions.isEmpty()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 07580df3607..3eceab0db90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -19,12 +19,12 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -166,21 +166,16 @@ public class TableStatsMeta implements Writable,
GsonPostProcessable {
indexesRowCount.putAll(analyzedJob.indexesRowCount);
clearStaleIndexRowCount((OlapTable) tableIf);
}
- if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet()
+ if (analyzedJob.emptyJob &&
AnalysisMethod.SAMPLE.equals(analyzedJob.analysisMethod)) {
+ return;
+ }
+ if (analyzedJob.colToPartitions.keySet()
.containsAll(tableIf.getBaseSchema().stream()
.filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName).collect(Collectors.toSet()))) {
updatedRows.set(0);
newPartitionLoaded.set(false);
}
- if (tableIf instanceof OlapTable) {
- PartitionInfo partitionInfo = ((OlapTable)
tableIf).getPartitionInfo();
- if (partitionInfo != null &&
analyzedJob.colToPartitions.keySet()
-
.containsAll(partitionInfo.getPartitionColumns().stream()
-
.map(Column::getName).collect(Collectors.toSet()))) {
- newPartitionLoaded.set(false);
- }
- }
}
}
@@ -189,6 +184,9 @@ public class TableStatsMeta implements Writable,
GsonPostProcessable {
if (indexesRowCount == null) {
indexesRowCount = new ConcurrentHashMap<>();
}
+ if (newPartitionLoaded == null) {
+ newPartitionLoaded = new AtomicBoolean(false);
+ }
}
public long getRowCount(long indexId) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index ebe9cdf93b6..4993884f02a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -296,12 +296,18 @@ public class AnalysisManagerTest {
};
OlapTable olapTable = new OlapTable();
+ TableStatsMeta stats0 = new TableStatsMeta(
+ 50, new AnalysisInfoBuilder().setColToPartitions(new
HashMap<>())
+ .setColName("col1").setRowCount(100).build(), olapTable);
+ stats0.newPartitionLoaded.set(true);
+ Assertions.assertTrue(olapTable.needReAnalyzeTable(stats0));
+
TableStatsMeta stats1 = new TableStatsMeta(
50, new AnalysisInfoBuilder().setColToPartitions(new
HashMap<>())
.setColName("col1").setRowCount(100).build(), olapTable);
stats1.updatedRows.addAndGet(70);
-
Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
+
TableStatsMeta stats2 = new TableStatsMeta(
190, new AnalysisInfoBuilder()
.setColToPartitions(new
HashMap<>()).setColName("col1").setRowCount(200).build(), olapTable);
diff --git a/regression-test/suites/statistics/analyze_stats.groovy
b/regression-test/suites/statistics/analyze_stats.groovy
index be032a359c9..f0e31e4d7c2 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -121,56 +121,144 @@ suite("test_analyze") {
SET forbid_unknown_col_stats=true;
"""
-// sql """
-// SELECT * FROM ${tbl};
-// """
-
sql """
DROP STATS ${tbl}(analyzetestlimitedk3)
"""
- def exception = null
-
-// try {
-// sql """
-// SELECT * FROM ${tbl};
-// """
-// } catch (Exception e) {
-// exception = e
-// }
-//
-// assert exception != null
-//
-// exception = null
-
sql """
ANALYZE TABLE ${tbl} WITH SYNC
"""
-// sql """
-// SELECT * FROM ${tbl};
-// """
-
sql """
DROP STATS ${tbl}
"""
-// try {
-// sql """
-// SELECT * FROM ${tbl};
-// """
-// } catch (Exception e) {
-// exception = e
-// }
+ // Test partition load data for the first time.
+ sql """
+ CREATE TABLE `partition_test` (
+ `id` INT NOT NULL,
+ `name` VARCHAR(25) NOT NULL,
+ `comment` VARCHAR(152) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`id`)
+ (PARTITION p1 VALUES [("0"), ("100")),
+ PARTITION p2 VALUES [("100"), ("200")),
+ PARTITION p3 VALUES [("200"), ("300")))
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1");
+ """
+
+ sql """analyze table partition_test with sync"""
+ sql """insert into partition_test values (1, '1', '1')"""
+ def partition_result = sql """show table stats partition_test"""
+ assertEquals(partition_result[0][6], "true")
+ assertEquals(partition_result[0][0], "1")
+ sql """analyze table partition_test with sync"""
+ partition_result = sql """show table stats partition_test"""
+ assertEquals(partition_result[0][6], "false")
+ sql """insert into partition_test values (101, '1', '1')"""
+ partition_result = sql """show table stats partition_test"""
+ assertEquals(partition_result[0][6], "true")
+ sql """analyze table partition_test(id) with sync"""
+ partition_result = sql """show table stats partition_test"""
+ assertEquals(partition_result[0][6], "true")
+ sql """analyze table partition_test with sync"""
+ partition_result = sql """show table stats partition_test"""
+ assertEquals(partition_result[0][6], "false")
+ sql """insert into partition_test values (102, '1', '1')"""
+ partition_result = sql """show table stats partition_test"""
+ assertEquals(partition_result[0][6], "false")
+ sql """insert into partition_test values (2, '2', '2')"""
+ sql """insert into partition_test values (3, '3', '3')"""
+ sql """insert into partition_test values (4, '4', '4')"""
+ sql """insert into partition_test values (5, '5', '5')"""
+ sql """insert into partition_test values (6, '6', '6')"""
+ sql """insert into partition_test values (7, '7', '7')"""
+ sql """insert into partition_test values (8, '8', '8')"""
+ sql """insert into partition_test values (9, '9', '9')"""
+ sql """insert into partition_test values (10, '10', '10')"""
+ sql """insert into partition_test values (103, '1', '1')"""
+ sql """insert into partition_test values (104, '1', '1')"""
+ sql """insert into partition_test values (105, '1', '1')"""
+ partition_result = sql """show table stats partition_test"""
+ assertEquals(partition_result[0][6], "false")
+ sql """analyze table partition_test with sync"""
+ sql """insert into partition_test values (201, '1', '1')"""
+ partition_result = sql """show table stats partition_test"""
+ assertEquals(partition_result[0][6], "true")
+ partition_result = sql """show column stats partition_test(id)"""
+ assertEquals("id", partition_result[0][0])
+ assertEquals("15.0", partition_result[0][2])
+ partition_result = sql """show column stats partition_test(name)"""
+ assertEquals("name", partition_result[0][0])
+ assertEquals("15.0", partition_result[0][2])
+ partition_result = sql """show column stats partition_test(comment)"""
+ assertEquals("comment", partition_result[0][0])
+ assertEquals("15.0", partition_result[0][2])
+
+ // Test sample agg table value column
+ sql """
+ CREATE TABLE `agg_table_test` (
+ `id` BIGINT NOT NULL,
+ `name` VARCHAR(10) REPLACE NULL
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 32
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql """insert into agg_table_test values (1,'name1'), (2, 'name2')"""
+ sql """analyze table agg_table_test with sample rows 100 with sync"""
+ def agg_result = sql """show column stats agg_table_test (name)"""
+ assertEquals(agg_result[0][7], "N/A")
+ assertEquals(agg_result[0][8], "N/A")
+
+ // Continue test partition load data for the first time.
+ def reported = false;
+ for (int i = 0; i < 10; i++) {
+ def data_result = sql """show data from partition_test"""
+ logger.info("show data from partition_test: " + data_result)
+ if (data_result[0][4] == '16') {
+ reported = true;
+ break;
+ }
+ sleep(1000)
+ }
+ if (!reported) {
+ logger.info("partition_test row count is not reported.")
+ } else {
+ sql """analyze table partition_test
PROPERTIES("use.auto.analyzer"="true")"""
+ for (int i = 0; i < 10; i++) {
+ def auto_analyze_result = sql """show auto analyze
partition_test"""
+ logger.info("show auto analyze partition_test result : " +
auto_analyze_result)
+ if (auto_analyze_result[0][9] == 'FINISHED') {
+ logger.info("Auto analyze finished.")
+ auto_analyze_result = sql """show table stats partition_test"""
+ assertEquals(auto_analyze_result[0][6], "false")
+ auto_analyze_result = sql """show column stats
partition_test(id)"""
+ assertEquals("id", auto_analyze_result[0][0])
+ assertEquals("16.0", auto_analyze_result[0][2])
+ auto_analyze_result = sql """show column stats
partition_test(name)"""
+ assertEquals("name", auto_analyze_result[0][0])
+ assertEquals("16.0", auto_analyze_result[0][2])
+ auto_analyze_result = sql """show column stats
partition_test(comment)"""
+ assertEquals("comment", auto_analyze_result[0][0])
+ assertEquals("16.0", auto_analyze_result[0][2])
+ break
+ }
+ sleep(1000)
+ }
+ }
def a_result_1 = sql """
ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 10
"""
- def a_result_2 = sql """
- ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 5
- """
-
def a_result_3 = sql """
ANALYZE DATABASE ${db} WITH SAMPLE PERCENT 5
"""
@@ -2560,62 +2648,6 @@ PARTITION `p599` VALUES IN (599)
sql """drop stats col1100 """
sql """DROP TABLE IF EXISTS col1100"""
- // Test partititon load data for the first time.
- sql """
- CREATE TABLE `partition_test` (
- `id` INT NOT NULL,
- `name` VARCHAR(25) NOT NULL,
- `comment` VARCHAR(152) NULL
- ) ENGINE=OLAP
- DUPLICATE KEY(`id`)
- COMMENT 'OLAP'
- PARTITION BY RANGE(`id`)
- (PARTITION p1 VALUES [("0"), ("100")),
- PARTITION p2 VALUES [("100"), ("200")),
- PARTITION p3 VALUES [("200"), ("300")))
- DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES (
- "replication_num" = "1");
- """
-
- sql """analyze table partition_test with sync"""
- sql """insert into partition_test values (1, '1', '1')"""
- def partition_result = sql """show table stats partition_test"""
- assertEquals(partition_result[0][6], "true")
- assertEquals(partition_result[0][0], "1")
- sql """analyze table partition_test with sync"""
- partition_result = sql """show table stats partition_test"""
- assertEquals(partition_result[0][6], "false")
- sql """insert into partition_test values (101, '1', '1')"""
- partition_result = sql """show table stats partition_test"""
- assertEquals(partition_result[0][6], "true")
- sql """analyze table partition_test(id) with sync"""
- partition_result = sql """show table stats partition_test"""
- assertEquals(partition_result[0][6], "false")
- sql """insert into partition_test values (102, '1', '1')"""
- partition_result = sql """show table stats partition_test"""
- assertEquals(partition_result[0][6], "false")
-
- // Test sample agg table value column
- sql """
- CREATE TABLE `agg_table_test` (
- `id` BIGINT NOT NULL,
- `name` VARCHAR(10) REPLACE NULL
- ) ENGINE=OLAP
- AGGREGATE KEY(`id`)
- COMMENT 'OLAP'
- DISTRIBUTED BY HASH(`id`) BUCKETS 32
- PROPERTIES (
- "replication_num" = "1"
- );
- """
- sql """insert into agg_table_test values (1,'name1'), (2, 'name2')"""
- Thread.sleep(1000 * 60)
- sql """analyze table agg_table_test with sample rows 100 with sync"""
- def agg_result = sql """show column stats agg_table_test (name)"""
- assertEquals(agg_result[0][7], "N/A")
- assertEquals(agg_result[0][8], "N/A")
-
// Test sample string type min max
sql """
CREATE TABLE `string_min_max` (
@@ -2628,19 +2660,19 @@ PARTITION `p599` VALUES IN (599)
PROPERTIES (
"replication_num" = "1"
);
- """
- sql """insert into string_min_max values (1,'name1'), (2, 'name2')"""
- sql """set forbid_unknown_col_stats=false"""
- explain {
- sql("select min(name), max(name) from string_min_max")
- contains "pushAggOp=NONE"
- }
- sql """set enable_pushdown_string_minmax = true"""
- explain {
- sql("select min(name), max(name) from string_min_max")
- contains "pushAggOp=MINMAX"
- }
- sql """set forbid_unknown_col_stats=true"""
+ """
+ sql """insert into string_min_max values (1,'name1'), (2, 'name2')"""
+ sql """set forbid_unknown_col_stats=false"""
+ explain {
+ sql("select min(name), max(name) from string_min_max")
+ contains "pushAggOp=NONE"
+ }
+ sql """set enable_pushdown_string_minmax = true"""
+ explain {
+ sql("select min(name), max(name) from string_min_max")
+ contains "pushAggOp=MINMAX"
+ }
+ sql """set forbid_unknown_col_stats=true"""
// Test alter
sql """
@@ -2840,7 +2872,7 @@ PARTITION `p599` VALUES IN (599)
"replication_allocation" = "tag.location.default: 1"
);
"""
-
+
sql """analyze table part with sync;"""
sql """Insert into part values (1, 1), (10001, 10001);"""
sql """analyze table part with sync;"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]