This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 72b709d6a9 [opt](stats) split period collector from auto collector
(#23622)
72b709d6a9 is described below
commit 72b709d6a9ee2a6637c78a1a9a7f0c3b49b69a2b
Author: AKIRA <[email protected]>
AuthorDate: Mon Sep 4 17:04:16 2023 +0800
[opt](stats) split period collector from auto collector (#23622)
1. Split period analyze from auto collector
2. Analyze table incrementally by default
3. Rename StatisticsAutoAnalyzer to StatisticsAutoCollector
---
.../main/java/org/apache/doris/common/Config.java | 3 +
.../main/java/org/apache/doris/catalog/Env.java | 21 +++---
.../java/org/apache/doris/catalog/OlapTable.java | 3 +-
.../main/java/org/apache/doris/catalog/Table.java | 2 +-
.../java/org/apache/doris/catalog/TableIf.java | 2 +-
.../doris/catalog/external/ExternalTable.java | 2 +-
.../org/apache/doris/statistics/AnalysisInfo.java | 1 +
.../apache/doris/statistics/AnalysisManager.java | 46 +++++++-----
.../doris/statistics/AnalysisTaskExecutor.java | 20 ++++--
.../apache/doris/statistics/OlapAnalysisTask.java | 6 +-
...oAnalyzer.java => StatisticsAutoCollector.java} | 69 +++---------------
.../doris/statistics/StatisticsCollector.java | 81 ++++++++++++++++++++++
.../statistics/StatisticsPeriodCollector.java | 50 +++++++++++++
.../org/apache/doris/statistics/TableStats.java | 2 +-
...rTest.java => StatisticsAutoCollectorTest.java} | 20 +++---
.../suites/statistics/analyze_stats.groovy | 8 +++
16 files changed, 229 insertions(+), 107 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d13fc72e7e..2c6087cd28 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2103,6 +2103,9 @@ public class Config extends ConfigBase {
@ConfField
public static int full_auto_analyze_simultaneously_running_task_num = 1;
+ @ConfField // non-final, like the neighbouring *_simultaneously_running_task_num setting, so the value stays assignable
+ public static int period_analyze_simultaneously_running_task_num = 1;
+
@ConfField
public static int cpu_resource_limit_per_analyze_task = 1;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 71d1ef1d8d..8f9ca541a8 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -224,9 +224,10 @@ import
org.apache.doris.scheduler.registry.TimerJobRegister;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
-import org.apache.doris.statistics.StatisticsAutoAnalyzer;
+import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
+import org.apache.doris.statistics.StatisticsPeriodCollector;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@@ -476,7 +477,9 @@ public class Env {
*/
private final LoadManagerAdapter loadManagerAdapter;
- private StatisticsAutoAnalyzer statisticsAutoAnalyzer;
+ private StatisticsAutoCollector statisticsAutoCollector;
+
+ private StatisticsPeriodCollector statisticsPeriodCollector;
private HiveTransactionMgr hiveTransactionMgr;
@@ -701,7 +704,8 @@ public class Env {
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
- this.statisticsAutoAnalyzer = new StatisticsAutoAnalyzer();
+ this.statisticsAutoCollector = new StatisticsAutoCollector();
+ this.statisticsPeriodCollector = new StatisticsPeriodCollector();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.queryStats = new QueryStats();
@@ -948,8 +952,11 @@ public class Env {
if (statisticsCleaner != null) {
statisticsCleaner.start();
}
- if (statisticsAutoAnalyzer != null) {
- statisticsAutoAnalyzer.start();
+ if (statisticsAutoCollector != null) {
+ statisticsAutoCollector.start();
+ }
+ if (statisticsPeriodCollector != null) {
+ statisticsPeriodCollector.start();
}
}
@@ -5578,10 +5585,6 @@ public class Env {
return loadManagerAdapter;
}
- public StatisticsAutoAnalyzer getStatisticsAutoAnalyzer() {
- return statisticsAutoAnalyzer;
- }
-
public QueryStats getQueryStats() {
return queryStats;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 39b5450e23..6fc6bb2ed5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1137,7 +1137,8 @@ public class OlapTable extends Table {
}
@Override
- public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
+ public Set<String> findReAnalyzeNeededPartitions() {
+ TableStats tableStats =
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId());
if (tableStats == null) {
return getPartitionNames().stream().map(this::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 12689894b4..ef71e394e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -580,7 +580,7 @@ public abstract class Table extends MetaObject implements
Writable, TableIf {
}
@Override
- public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
+ public Set<String> findReAnalyzeNeededPartitions() {
return Collections.emptySet();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 21e2ddd154..ae67d0c9e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -139,7 +139,7 @@ public interface TableIf {
boolean needReAnalyzeTable(TableStats tblStats);
- Set<String> findReAnalyzeNeededPartitions(TableStats tableStats);
+ Set<String> findReAnalyzeNeededPartitions();
void write(DataOutput out) throws IOException;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index 6f31ac18d7..ca5b80bc43 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -388,7 +388,7 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
}
@Override
- public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
+ public Set<String> findReAnalyzeNeededPartitions() {
HashSet<String> partitions = Sets.newHashSet();
// TODO: Find a way to collect external table partitions that need to
be analyzed.
partitions.add("Dummy Partition");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index 8f33480640..c20bad6396 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -96,6 +96,7 @@ public class AnalysisInfo implements Writable {
@SerializedName("tblName")
public final String tblName;
+ // TODO: Map here is weird, List is enough
@SerializedName("colToPartitions")
public final Map<String, Set<String>> colToPartitions;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index bdd325e6d1..853e9b3393 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -29,6 +29,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
@@ -363,6 +364,9 @@ public class AnalysisManager extends Daemon implements
Writable {
ShowResultSetMetaData commonResultSetMetaData = new
ShowResultSetMetaData(columns);
List<List<String>> resultRows = new ArrayList<>();
for (AnalysisInfo analysisInfo : analysisInfos) {
+ if (analysisInfo == null) {
+ continue;
+ }
List<String> row = new ArrayList<>();
row.add(analysisInfo.catalogName);
row.add(analysisInfo.dbName);
@@ -442,23 +446,9 @@ public class AnalysisManager extends Daemon implements
Writable {
StatisticsRepository.dropStatistics(invalidPartIds);
}
- if (analysisMode == AnalysisMode.INCREMENTAL && analysisType ==
AnalysisType.FUNDAMENTALS) {
- existColAndPartsForStats.values().forEach(partIds ->
partIds.removeAll(invalidPartIds));
- // In incremental collection mode, just collect the uncollected
partition statistics
- existColAndPartsForStats.forEach((columnName, partitionIds) -> {
- Set<String> existPartitions = partitionIds.stream()
- .map(idToPartition::get)
- .collect(Collectors.toSet());
- columnToPartitions.computeIfPresent(columnName, (colName,
partNames) -> {
- partNames.removeAll(existPartitions);
- return partNames;
- });
- });
- if (invalidPartIds.isEmpty()) {
- // There is no invalid statistics, so there is no need to
update table statistics,
- // remove columns that do not require re-collection of
statistics
- columnToPartitions.entrySet().removeIf(entry ->
entry.getValue().isEmpty());
- }
+ if (analysisType == AnalysisType.FUNDAMENTALS) {
+ Set<String> reAnalyzeNeededPartitions =
findReAnalyzeNeededPartitions(table);
+ columnToPartitions.replaceAll((k, v) -> reAnalyzeNeededPartitions);
}
return columnToPartitions;
@@ -692,6 +682,12 @@ public class AnalysisManager extends Daemon implements
Writable {
}
Set<String> cols = dropStatsStmt.getColumnNames();
long tblId = dropStatsStmt.getTblId();
+ TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
+ if (tableStats == null) {
+ return;
+ }
+ tableStats.updatedTime = 0;
+ replayUpdateTableStatsStatus(tableStats);
StatisticsRepository.dropStatistics(tblId, cols);
for (String col : cols) {
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L,
col);
@@ -951,4 +947,20 @@ public class AnalysisManager extends Daemon implements
Writable {
systemJobInfoMap.put(jobInfo.jobId, jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
}
+ // Selects partition names: every partition with data when findTableStatsStatus returns null for the table, otherwise only partitions with data whose visible version time is >= tableStats.updatedTime.
+ @VisibleForTesting
+ protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
+ TableStats tableStats = findTableStatsStatus(table.getId());
+ if (tableStats == null) {
+ return table.getPartitionNames().stream().map(table::getPartition)
+
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
+ }
+ return table.getPartitionNames().stream()
+ .map(table::getPartition)
+ .filter(Partition::hasData)
+ .filter(partition ->
+ partition.getVisibleVersionTime() >=
tableStats.updatedTime).map(Partition::getName)
+ .collect(Collectors.toSet());
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index fb23050fff..a7b0073bb4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -17,6 +17,7 @@
package org.apache.doris.statistics;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
@@ -42,16 +43,23 @@ public class AnalysisTaskExecutor extends Thread {
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
- executors = ThreadPoolManager.newDaemonThreadPool(
- simultaneouslyRunningTaskNum,
- simultaneouslyRunningTaskNum, 0,
- TimeUnit.DAYS, new LinkedBlockingQueue<>(),
- new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
- "Analysis Job Executor", true);
+ if (!Env.isCheckpointThread()) {
+ executors = ThreadPoolManager.newDaemonThreadPool(
+ simultaneouslyRunningTaskNum,
+ simultaneouslyRunningTaskNum, 0,
+ TimeUnit.DAYS, new LinkedBlockingQueue<>(),
+ new BlockedPolicy("Analysis Job Executor",
Integer.MAX_VALUE),
+ "Analysis Job Executor", true);
+ } else {
+ executors = null;
+ }
}
@Override
public void run() {
+ if (Env.isCheckpointThread()) {
+ return;
+ }
cancelExpiredTask();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index ef26d7349e..9d34b6aabd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -74,6 +74,10 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
}
public void doExecute() throws Exception {
+ Set<String> partitionNames = info.colToPartitions.get(info.colName);
+ if (partitionNames.isEmpty()) {
+ return;
+ }
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
@@ -90,7 +94,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
List<String> partitionAnalysisSQLs = new ArrayList<>();
try {
tbl.readLock();
- Set<String> partitionNames =
info.colToPartitions.get(info.colName);
+
for (String partitionName : partitionNames) {
Partition part = tbl.getPartition(partitionName);
if (part == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
similarity index 74%
rename from
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
rename to
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 5d704e4f3b..c7310fc0e8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -24,8 +24,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.statistics.AnalysisInfo.JobType;
@@ -41,43 +39,27 @@ import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-public class StatisticsAutoAnalyzer extends MasterDaemon {
+public class StatisticsAutoCollector extends StatisticsCollector {
- private static final Logger LOG =
LogManager.getLogger(StatisticsAutoAnalyzer.class);
+ private static final Logger LOG =
LogManager.getLogger(StatisticsAutoCollector.class);
- private final AnalysisTaskExecutor analysisTaskExecutor;
-
- public StatisticsAutoAnalyzer() {
+ public StatisticsAutoCollector() {
super("Automatic Analyzer",
-
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2);
- analysisTaskExecutor = new
AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num);
- analysisTaskExecutor.start();
+
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
+ new
AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
}
@Override
- protected void runAfterCatalogReady() {
- if (!Env.getCurrentEnv().isMaster()) {
- return;
- }
- if (!StatisticsUtil.statsTblAvailable()) {
- return;
- }
- analyzePeriodically();
+ protected void collect() {
if
(!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
return;
}
-
- if (!analysisTaskExecutor.idle()) {
- return;
- }
-
if (Config.enable_full_auto_analyze) {
analyzeAll();
}
@@ -141,29 +123,18 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
return analysisInfos;
}
- private void analyzePeriodically() {
- try {
- AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
- List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
- for (AnalysisInfo jobInfo : jobInfos) {
- createSystemAnalysisJob(jobInfo);
- }
- } catch (Exception e) {
- LOG.warn("Failed to periodically analyze the statistics." + e);
- }
- }
-
@VisibleForTesting
protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
TableIf table = StatisticsUtil
.findTable(jobInfo.catalogName, jobInfo.dbName,
jobInfo.tblName);
- TableStats tblStats =
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
+ AnalysisManager analysisManager =
Env.getServingEnv().getAnalysisManager();
+ TableStats tblStats =
analysisManager.findTableStatsStatus(table.getId());
if (!(tblStats == null || table.needReAnalyzeTable(tblStats))) {
return null;
}
- Set<String> needRunPartitions =
table.findReAnalyzeNeededPartitions(tblStats);
+ Set<String> needRunPartitions = table.findReAnalyzeNeededPartitions();
if (needRunPartitions.isEmpty()) {
return null;
@@ -173,7 +144,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
}
@VisibleForTesting
- public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
+ protected AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf
table,
Set<String> needRunPartitions) {
Map<String, Set<String>> newColToPartitions = Maps.newHashMap();
Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions;
@@ -209,24 +180,4 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
return true;
}
}
-
-
- // Analysis job created by the system
- @VisibleForTesting
- protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
- throws DdlException {
- if (jobInfo.colToPartitions.isEmpty()) {
- // No statistics need to be collected or updated
- return;
- }
-
- Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
- AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
- analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos,
false);
- if (StatisticsUtil.isExternalTable(jobInfo.catalogName,
jobInfo.dbName, jobInfo.tblName)) {
- analysisManager.createTableLevelTaskForExternalTable(jobInfo,
analysisTaskInfos, false);
- }
- Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo,
analysisTaskInfos);
- analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
new file mode 100644
index 0000000000..f65829e748
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class StatisticsCollector extends MasterDaemon {
+ // Common base for statistics collectors: a MasterDaemon that runs a few checks each round and then delegates to collect().
+ // Executor that receives the tasks built by createSystemAnalysisJob; it is started in the constructor.
+ protected final AnalysisTaskExecutor analysisTaskExecutor;
+ // name and intervalMs are passed through to MasterDaemon; the supplied executor is stored
+ // and started right away.
+ public StatisticsCollector(String name, long intervalMs,
AnalysisTaskExecutor analysisTaskExecutor) {
+ super(name, intervalMs);
+ this.analysisTaskExecutor = analysisTaskExecutor;
+ analysisTaskExecutor.start();
+ }
+ // Calls collect() unless the current Env is not master, the stats table is unavailable, this is the checkpoint thread, or the executor is not idle.
+ @Override
+ protected void runAfterCatalogReady() {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return;
+ }
+ if (!StatisticsUtil.statsTblAvailable()) {
+ return;
+ }
+ if (Env.isCheckpointThread()) {
+ return;
+ }
+ // Skip this round when analysisTaskExecutor.idle() reports false.
+ if (!analysisTaskExecutor.idle()) {
+ return;
+ }
+ collect();
+ }
+ // Subclass hook, invoked from runAfterCatalogReady() once none of its early returns has fired.
+ protected abstract void collect();
+ // Builds the tasks for jobInfo, registers them as a system job and submits each one to analysisTaskExecutor.
+ // Analysis job created by the system
+ @VisibleForTesting
+ protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
+ throws DdlException {
+ if (jobInfo.colToPartitions.isEmpty()) {
+ // No statistics need to be collected or updated
+ return;
+ }
+ // createTaskForEachColumns fills the map; a job on an external table additionally gets a table-level task.
+ Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
+ AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
+ analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos,
false);
+ if (StatisticsUtil.isExternalTable(jobInfo.catalogName,
jobInfo.dbName, jobInfo.tblName)) {
+ analysisManager.createTableLevelTaskForExternalTable(jobInfo,
analysisTaskInfos, false);
+ }
+ Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo,
analysisTaskInfos);
+ analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
+ }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
new file mode 100644
index 0000000000..f34ad0f122
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Collector for periodic analysis jobs: each round it asks the AnalysisManager for
+ * the jobs returned by findPeriodicJobs() and submits every one as a system analysis job.
+ */
+public class StatisticsPeriodCollector extends StatisticsCollector {
+ private static final Logger LOG =
LogManager.getLogger(StatisticsPeriodCollector.class);
+
+ // The daemon gets its own name (StatisticsAutoCollector already uses "Automatic Analyzer");
+ // the interval is half of Config.auto_check_statistics_in_minutes, in milliseconds.
+ public StatisticsPeriodCollector() {
+ super("Periodic Analyzer",
+
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
+ new
AnalysisTaskExecutor(Config.period_analyze_simultaneously_running_task_num));
+ }
+
+ // Submits every periodic job; any failure is logged and ends this round without propagating.
+ @Override
+ protected void collect() {
+ try {
+ AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
+ List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
+ for (AnalysisInfo jobInfo : jobInfos) {
+ createSystemAnalysisJob(jobInfo);
+ }
+ } catch (Exception e) {
+ // Pass the exception as the throwable argument so the stack trace is logged.
+ LOG.warn("Failed to periodically analyze the statistics.", e);
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
index 0fffbd9dd7..48a8bd81c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
@@ -55,7 +55,7 @@ public class TableStats implements Writable {
public final AnalysisType analysisType;
@SerializedName("updateTime")
- public final long updatedTime;
+ public long updatedTime;
@SerializedName("columns")
public String columns;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
similarity index 90%
rename from
fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java
rename to
fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
index fff649a447..d152e8175f 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoAnalyzerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
@@ -49,7 +49,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-public class StatisticsAutoAnalyzerTest {
+public class StatisticsAutoCollectorTest {
@Test
public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) {
@@ -65,7 +65,7 @@ public class StatisticsAutoAnalyzerTest {
return databaseIfs;
}
};
- new MockUp<StatisticsAutoAnalyzer>() {
+ new MockUp<StatisticsAutoCollector>() {
@Mock
public List<AnalysisInfo>
constructAnalysisInfo(DatabaseIf<TableIf> db) {
return Arrays.asList(analysisInfo, analysisInfo);
@@ -85,7 +85,7 @@ public class StatisticsAutoAnalyzerTest {
}
};
- StatisticsAutoAnalyzer saa = new StatisticsAutoAnalyzer();
+ StatisticsAutoCollector saa = new StatisticsAutoCollector();
saa.runAfterCatalogReady();
new Expectations() {
{
@@ -131,7 +131,7 @@ public class StatisticsAutoAnalyzerTest {
return columns;
}
};
- StatisticsAutoAnalyzer saa = new StatisticsAutoAnalyzer();
+ StatisticsAutoCollector saa = new StatisticsAutoCollector();
List<AnalysisInfo> analysisInfos =
saa.constructAnalysisInfo(new Database(1, "anydb"));
Assertions.assertEquals(1, analysisInfos.size());
@@ -145,7 +145,7 @@ public class StatisticsAutoAnalyzerTest {
new MockUp<OlapTable>() {
@Mock
- protected Set<String> findReAnalyzeNeededPartitions(TableStats
tableStats) {
+ protected Set<String> findReAnalyzeNeededPartitions() {
Set<String> partitionNames = new HashSet<>();
partitionNames.add("p1");
partitionNames.add("p2");
@@ -185,20 +185,20 @@ public class StatisticsAutoAnalyzerTest {
}
};
- new MockUp<StatisticsAutoAnalyzer>() {
+ new MockUp<StatisticsAutoCollector>() {
@Mock
public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo,
TableIf table,
Set<String> needRunPartitions) {
return new AnalysisInfoBuilder().build();
}
};
- StatisticsAutoAnalyzer statisticsAutoAnalyzer = new
StatisticsAutoAnalyzer();
+ StatisticsAutoCollector statisticsAutoCollector = new
StatisticsAutoCollector();
AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder()
.setCatalogName("cname")
.setDbName("db")
.setTblName("tbl").build();
-
Assertions.assertNotNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2));
-
Assertions.assertNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2));
-
Assertions.assertNotNull(statisticsAutoAnalyzer.getReAnalyzeRequiredPart(analysisInfo2));
+
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+
Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
}
}
diff --git a/regression-test/suites/statistics/analyze_stats.groovy
b/regression-test/suites/statistics/analyze_stats.groovy
index 50420613b5..3220a34ee5 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -19,6 +19,14 @@ suite("test_analyze") {
String db = "regression_test_statistics"
String tbl = "analyzetestlimited_duplicate_all"
+ sql """
+ DROP DATABASE IF EXISTS `${db}`
+ """
+
+ sql """
+ CREATE DATABASE `${db}`
+ """
+
sql """
DROP TABLE IF EXISTS `${tbl}`
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]