This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 4a07efe119c [improvement](statistics)Async drop table stats while
doing truncate and schema change(#45923) (#46010)
4a07efe119c is described below
commit 4a07efe119c521817964570fce10344157551228
Author: James <[email protected]>
AuthorDate: Thu Dec 26 21:57:18 2024 +0800
[improvement](statistics)Async drop table stats while doing truncate and
schema change(#45923) (#46010)
backport: https://github.com/apache/doris/pull/45923
---
.../apache/doris/statistics/AnalysisManager.java | 57 ++++++++++++++++++----
.../doris/statistics/AnalysisTaskExecutor.java | 6 +--
.../doris/statistics/StatisticsAutoCollector.java | 2 +-
.../suites/statistics/analyze_stats.groovy | 20 ++++++++
.../suites/statistics/test_analyze_mv.groovy | 18 +++++++
.../statistics/test_drop_stats_and_truncate.groovy | 18 +++++++
6 files changed, 108 insertions(+), 13 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 0e4a1c7b42d..c43136a08f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -115,6 +115,7 @@ public class AnalysisManager implements Writable {
private StatisticsCache statisticsCache;
private AnalysisTaskExecutor taskExecutor;
+ private ThreadPoolExecutor dropStatsExecutors;
// Store task information in metadata.
protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
@@ -136,8 +137,13 @@ public class AnalysisManager implements Writable {
public AnalysisManager() {
if (!Env.isCheckpointThread()) {
this.taskExecutor = new
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
- Integer.MAX_VALUE);
+ Integer.MAX_VALUE, "Manual Analysis Job Executor");
this.statisticsCache = new StatisticsCache();
+ this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
+ 1, 3, 10,
+ TimeUnit.DAYS, new LinkedBlockingQueue<>(20),
+ new ThreadPoolExecutor.DiscardPolicy(),
+ "Drop stats executor", true);
}
}
@@ -656,19 +662,52 @@ public class AnalysisManager implements Writable {
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
- removeTableStats(tableId);
- Env.getCurrentEnv().getEditLog().logDeleteTableStats(new
TableStatsDeletionLog(tableId));
- Set<String> cols =
table.getSchemaAllIndexes(false).stream().map(Column::getName)
- .collect(Collectors.toSet());
- invalidateLocalStats(catalogId, dbId, tableId, null, tableStats);
- // Drop stats ddl is master only operation.
- invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
- StatisticsRepository.dropStatisticsByColNames(catalogId, dbId,
table.getId(), cols);
+ asyncDropStatsTask(table, catalogId, dbId, tableId, tableStats);
} catch (Throwable e) {
LOG.warn("Failed to drop stats for table {}", table.getName(), e);
}
}
+ class DropStatsTask implements Runnable { // Runnable that drops the stats of one table; asyncDropStatsTask submits it to dropStatsExecutors.
+ private final long catalogId;
+ private final long dbId;
+ private final long tableId;
+ private final TableStatsMeta tableStats;
+ private final TableIf table; // read in run() for the column names, the id, and the name used in the failure log
+
+ public DropStatsTask(TableIf table, long catalogId, long dbId, long
tableId, TableStatsMeta tableStats) { // only stores its arguments; all work happens in run()
+ this.catalogId = catalogId;
+ this.dbId = dbId;
+ this.tableId = tableId;
+ this.tableStats = tableStats;
+ this.table = table;
+ }
+
+ @Override
+ public void run() { // runs the drop steps in the order written; any Throwable is logged below and not rethrown
+ try {
+ removeTableStats(tableId);
+ Env.getCurrentEnv().getEditLog().logDeleteTableStats(new
TableStatsDeletionLog(tableId)); // write the table-stats deletion to the edit log
+ Set<String> cols =
table.getSchemaAllIndexes(false).stream().map(Column::getName)
+ .collect(Collectors.toSet()); // distinct names of the columns returned by getSchemaAllIndexes(false)
+ StatisticsRepository.dropStatisticsByColNames(catalogId, dbId,
table.getId(), cols); // NOTE(review): table.getId() is presumably the same value as the tableId field - confirm
+ invalidateLocalStats(catalogId, dbId, tableId, null,
tableStats);
+ // Drop stats DDL is a master-only operation.
+ invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
+ } catch (Throwable e) { // catches everything, so nothing propagates out of run()
+ LOG.warn("Failed to drop stats for table {}", table.getName(),
e);
+ }
+ }
+ }
+
+ public void asyncDropStatsTask(TableIf table, long catalogId, long dbId, // Queues a DropStatsTask for this table on dropStatsExecutors.
long tableId, TableStatsMeta tableStats) { // Does not wait for the task; a submission failure is logged, never thrown to the caller.
+ try { // NOTE(review): the pool is built with DiscardPolicy, so a full queue presumably drops the task without reaching the catch below - confirm
+ dropStatsExecutors.submit(new DropStatsTask(table, catalogId,
dbId, tableId, tableStats));
+ } catch (Throwable t) { // e.g. dropStatsExecutors is only assigned when the constructor does not run on the checkpoint thread
+ LOG.warn("Failed to submit async drop stats job. reason: {}", // WARN, matching the other drop-stats failure logs in this class
t.getMessage(), t); // trailing throwable keeps the stack trace; getMessage() alone can be null
+ }
+ }
+
public void dropCachedStats(long catalogId, long dbId, long tableId) {
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
StatisticsCache statsCache = Env.getCurrentEnv().getStatisticsCache();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 3db9a862d10..23cfc0b6d11 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -44,17 +44,17 @@ public class AnalysisTaskExecutor {
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
- this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
+ this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job
Executor");
}
- public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int
taskQueueSize) {
+ public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int
taskQueueSize, String poolName) {
if (!Env.isCheckpointThread()) {
executors = ThreadPoolManager.newDaemonThreadPool(
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
new BlockedPolicy("Analysis Job Executor",
Integer.MAX_VALUE),
- "Analysis Job Executor", true);
+ poolName, true);
cancelExpiredTask();
} else {
executors = null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 574b25da422..cc721720230 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -58,7 +58,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
public StatisticsAutoCollector() {
super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(10));
this.analysisTaskExecutor = new
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
- StatisticConstants.TASK_QUEUE_CAP);
+ StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job
Executor");
}
@Override
diff --git a/regression-test/suites/statistics/analyze_stats.groovy
b/regression-test/suites/statistics/analyze_stats.groovy
index 7f4b9abee47..6d845a11da4 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -19,6 +19,22 @@ import java.util.stream.Collectors
suite("test_analyze") {
+ def stats_dropped = { table -> // waits until both stats queries for `table` return no rows, then asserts that this happened
+ def result1 = sql """show column cached stats $table"""
+ def result2 = sql """show column stats $table"""
+ boolean dropped = false
+ for (int i = 0; i < 120; i++) { // at most 120 checks with a one-second sleep between them
+ if (0 == result1.size() && 0 == result2.size()) {
+ dropped = true;
+ break;
+ }
+ Thread.sleep(1000)
+ result1 = sql """show column cached stats $table""" // re-run both queries for the next check
+ result2 = sql """show column stats $table"""
+ }
+ assertTrue(dropped) // fails when rows were still present on every check
+ }
+
String db = "test_analyze"
String tbl = "analyzetestlimited_duplicate_all"
@@ -1159,6 +1175,8 @@ PARTITION `p599` VALUES IN (599)
ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name
VARCHAR(256) DEFAULT NULL;
"""
+ stats_dropped("analyze_test_with_schema_update")
+
sql """
ANALYZE TABLE analyze_test_with_schema_update WITH SYNC
"""
@@ -1356,6 +1374,7 @@ PARTITION `p599` VALUES IN (599)
def result_before_truncate = sql """show column stats ${tbl}"""
assertEquals(14, result_before_truncate.size())
sql """TRUNCATE TABLE ${tbl}"""
+ stats_dropped(tbl)
def result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
result_after_truncate = sql """show column cached stats ${tbl}"""
@@ -1382,6 +1401,7 @@ PARTITION `p599` VALUES IN (599)
assert
"111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
[...]
sql """TRUNCATE TABLE ${tbl}"""
+ stats_dropped(tbl)
result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
sql """ANALYZE TABLE ${tbl} WITH SYNC"""
diff --git a/regression-test/suites/statistics/test_analyze_mv.groovy
b/regression-test/suites/statistics/test_analyze_mv.groovy
index 06a5de85bef..7e978a0c2b9 100644
--- a/regression-test/suites/statistics/test_analyze_mv.groovy
+++ b/regression-test/suites/statistics/test_analyze_mv.groovy
@@ -108,6 +108,22 @@ suite("test_analyze_mv") {
assertTrue(found)
}
+ def stats_dropped = { table -> // waits until both stats queries for `table` return no rows, then asserts that this happened
+ def result1 = sql """show column cached stats $table"""
+ def result2 = sql """show column stats $table"""
+ boolean dropped = false
+ for (int i = 0; i < 120; i++) { // at most 120 checks with a one-second sleep between them
+ if (0 == result1.size() && 0 == result2.size()) {
+ dropped = true;
+ break;
+ }
+ Thread.sleep(1000)
+ result1 = sql """show column cached stats $table""" // re-run both queries for the next check
+ result2 = sql """show column stats $table"""
+ }
+ assertTrue(dropped) // fails when rows were still present on every check
+ }
+
sql """drop database if exists test_analyze_mv"""
sql """create database test_analyze_mv"""
sql """use test_analyze_mv"""
@@ -679,6 +695,7 @@ suite("test_analyze_mv") {
// Test row count report and report for nereids
sql """truncate table mvTestDup"""
result_row = sql """show index stats mvTestDup mv3"""
+ stats_dropped("mvTestDup")
assertEquals(1, result_row.size())
assertEquals("mvTestDup", result_row[0][0])
assertEquals("mv3", result_row[0][1])
@@ -714,6 +731,7 @@ suite("test_analyze_mv") {
// ** Embedded test for skip auto analyze when table is empty again
sql """analyze table mvTestDup properties ("use.auto.analyzer" =
"true")"""
+ stats_dropped("mvTestDup")
empty_test = sql """show auto analyze mvTestDup"""
assertEquals(0, empty_test.size())
empty_test = sql """show column stats mvTestDup"""
diff --git
a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
index 969e03cb295..a6a9f4471a4 100644
--- a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
+++ b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
@@ -17,6 +17,22 @@
suite("test_drop_stats_and_truncate") {
+ def stats_dropped = { table -> // waits until both stats queries for `table` return no rows, then asserts that this happened
+ def result1 = sql """show column cached stats $table"""
+ def result2 = sql """show column stats $table"""
+ boolean dropped = false
+ for (int i = 0; i < 120; i++) { // at most 120 checks with a one-second sleep between them
+ if (0 == result1.size() && 0 == result2.size()) {
+ dropped = true;
+ break;
+ }
+ Thread.sleep(1000)
+ result1 = sql """show column cached stats $table""" // re-run both queries for the next check
+ result2 = sql """show column stats $table"""
+ }
+ assertTrue(dropped) // fails when rows were still present on every check
+ }
+
sql """drop database if exists test_drop_stats_and_truncate"""
sql """create database test_drop_stats_and_truncate"""
sql """use test_drop_stats_and_truncate"""
@@ -100,6 +116,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(3, columns.size())
sql """truncate table non_part"""
+ stats_dropped("non_part")
result = sql """show column stats non_part"""
assertEquals(0, result.size())
result = sql """show table stats non_part"""
@@ -147,6 +164,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(9, columns.size())
sql """truncate table part"""
+ stats_dropped("part")
result = sql """show column stats part"""
assertEquals(0, result.size())
result = sql """show table stats part"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]