This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 36695e871ad [feature](statistics)Support auto analyze columns that
haven't been analyzed for a long time. #42399 (#45250)
36695e871ad is described below
commit 36695e871ad64d765530f2ca54bd94923800fb75
Author: James <[email protected]>
AuthorDate: Thu Dec 12 01:57:44 2024 +0800
[feature](statistics)Support auto analyze columns that haven't been
analyzed for a long time. #42399 (#45250)
backport: https://github.com/apache/doris/pull/42399
---
.../main/java/org/apache/doris/common/Config.java | 7 +
.../org/apache/doris/analysis/ShowAnalyzeStmt.java | 1 +
.../apache/doris/analysis/ShowColumnStatsStmt.java | 2 +
.../apache/doris/analysis/ShowTableStatsStmt.java | 11 +-
.../main/java/org/apache/doris/catalog/Env.java | 22 +-
.../org/apache/doris/datasource/ExternalTable.java | 2 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 1 +
.../java/org/apache/doris/qe/StmtExecutor.java | 2 +-
.../org/apache/doris/statistics/AnalysisInfo.java | 13 +-
.../doris/statistics/AnalysisInfoBuilder.java | 17 +-
.../apache/doris/statistics/AnalysisManager.java | 27 +-
.../doris/statistics/AnalysisTaskExecutor.java | 5 +-
.../org/apache/doris/statistics/ColStatsMeta.java | 6 +-
.../org/apache/doris/statistics/JobPriority.java | 25 +
.../doris/statistics/StatisticsAutoCollector.java | 285 ++++++------
.../doris/statistics/StatisticsCollector.java | 79 ----
.../doris/statistics/StatisticsJobAppender.java | 152 +++++++
.../apache/doris/statistics/TableStatsMeta.java | 16 +-
.../doris/statistics/util/StatisticsUtil.java | 35 ++
.../statistics/StatisticsAutoCollectorTest.java | 502 ++-------------------
.../statistics/StatisticsJobAppenderTest.java | 84 ++++
.../doris/statistics/util/StatisticsUtilTest.java | 85 ++++
.../hive/test_hive_statistic_auto.groovy | 12 +-
.../suites/statistics/analyze_stats.groovy | 35 +-
24 files changed, 700 insertions(+), 726 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 359d193a5ba..ec09c850805 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2764,6 +2764,13 @@ public class Config extends ConfigBase {
public static boolean enable_proxy_protocol = false;
public static int profile_async_collect_expire_time_secs = 5;
+ @ConfField(mutable = true, description = {
+ "内表自动收集时间间隔,当某一列上次收集时间距离当前时间大于该值,则会触发一次新的收集,0表示不会触发。",
+ "Columns that have not been collected within the specified
interval will trigger automatic analyze. "
+ + "0 means not trigger."
+ })
+ public static long auto_analyze_interval_seconds = 86400;
+
//==========================================================================
// begin of cloud config
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
index 9ccfd956ca5..f660d6eeb3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
@@ -62,6 +62,7 @@ public class ShowAnalyzeStmt extends ShowStmt {
.add("schedule_type")
.add("start_time")
.add("end_time")
+ .add("priority")
.build();
private long jobId;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
index 18bb916b8bd..36986dc9d4e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
@@ -62,6 +62,7 @@ public class ShowColumnStatsStmt extends ShowStmt {
.add("trigger")
.add("query_times")
.add("updated_time")
+ .add("last_analyze_version")
.build();
private final TableName tableName;
@@ -162,6 +163,7 @@ public class ShowColumnStatsStmt extends ShowStmt {
row.add(String.valueOf(colStatsMeta == null ? "N/A" :
colStatsMeta.jobType));
row.add(String.valueOf(colStatsMeta == null ? "N/A" :
colStatsMeta.queriedTimes));
row.add(String.valueOf(p.second.updatedTime));
+ row.add(String.valueOf(colStatsMeta == null ? "N/A" :
colStatsMeta.tableVersion));
result.add(row);
});
return new ShowResultSet(getMetaData(), result);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
index 91b8bf1de2d..915b5f19e03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
@@ -57,6 +57,7 @@ public class ShowTableStatsStmt extends ShowStmt {
.add("trigger")
.add("new_partition")
.add("user_inject")
+ .add("last_analyze_time")
.build();
private static final ImmutableList<String> INDEX_TITLE_NAMES =
@@ -192,6 +193,7 @@ public class ShowTableStatsStmt extends ShowStmt {
row.add("");
row.add("");
row.add("");
+ row.add("");
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
@@ -201,15 +203,18 @@ public class ShowTableStatsStmt extends ShowStmt {
row.add(String.valueOf(tableStatistic.updatedRows));
row.add(String.valueOf(tableStatistic.queriedTimes.get()));
row.add(String.valueOf(tableStatistic.rowCount));
- LocalDateTime dateTime =
+ LocalDateTime tableUpdateTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime),
java.time.ZoneId.systemDefault());
- String formattedDateTime = dateTime.format(formatter);
- row.add(formattedDateTime);
+ LocalDateTime lastAnalyzeTime =
+
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime),
+ java.time.ZoneId.systemDefault());
+ row.add(tableUpdateTime.format(formatter));
row.add(tableStatistic.analyzeColumns().toString());
row.add(tableStatistic.jobType.toString());
row.add(String.valueOf(tableStatistic.newPartitionLoaded.get()));
row.add(String.valueOf(tableStatistic.userInjected));
+ row.add(lastAnalyzeTime.format(formatter));
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index cf0885e6ec2..321d2f53fc7 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -258,6 +258,7 @@ import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
+import org.apache.doris.statistics.StatisticsJobAppender;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@@ -540,6 +541,7 @@ public class Env {
private final LoadManagerAdapter loadManagerAdapter;
private StatisticsAutoCollector statisticsAutoCollector;
+ private StatisticsJobAppender statisticsJobAppender;
private HiveTransactionMgr hiveTransactionMgr;
@@ -780,6 +782,7 @@ public class Env {
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
+ this.statisticsJobAppender = new
StatisticsJobAppender("StatisticsJobAppender");
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
@@ -1078,12 +1081,6 @@ public class Env {
// If not using bdb, we need to notify the FE type transfer
manually.
notifyNewFETypeTransfer(FrontendNodeType.MASTER);
}
- if (statisticsCleaner != null) {
- statisticsCleaner.start();
- }
- if (statisticsAutoCollector != null) {
- statisticsAutoCollector.start();
- }
queryCancelWorker.start();
}
@@ -1623,6 +1620,15 @@ public class Env {
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
+ if (statisticsCleaner != null) {
+ statisticsCleaner.start();
+ }
+ if (statisticsAutoCollector != null) {
+ statisticsAutoCollector.start();
+ }
+ if (statisticsJobAppender != null) {
+ statisticsJobAppender.start();
+ }
} catch (Throwable e) {
// When failed to transfer to master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
@@ -6327,6 +6333,10 @@ public class Env {
return statisticsAutoCollector;
}
+ public StatisticsJobAppender getStatisticsJobAppender() {
+ return statisticsJobAppender;
+ }
+
public NereidsSqlCacheManager getSqlCacheManager() {
return sqlCacheManager;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 1d7d87d27f8..716f2bdca44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -350,7 +350,7 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
return true;
}
return System.currentTimeMillis()
- - tblStats.updatedTime >
StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
+ - tblStats.lastAnalyzeTime >
StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 741f20dd88f..fc92efd7f1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2799,6 +2799,7 @@ public class ShowExecutor {
java.time.ZoneId.systemDefault());
row.add(startTime.format(formatter));
row.add(endTime.format(formatter));
+ row.add(analysisInfo.priority == null ? "N/A" :
analysisInfo.priority.name());
resultRows.add(row);
} catch (Exception e) {
LOG.warn("Failed to get analyze info for table {}.{}.{},
reason: {}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 0b8d8c229cd..f314f3aa76c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2513,7 +2513,7 @@ public class StmtExecutor {
context.getState().setOk();
}
- private void handleAnalyzeStmt() throws DdlException, AnalysisException {
+ private void handleAnalyzeStmt() throws DdlException, AnalysisException,
ExecutionException, InterruptedException {
context.env.getAnalysisManager().createAnalyze((AnalyzeStmt)
parsedStmt, isProxy);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index 125b23bce7b..463b53bf645 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -195,6 +195,13 @@ public class AnalysisInfo implements Writable {
@SerializedName("rowCount")
public final long rowCount;
+
+ @SerializedName("priority")
+ public final JobPriority priority;
+
+ @SerializedName("tv")
+ public final long tableVersion;
+
/**
*
* Used to store the newest partition version of tbl when creating this
job.
@@ -214,7 +221,7 @@ public class AnalysisInfo implements Writable {
boolean isExternalTableLevelTask, boolean partitionOnly, boolean
samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression
cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean
emptyJob, boolean userInject,
- long rowCount) {
+ long rowCount, JobPriority priority, long tableVersion) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
@@ -253,6 +260,8 @@ public class AnalysisInfo implements Writable {
this.emptyJob = emptyJob;
this.userInject = userInject;
this.rowCount = rowCount;
+ this.priority = priority;
+ this.tableVersion = tableVersion;
}
@Override
@@ -295,6 +304,8 @@ public class AnalysisInfo implements Writable {
sj.add("forceFull: " + forceFull);
sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn);
sj.add("emptyJob: " + emptyJob);
+ sj.add("priority: " + priority.name());
+ sj.add("tableVersion: " + tableVersion);
return sj.toString();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
index 2e7c4078ca1..2dd79030220 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
@@ -65,6 +65,8 @@ public class AnalysisInfoBuilder {
private boolean emptyJob;
private boolean userInject = false;
private long rowCount;
+ private JobPriority priority;
+ private long tableVersion;
public AnalysisInfoBuilder() {
}
@@ -105,6 +107,8 @@ public class AnalysisInfoBuilder {
emptyJob = info.emptyJob;
userInject = info.userInject;
rowCount = info.rowCount;
+ priority = info.priority;
+ tableVersion = info.tableVersion;
}
public AnalysisInfoBuilder setJobId(long jobId) {
@@ -282,12 +286,23 @@ public class AnalysisInfoBuilder {
return this;
}
+ public AnalysisInfoBuilder setPriority(JobPriority priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public AnalysisInfoBuilder setTableVersion(long tableVersion) {
+ this.tableVersion = tableVersion;
+ return this;
+ }
+
public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId,
tblId, jobColumns, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod,
analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message,
lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition,
isAllPartition, partitionCount,
- cronExpression, forceFull, usingSqlForPartitionColumn,
tblUpdateTime, emptyJob, userInject, rowCount);
+ cronExpression, forceFull, usingSqlForPartitionColumn,
tblUpdateTime, emptyJob, userInject, rowCount,
+ priority, tableVersion);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 5f6206e854e..0e4a1c7b42d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -99,6 +99,7 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -144,7 +145,8 @@ public class AnalysisManager implements Writable {
return statisticsCache;
}
- public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws
DdlException, AnalysisException {
+ public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy)
+ throws DdlException, AnalysisException, ExecutionException,
InterruptedException {
if (!StatisticsUtil.statsTblAvailable() &&
!FeConstants.runningUnitTest) {
throw new DdlException("Stats table not available, please make
sure your cluster status is normal");
}
@@ -157,10 +159,8 @@ public class AnalysisManager implements Writable {
public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy)
throws DdlException, AnalysisException {
DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
- // Using auto analyzer if user specifies.
if
(analyzeDBStmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer"))
{
- Env.getCurrentEnv().getStatisticsAutoCollector().analyzeDb(db);
- return;
+ throw new DdlException("Analyze database doesn't support
use.auto.analyzer property.");
}
List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db,
analyzeDBStmt.getAnalyzeProperties());
if (!analyzeDBStmt.isSync()) {
@@ -208,22 +208,12 @@ public class AnalysisManager implements Writable {
}
// Each analyze stmt corresponding to an analysis job.
- public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws
DdlException, AnalysisException {
+ public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy)
+ throws DdlException, AnalysisException, ExecutionException,
InterruptedException {
// Using auto analyzer if user specifies.
if
(stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
StatisticsAutoCollector autoCollector =
Env.getCurrentEnv().getStatisticsAutoCollector();
- if (autoCollector.skip(stmt.getTable())) {
- return;
- }
- List<AnalysisInfo> jobs = new ArrayList<>();
- autoCollector.createAnalyzeJobForTbl(stmt.getDb(), jobs,
stmt.getTable());
- if (jobs.isEmpty()) {
- return;
- }
- AnalysisInfo job =
autoCollector.getNeedAnalyzeColumns(jobs.get(0));
- if (job != null) {
-
Env.getCurrentEnv().getStatisticsAutoCollector().createSystemAnalysisJob(job);
- }
+ autoCollector.processOneJob(stmt.getTable(),
JobPriority.MANUAL_AUTO);
return;
}
AnalysisInfo jobInfo = buildAndAssignJob(stmt);
@@ -347,7 +337,6 @@ public class AnalysisManager implements Writable {
infoBuilder.setAnalysisMode(analysisMode);
infoBuilder.setAnalysisMethod(analysisMethod);
infoBuilder.setScheduleType(scheduleType);
- infoBuilder.setLastExecTimeInMs(0);
infoBuilder.setCronExpression(cronExpression);
infoBuilder.setForceFull(stmt.forceFull());
infoBuilder.setUsingSqlForPartitionColumn(stmt.usingSqlForPartitionColumn());
@@ -377,6 +366,8 @@ public class AnalysisManager implements Writable {
&& analysisMethod.equals(AnalysisMethod.SAMPLE));
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0
: table.getRowCount();
infoBuilder.setRowCount(rowCount);
+ infoBuilder.setPriority(JobPriority.MANUAL);
+ infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable)
table).getVisibleVersion() : 0);
return infoBuilder.build();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 5277d8025fc..3db9a862d10 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -88,9 +89,9 @@ public class AnalysisTaskExecutor {
}
}
- public void submitTask(BaseAnalysisTask task) {
+ public Future<?> submitTask(BaseAnalysisTask task) {
AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
- executors.submit(taskWrapper);
+ return executors.submit(taskWrapper);
}
public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
index 445641b2505..f9da52b01cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
@@ -43,13 +43,17 @@ public class ColStatsMeta {
@SerializedName("trigger")
public JobType jobType;
+ @SerializedName("tv")
+ public long tableVersion;
+
public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod,
- AnalysisType analysisType, JobType jobType, long queriedTimes) {
+ AnalysisType analysisType, JobType jobType, long queriedTimes,
long tableVersion) {
this.updatedTime = updatedTime;
this.analysisMethod = analysisMethod;
this.analysisType = analysisType;
this.jobType = jobType;
this.queriedTimes.addAndGet(queriedTimes);
+ this.tableVersion = tableVersion;
}
public void clear() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
new file mode 100644
index 00000000000..2d45dad877b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+public enum JobPriority {
+ HIGH,
+ LOW,
+ MANUAL,
+ MANUAL_AUTO;
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index c1a8af93ac0..1d7818a8e8d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -18,172 +18,125 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;
-import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.time.LocalTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.StringJoiner;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-public class StatisticsAutoCollector extends StatisticsCollector {
+public class StatisticsAutoCollector extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(StatisticsAutoCollector.class);
+ public static final int JOB_QUEUE_LIMIT = 100;
+ private final BlockingQueue<TableIf> highPriorityJobs = new
ArrayBlockingQueue<>(JOB_QUEUE_LIMIT);
+ private final BlockingQueue<TableIf> lowPriorityJobs = new
ArrayBlockingQueue<>(JOB_QUEUE_LIMIT);
+
+ protected final AnalysisTaskExecutor analysisTaskExecutor;
+
public StatisticsAutoCollector() {
- super("Automatic Analyzer",
-
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
- new
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
- StatisticConstants.TASK_QUEUE_CAP));
+ super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(10));
+ this.analysisTaskExecutor = new
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
+ StatisticConstants.TASK_QUEUE_CAP);
}
@Override
- protected void collect() {
- if (canCollect()) {
- analyzeAll();
+ protected void runAfterCatalogReady() {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return;
+ }
+ if (!StatisticsUtil.statsTblAvailable()) {
+ LOG.info("Stats table not available, skip");
+ return;
+ }
+ if (Env.isCheckpointThread()) {
+ return;
+ }
+ try {
+ collect();
+ } catch (DdlException | ExecutionException | InterruptedException e) {
+ LOG.warn("One auto analyze job failed. ", e);
}
}
- protected boolean canCollect() {
- return StatisticsUtil.enableAutoAnalyze()
- &&
StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
- }
-
- protected void analyzeAll() {
- List<CatalogIf> catalogs = getCatalogsInOrder();
- for (CatalogIf ctl : catalogs) {
- if (!canCollect()) {
- analysisTaskExecutor.clear();
- break;
- }
- if (!ctl.enableAutoAnalyze()) {
- continue;
+ protected void collect() throws DdlException, ExecutionException,
InterruptedException {
+ if (!StatisticsUtil.canCollect()) {
+ return;
+ }
+ AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
+ while (true) {
+ Pair<TableIf, JobPriority> jobPair = fetchOneJob();
+ TableIf table = jobPair.first;
+ if (table == null) {
+ return;
}
- List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
- for (DatabaseIf<TableIf> databaseIf : dbs) {
- if (!canCollect()) {
- analysisTaskExecutor.clear();
- break;
- }
- if
(StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
- continue;
- }
- try {
- analyzeDb(databaseIf);
- } catch (Throwable t) {
- LOG.warn("Failed to analyze database {}.{}",
ctl.getName(), databaseIf.getFullName(), t);
- continue;
- }
+ TableStatsMeta tblStats =
analysisManager.findTableStatsStatus(table.getId());
+ if (table.needReAnalyzeTable(tblStats) ||
StatisticsUtil.tableNotAnalyzedForTooLong(table, tblStats)) {
+ processOneJob(table, jobPair.second);
}
}
}
- public List<CatalogIf> getCatalogsInOrder() {
- return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
- .sorted((c1, c2) -> (int) (c1.getId() -
c2.getId())).collect(Collectors.toList());
- }
-
- public List<DatabaseIf<? extends TableIf>>
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
- return catalog.getAllDbs().stream()
- .sorted((d1, d2) -> (int) (d1.getId() -
d2.getId())).collect(Collectors.toList());
- }
-
- public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
- return db.getTables().stream()
- .sorted((t1, t2) -> (int) (t1.getId() -
t2.getId())).collect(Collectors.toList());
- }
-
- public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
- List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
- for (AnalysisInfo analysisInfo : analysisInfos) {
- try {
- if (!canCollect()) {
- analysisTaskExecutor.clear();
- break;
- }
- analysisInfo = getNeedAnalyzeColumns(analysisInfo);
- if (analysisInfo == null) {
- continue;
- }
- createSystemAnalysisJob(analysisInfo);
- } catch (Throwable t) {
- analysisInfo.message = t.getMessage();
- LOG.warn("Failed to auto analyze table {}.{}, reason {}",
- databaseIf.getFullName(), analysisInfo.tblId,
analysisInfo.message, t);
- continue;
- }
+ protected Pair<TableIf, JobPriority> fetchOneJob() {
+ TableIf table = null;
+ JobPriority priority = null;
+ try {
+ table = highPriorityJobs.poll(1, TimeUnit.SECONDS);
+ priority = JobPriority.HIGH;
+ } catch (InterruptedException e) {
+ LOG.debug(e);
}
- }
-
- protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends
TableIf> db) {
- List<AnalysisInfo> analysisInfos = new ArrayList<>();
- for (TableIf table : getTablesInOrder(db)) {
+ if (table == null) {
try {
- if (skip(table)) {
- continue;
- }
- createAnalyzeJobForTbl(db, analysisInfos, table);
- } catch (Throwable t) {
- LOG.warn("Failed to analyze table {}.{}.{}",
- db.getCatalog().getName(), db.getFullName(),
table.getName(), t);
+ table = lowPriorityJobs.poll(1, TimeUnit.SECONDS);
+ priority = JobPriority.LOW;
+ } catch (InterruptedException e) {
+ LOG.debug(e);
}
}
- return analysisInfos;
- }
-
- // return true if skip auto analyze this time.
- protected boolean skip(TableIf table) {
- if (!(table instanceof OlapTable || table instanceof
HMSExternalTable)) {
- return true;
- }
- // For now, only support Hive HMS table auto collection.
- if (table instanceof HMSExternalTable
- && !((HMSExternalTable)
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
- return true;
- }
- if (table.getDataSize(true) <
StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
- return false;
- }
- TableStatsMeta tableStats =
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
- // means it's never got analyzed or new partition loaded data.
- if (tableStats == null || tableStats.newPartitionLoaded.get()) {
- return false;
+ if (table == null) {
+ LOG.debug("Job queues are all empty.");
}
- if (tableStats.userInjected) {
- return true;
- }
- return System.currentTimeMillis()
- - tableStats.updatedTime <
StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
+ return Pair.of(table, priority);
}
- protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
- List<AnalysisInfo> analysisInfos, TableIf table) {
- AnalysisMethod analysisMethod = table.getDataSize(true) >=
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
- ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
- if (table instanceof OlapTable &&
analysisMethod.equals(AnalysisMethod.SAMPLE)) {
- OlapTable ot = (OlapTable) table;
- if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) ==
TableIf.UNKNOWN_ROW_COUNT) {
- LOG.info("Table {} row count is not fully reported, skip auto
analyzing this time.", ot.getName());
- return;
- }
+ protected void processOneJob(TableIf table, JobPriority priority)
+ throws DdlException, ExecutionException, InterruptedException {
+ List<Pair<String, String>> needRunColumns = table.getColumnIndexPairs(
+ table.getSchemaAllIndexes(false)
+ .stream()
+ .filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
+ .map(Column::getName)
+ .collect(Collectors.toSet()));
+ if (needRunColumns == null || needRunColumns.isEmpty()) {
+ return;
+ }
+ AnalysisMethod analysisMethod =
+ table.getDataSize(true) >=
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
+ ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
+ if (!tableRowCountReported(table, analysisMethod)) {
+ return;
}
// We don't auto analyze empty table to avoid all 0 stats.
// Because all 0 is more dangerous than unknown stats when row count
report is delayed.
@@ -198,12 +151,33 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
}
return;
}
+ StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
+ for (Pair<String, String> pair : needRunColumns) {
+ stringJoiner.add(pair.toString());
+ }
+ AnalysisInfo jobInfo = createAnalysisInfo(table, analysisMethod,
rowCount,
+ stringJoiner.toString(), needRunColumns, priority);
+ executeSystemAnalysisJob(jobInfo);
+ }
+
+ protected boolean tableRowCountReported(TableIf table, AnalysisMethod
analysisMethod) {
+ if (table instanceof OlapTable &&
analysisMethod.equals(AnalysisMethod.SAMPLE)) {
+ OlapTable ot = (OlapTable) table;
+ if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) ==
TableIf.UNKNOWN_ROW_COUNT) {
+ LOG.info("Table {} row count is not fully reported, skip auto
analyzing this time.", ot.getName());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected AnalysisInfo createAnalysisInfo(TableIf table, AnalysisMethod
analysisMethod, long rowCount,
+ String colNames, List<Pair<String, String>> needRunColumns,
JobPriority priority) {
AnalysisInfo jobInfo = new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
- .setCatalogId(db.getCatalog().getId())
- .setDBId(db.getId())
+ .setCatalogId(table.getDatabase().getCatalog().getId())
+ .setDBId(table.getDatabase().getId())
.setTblId(table.getId())
- .setColName(null)
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(analysisMethod)
@@ -216,37 +190,46 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
.setJobType(JobType.SYSTEM)
.setTblUpdateTime(table.getUpdateTime())
.setEmptyJob(table instanceof OlapTable && table.getRowCount()
== 0
- && analysisMethod.equals(AnalysisMethod.SAMPLE))
+ && analysisMethod.equals(AnalysisMethod.SAMPLE))
.setRowCount(rowCount)
+ .setColName(colNames)
+ .setJobColumns(needRunColumns)
+ .setPriority(priority)
+ .setTableVersion(table instanceof OlapTable ? ((OlapTable)
table).getVisibleVersion() : 0)
.build();
- analysisInfos.add(jobInfo);
+ return jobInfo;
}
- @VisibleForTesting
- protected AnalysisInfo getNeedAnalyzeColumns(AnalysisInfo jobInfo) {
- TableIf table = StatisticsUtil.findTable(jobInfo.catalogId,
jobInfo.dbId, jobInfo.tblId);
- // Skip tables that are too wide.
- if (table.getBaseSchema().size() >
StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) {
- return null;
+ // Analysis job created by the system
+ protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
+ throws DdlException, ExecutionException, InterruptedException {
+ if (jobInfo.jobColumns.isEmpty()) {
+ // No statistics need to be collected or updated
+ return;
}
-
- AnalysisManager analysisManager =
Env.getServingEnv().getAnalysisManager();
- TableStatsMeta tblStats =
analysisManager.findTableStatsStatus(table.getId());
-
- List<Pair<String, String>> needRunColumns = null;
- if (table.needReAnalyzeTable(tblStats)) {
- needRunColumns =
table.getColumnIndexPairs(table.getSchemaAllIndexes(false)
- .stream().map(Column::getName).collect(Collectors.toSet()));
+ Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
+ AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
+ analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
+ if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId,
jobInfo.tblId)) {
+ analysisManager.createTableLevelTaskForExternalTable(jobInfo,
analysisTasks, false);
}
-
- if (needRunColumns == null || needRunColumns.isEmpty()) {
- return null;
+ Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo,
analysisTasks.values());
+ Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo,
analysisTasks);
+ Future<?>[] futures = new Future[analysisTasks.values().size()];
+ int i = 0;
+ for (BaseAnalysisTask task : analysisTasks.values()) {
+ futures[i++] = analysisTaskExecutor.submitTask(task);
}
- StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
- for (Pair<String, String> pair : needRunColumns) {
- stringJoiner.add(pair.toString());
+ for (Future future : futures) {
+ future.get();
}
- return new AnalysisInfoBuilder(jobInfo)
-
.setColName(stringJoiner.toString()).setJobColumns(needRunColumns).build();
+ }
+
+ public void appendToHighPriorityJobs(TableIf table) throws
InterruptedException {
+ highPriorityJobs.put(table);
+ }
+
+ public boolean appendToLowPriorityJobs(TableIf table) {
+ return lowPriorityJobs.offer(table);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
deleted file mode 100644
index ec187fe893a..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.util.MasterDaemon;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class StatisticsCollector extends MasterDaemon {
-
- private static final Logger LOG =
LogManager.getLogger(StatisticsCollector.class);
-
- protected final AnalysisTaskExecutor analysisTaskExecutor;
-
- public StatisticsCollector(String name, long intervalMs,
AnalysisTaskExecutor analysisTaskExecutor) {
- super(name, intervalMs);
- this.analysisTaskExecutor = analysisTaskExecutor;
- }
-
- @Override
- protected void runAfterCatalogReady() {
- if (!Env.getCurrentEnv().isMaster()) {
- return;
- }
- if (!StatisticsUtil.statsTblAvailable()) {
- LOG.info("Stats table not available, skip");
- return;
- }
- if (Env.isCheckpointThread()) {
- return;
- }
- collect();
- }
-
- protected abstract void collect();
-
- // Analysis job created by the system
- @VisibleForTesting
- protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
- throws DdlException {
- if (jobInfo.jobColumns.isEmpty()) {
- // No statistics need to be collected or updated
- return;
- }
- Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
- AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
- analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
- if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId,
jobInfo.tblId)) {
- analysisManager.createTableLevelTaskForExternalTable(jobInfo,
analysisTasks, false);
- }
- Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo,
analysisTasks.values());
- Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo,
analysisTasks);
- analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
- }
-
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
new file mode 100644
index 00000000000..3703cf236c0
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class StatisticsJobAppender extends MasterDaemon {
+ private static final Logger LOG =
LogManager.getLogger(StatisticsJobAppender.class);
+
+ public StatisticsJobAppender(String name) {
+ super(name,
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
+ }
+
+ @Override
+ protected void runAfterCatalogReady() {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return;
+ }
+ if (!StatisticsUtil.statsTblAvailable()) {
+ LOG.info("Stats table not available, skip");
+ return;
+ }
+ if (Env.getCurrentEnv().getStatisticsAutoCollector() == null) {
+ LOG.info("Statistics auto collector not ready, skip");
+ return;
+ }
+ if (Env.isCheckpointThread()) {
+ return;
+ }
+ if (!Env.getCurrentEnv().isReady()) {
+ return;
+ }
+ if (!StatisticsUtil.canCollect()) {
+ LOG.debug("Auto analyze not enabled or not in analyze time
range.");
+ return;
+ }
+ traverseAllTables();
+ }
+
+ protected void traverseAllTables() {
+ List<CatalogIf> catalogs = getCatalogsInOrder();
+ AnalysisManager analysisManager =
Env.getServingEnv().getAnalysisManager();
+ StatisticsAutoCollector autoCollector =
Env.getCurrentEnv().getStatisticsAutoCollector();
+ for (CatalogIf ctl : catalogs) {
+ if (!StatisticsUtil.canCollect()) {
+ break;
+ }
+ if (!ctl.enableAutoAnalyze()) {
+ continue;
+ }
+ List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
+ for (DatabaseIf<TableIf> db : dbs) {
+ if (!StatisticsUtil.canCollect()) {
+ break;
+ }
+ if (StatisticConstants.SYSTEM_DBS.contains(db.getFullName())) {
+ continue;
+ }
+ for (TableIf table : getTablesInOrder(db)) {
+ try {
+ if (skip(table)) {
+ continue;
+ }
+ TableStatsMeta tblStats =
analysisManager.findTableStatsStatus(table.getId());
+ if (table.needReAnalyzeTable(tblStats)) {
+ autoCollector.appendToHighPriorityJobs(table);
+ } else if
(StatisticsUtil.tableNotAnalyzedForTooLong(table, tblStats)) {
+ autoCollector.appendToLowPriorityJobs(table);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failed to analyze table {}.{}.{}",
+ ctl.getName(), db.getFullName(),
table.getName(), t);
+ }
+ }
+ }
+ }
+ }
+
+ public List<CatalogIf> getCatalogsInOrder() {
+ return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
+ .sorted((c1, c2) -> (int) (c1.getId() -
c2.getId())).collect(Collectors.toList());
+ }
+
+ public List<DatabaseIf<? extends TableIf>>
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
+ return catalog.getAllDbs().stream()
+ .sorted((d1, d2) -> (int) (d1.getId() -
d2.getId())).collect(Collectors.toList());
+ }
+
+ public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
+ return db.getTables().stream()
+ .sorted((t1, t2) -> (int) (t1.getId() -
t2.getId())).collect(Collectors.toList());
+ }
+
+ // return true if skip auto analyze this time.
+ protected boolean skip(TableIf table) {
+ if (!(table instanceof OlapTable || table instanceof
HMSExternalTable)) {
+ return true;
+ }
+ // For now, only support Hive HMS table auto collection.
+ if (table instanceof HMSExternalTable
+ && !((HMSExternalTable)
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
+ return true;
+ }
+ // Skip wide table.
+ if (table.getBaseSchema().size() >
StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) {
+ return true;
+ }
+ if (table.getDataSize(true) <
StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
+ return false;
+ }
+ TableStatsMeta tableStats =
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
+ // means it's never got analyzed or new partition loaded data.
+ if (tableStats == null || tableStats.newPartitionLoaded.get()) {
+ return false;
+ }
+ if (tableStats.userInjected) {
+ return true;
+ }
+ return System.currentTimeMillis()
+ - tableStats.lastAnalyzeTime <
StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 50739e98aea..33f099e2b0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -79,6 +79,9 @@ public class TableStatsMeta implements Writable,
GsonPostProcessable {
@SerializedName("updateTime")
public long updatedTime;
+ @SerializedName("lat")
+ public long lastAnalyzeTime;
+
@SerializedName("colNameToColStatsMeta")
private ConcurrentMap<String, ColStatsMeta>
deprecatedColNameToColStatsMeta = new ConcurrentHashMap<>();
@@ -156,19 +159,21 @@ public class TableStatsMeta implements Writable,
GsonPostProcessable {
public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
updatedTime = analyzedJob.tblUpdateTime;
+ lastAnalyzeTime = analyzedJob.createTime;
if (analyzedJob.userInject) {
userInjected = true;
}
for (Pair<String, String> colPair : analyzedJob.jobColumns) {
ColStatsMeta colStatsMeta = colToColStatsMeta.get(colPair);
if (colStatsMeta == null) {
- colToColStatsMeta.put(colPair, new ColStatsMeta(updatedTime,
- analyzedJob.analysisMethod, analyzedJob.analysisType,
analyzedJob.jobType, 0));
+ colToColStatsMeta.put(colPair, new
ColStatsMeta(lastAnalyzeTime, analyzedJob.analysisMethod,
+ analyzedJob.analysisType, analyzedJob.jobType, 0,
analyzedJob.tableVersion));
} else {
- colStatsMeta.updatedTime = updatedTime;
+ colStatsMeta.updatedTime = lastAnalyzeTime;
colStatsMeta.analysisType = analyzedJob.analysisType;
colStatsMeta.analysisMethod = analyzedJob.analysisMethod;
colStatsMeta.jobType = analyzedJob.jobType;
+ colStatsMeta.tableVersion = analyzedJob.tableVersion;
}
}
jobType = analyzedJob.jobType;
@@ -233,4 +238,9 @@ public class TableStatsMeta implements Writable,
GsonPostProcessable {
public boolean isColumnsStatsEmpty() {
return colToColStatsMeta == null || colToColStatsMeta.isEmpty();
}
+
+ @VisibleForTesting
+ public void setColToColStatsMeta(ConcurrentMap<Pair<String, String>,
ColStatsMeta> colToColStatsMeta) {
+ this.colToColStatsMeta = colToColStatsMeta;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 288eb88e95f..718260a25b0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -51,6 +51,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InternalCatalog;
@@ -70,6 +71,7 @@ import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticConstants;
+import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.system.Frontend;
import com.google.common.base.Preconditions;
@@ -904,4 +906,37 @@ public class StatisticsUtil {
return rowCount == 0;
}
+ public static boolean canCollect() {
+ return enableAutoAnalyze() &&
inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
+ }
+
+ public static boolean tableNotAnalyzedForTooLong(TableIf table,
TableStatsMeta tblStats) {
+ if (table == null || tblStats == null) {
+ LOG.warn("Table or stats is null.");
+ return false;
+ }
+ if (tblStats.userInjected) {
+ return false;
+ }
+ if (!(table instanceof OlapTable)) {
+ return false;
+ }
+ boolean isLongTime = Config.auto_analyze_interval_seconds > 0
+ && System.currentTimeMillis() - tblStats.lastAnalyzeTime >
Config.auto_analyze_interval_seconds * 1000;
+ if (!isLongTime) {
+ return false;
+ }
+ // For OlapTable, if update rows is 0, row count doesn't change since
last analyze
+ // and table visible version doesn't change since last analyze. Then
we skip analyzing it.
+ if (tblStats.updatedRows.get() != 0) {
+ return true;
+ }
+ long rowCount = table.getRowCount();
+ if (rowCount != tblStats.rowCount) {
+ return true;
+ }
+ long visibleVersion = ((OlapTable) table).getVisibleVersion();
+ return tblStats.analyzeColumns().stream()
+ .anyMatch(c -> tblStats.findColumnStatsMeta(c.first,
c.second).tableVersion != visibleVersion);
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
index 4f40071f501..d687d111d2c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
@@ -17,504 +17,102 @@
package org.apache.doris.statistics;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DatabaseIf;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.Type;
-import org.apache.doris.catalog.View;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
-import com.google.common.collect.Lists;
-import mockit.Expectations;
-import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
-import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ExecutionException;
public class StatisticsAutoCollectorTest {
@Test
- public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) {
- new MockUp<CatalogIf>() {
- @Mock
- public Collection<DatabaseIf> getAllDbs() {
- Database db1 = new Database(1, FeConstants.INTERNAL_DB_NAME);
- Database db2 = new Database(2, "anyDB");
- List<DatabaseIf> databaseIfs = new ArrayList<>();
- databaseIfs.add(db1);
- databaseIfs.add(db2);
- return databaseIfs;
- }
- };
+ public void testCollect() throws DdlException, ExecutionException,
InterruptedException {
+ StatisticsAutoCollector collector = new StatisticsAutoCollector();
+ final int[] count = {0, 0};
new MockUp<StatisticsAutoCollector>() {
@Mock
- public List<AnalysisInfo>
constructAnalysisInfo(DatabaseIf<TableIf> db) {
- return Arrays.asList(analysisInfo, analysisInfo);
- }
-
- int count = 0;
-
- @Mock
- public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo)
{
- return count++ == 0 ? null : jobInfo;
- }
-
- @Mock
- public void createSystemAnalysisJob(AnalysisInfo jobInfo)
- throws DdlException {
-
- }
- };
-
- StatisticsAutoCollector saa = new StatisticsAutoCollector();
- saa.runAfterCatalogReady();
- new Expectations() {
- {
- try {
- saa.createSystemAnalysisJob((AnalysisInfo) any);
- times = 1;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- @Test
- public void testConstructAnalysisInfo(
- @Injectable OlapTable o2, @Injectable View v) {
- new MockUp<Database>() {
- @Mock
- public List<Table> getTables() {
- List<Table> tableIfs = new ArrayList<>();
- tableIfs.add(o2);
- tableIfs.add(v);
- return tableIfs;
- }
-
- @Mock
- public String getFullName() {
- return "anyDb";
- }
- };
-
- new MockUp<OlapTable>() {
- @Mock
- public String getName() {
- return "anytable";
- }
-
- @Mock
- public List<Column> getSchemaAllIndexes(boolean full) {
- List<Column> columns = new ArrayList<>();
- columns.add(new Column("c1", PrimitiveType.INT));
- columns.add(new Column("c2", PrimitiveType.HLL));
- return columns;
- }
-
- @Mock
- public long getRowCount() {
- return 1;
- }
- };
- StatisticsAutoCollector saa = new StatisticsAutoCollector();
- List<AnalysisInfo> analysisInfoList = saa.constructAnalysisInfo(new
Database(1, "anydb"));
- Assertions.assertEquals(1, analysisInfoList.size());
- Assertions.assertNull(analysisInfoList.get(0).colName);
- }
-
- @Test
- public void testSkipWideTable() {
-
- TableIf tableIf = new OlapTable();
-
- new MockUp<OlapTable>() {
- @Mock
- public List<Column> getBaseSchema() {
- return Lists.newArrayList(new Column("col1", Type.INT), new
Column("col2", Type.INT));
- }
-
- @Mock
- public List<Pair<String, String>> getColumnIndexPairs(Set<String>
columns) {
- ArrayList<Pair<String, String>> list = Lists.newArrayList();
- list.add(Pair.of("1", "1"));
- return list;
+ protected Pair<TableIf, JobPriority> fetchOneJob() {
+ count[0]++;
+ return Pair.of(null, JobPriority.LOW);
}
};
- new MockUp<StatisticsUtil>() {
- int count = 0;
- int[] thresholds = {1, 10};
-
- @Mock
- public TableIf findTable(long catalogName, long dbName, long
tblName) {
- return tableIf;
- }
-
- @Mock
- public int getAutoAnalyzeTableWidthThreshold() {
- return thresholds[count++];
- }
- };
-
- AnalysisInfo analysisInfo = new AnalysisInfoBuilder().build();
- StatisticsAutoCollector statisticsAutoCollector = new
StatisticsAutoCollector();
-
Assertions.assertNull(statisticsAutoCollector.getNeedAnalyzeColumns(analysisInfo));
-
Assertions.assertNotNull(statisticsAutoCollector.getNeedAnalyzeColumns(analysisInfo));
- }
-
- @Test
- public void testLoop() {
- AtomicBoolean timeChecked = new AtomicBoolean();
- AtomicBoolean switchChecked = new AtomicBoolean();
- new MockUp<StatisticsUtil>() {
-
- @Mock
- public boolean inAnalyzeTime(LocalTime now) {
- timeChecked.set(true);
- return true;
- }
-
- @Mock
- public boolean enableAutoAnalyze() {
- switchChecked.set(true);
- return true;
- }
- };
- StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
- autoCollector.collect();
- Assertions.assertTrue(timeChecked.get() && switchChecked.get());
-
- }
-
- @Test
- public void checkAvailableThread() {
- StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-
Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num,
-
autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
- }
-
- @Test
- public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta
stats, @Mocked TableIf anyOtherTable) {
- new MockUp<OlapTable>() {
-
- @Mock
- public long getDataSize(boolean singleReplica) {
- return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5
+ 1000000000;
- }
- };
-
- new MockUp<AnalysisManager>() {
-
- @Mock
- public TableStatsMeta findTableStatsStatus(long tblId) {
- return stats;
- }
- };
- // A very huge table has been updated recently, so we should skip it
this time
- stats.updatedTime = System.currentTimeMillis() - 1000;
- stats.newPartitionLoaded = new AtomicBoolean();
- stats.newPartitionLoaded.set(true);
- StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
- // Test new partition loaded data for the first time. Not skip.
- Assertions.assertFalse(autoCollector.skip(olapTable));
- stats.newPartitionLoaded.set(false);
- // Assertions.assertTrue(autoCollector.skip(olapTable));
- // The update of this huge table is long time ago, so we shouldn't
skip it this time
- stats.updatedTime = System.currentTimeMillis()
- - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() -
10000;
- Assertions.assertFalse(autoCollector.skip(olapTable));
new MockUp<AnalysisManager>() {
-
@Mock
public TableStatsMeta findTableStatsStatus(long tblId) {
+ count[1]++;
return null;
}
};
- // can't find table stats meta, which means this table never get
analyzed, so we shouldn't skip it this time
- Assertions.assertFalse(autoCollector.skip(olapTable));
- new MockUp<AnalysisManager>() {
-
- @Mock
- public TableStatsMeta findTableStatsStatus(long tblId) {
- return stats;
- }
- };
- stats.userInjected = true;
- Assertions.assertTrue(autoCollector.skip(olapTable));
- // this is not olap table nor external table, so we should skip it
this time
- Assertions.assertTrue(autoCollector.skip(anyOtherTable));
- }
-
- // For small table, use full
- @Test
- public void testCreateAnalyzeJobForTbl1(
- @Injectable OlapTable t1,
- @Injectable Database db
- ) throws Exception {
- new MockUp<Database>() {
-
- @Mock
- public CatalogIf getCatalog() {
- return Env.getCurrentInternalCatalog();
- }
-
- @Mock
- public long getId() {
- return 0;
- }
- };
- new MockUp<OlapTable>() {
-
- int count = 0;
-
- @Mock
- public List<Column> getBaseSchema() {
- return Lists.newArrayList(new Column("test",
PrimitiveType.INT));
- }
-
- @Mock
- public long getDataSize(boolean singleReplica) {
- return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - 1;
- }
-
- @Mock
- public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
- return new OlapAnalysisTask(info);
- }
-
- @Mock
- public List<Long> getMvColumnIndexIds(String columnName) {
- ArrayList<Long> objects = new ArrayList<>();
- objects.add(-1L);
- return objects;
- }
-
- @Mock
- public long getRowCount() {
- return 1;
- }
- };
-
- new MockUp<StatisticsUtil>() {
- @Mock
- public TableIf findTable(long catalogId, long dbId, long tblId) {
- return t1;
- }
- };
-
- StatisticsAutoCollector sac = new StatisticsAutoCollector();
- List<AnalysisInfo> jobInfos = new ArrayList<>();
- sac.createAnalyzeJobForTbl(db, jobInfos, t1);
- AnalysisInfo jobInfo = jobInfos.get(0);
- List<Pair<String, String>> columnNames = Lists.newArrayList();
- columnNames.add(Pair.of("test", "t1"));
- jobInfo = new
AnalysisInfoBuilder(jobInfo).setJobColumns(columnNames).build();
- Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
- AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
- analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
- Assertions.assertEquals(1, analysisTasks.size());
- for (BaseAnalysisTask task : analysisTasks.values()) {
- Assertions.assertNull(task.getTableSample());
- }
- }
-
- // for big table, use sample
- @Test
- public void testCreateAnalyzeJobForTbl2(
- @Injectable OlapTable t1,
- @Injectable Database db
- ) throws Exception {
- new MockUp<Database>() {
-
- @Mock
- public CatalogIf getCatalog() {
- return Env.getCurrentInternalCatalog();
- }
-
- @Mock
- public long getId() {
- return 0;
- }
- };
- new MockUp<OlapTable>() {
-
- int count = 0;
-
- @Mock
- public List<Column> getBaseSchema() {
- return Lists.newArrayList(new Column("test",
PrimitiveType.INT));
- }
-
- @Mock
- public long getDataSize(boolean singleReplica) {
- return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 2;
- }
-
- @Mock
- public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
- return new OlapAnalysisTask(info);
- }
-
- @Mock
- public List<Long> getMvColumnIndexIds(String columnName) {
- ArrayList<Long> objects = new ArrayList<>();
- objects.add(-1L);
- return objects;
- }
-
- @Mock
- public long getRowCount() {
- return 1;
- }
- };
-
- new MockUp<StatisticsUtil>() {
- @Mock
- public TableIf findTable(long catalogId, long dbId, long tblId) {
- return t1;
- }
- };
-
- StatisticsAutoCollector sac = new StatisticsAutoCollector();
- List<AnalysisInfo> jobInfos = new ArrayList<>();
- sac.createAnalyzeJobForTbl(db, jobInfos, t1);
- AnalysisInfo jobInfo = jobInfos.get(0);
- List<Pair<String, String>> colNames = Lists.newArrayList();
- colNames.add(Pair.of("test", "1"));
- jobInfo = new
AnalysisInfoBuilder(jobInfo).setJobColumns(colNames).build();
- Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
- AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
- analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
- Assertions.assertEquals(1, analysisTasks.size());
- for (BaseAnalysisTask task : analysisTasks.values()) {
- Assertions.assertNotNull(task.getTableSample());
- }
- }
-
- @Test
- public void testDisableAuto1() throws Exception {
- InternalCatalog catalog1 = EnvFactory.createInternalCatalog();
- List<CatalogIf> catalogs = Lists.newArrayList();
- catalogs.add(catalog1);
+ collector.collect();
+ Assertions.assertEquals(1, count[0]);
+ Assertions.assertEquals(0, count[1]);
+ OlapTable table = new OlapTable();
new MockUp<StatisticsAutoCollector>() {
@Mock
- public List<CatalogIf> getCatalogsInOrder() {
- return catalogs;
- }
-
- @Mock
- protected boolean canCollect() {
- return false;
+ protected Pair<TableIf, JobPriority> fetchOneJob() {
+ if (count[0] == 0) {
+ count[0]++;
+ return Pair.of(table, JobPriority.LOW);
+ }
+ count[0]++;
+ return Pair.of(null, JobPriority.LOW);
}
-
};
-
- StatisticsAutoCollector sac = new StatisticsAutoCollector();
- new Expectations(catalog1) {{
- catalog1.enableAutoAnalyze();
- times = 0;
- }};
-
- sac.analyzeAll();
+ count[0] = 0;
+ count[1] = 0;
+ collector.collect();
+ Assertions.assertEquals(2, count[0]);
+ Assertions.assertEquals(1, count[1]);
}
@Test
- public void testDisableAuto2() throws Exception {
- InternalCatalog catalog1 = EnvFactory.createInternalCatalog();
- List<CatalogIf> catalogs = Lists.newArrayList();
- catalogs.add(catalog1);
-
- Database db1 = new Database();
- List<DatabaseIf<? extends TableIf>> dbs = Lists.newArrayList();
- dbs.add(db1);
-
- new MockUp<StatisticsAutoCollector>() {
- int count = 0;
- boolean[] canCollectReturn = {true, false};
- @Mock
- public List<CatalogIf> getCatalogsInOrder() {
- return catalogs;
- }
-
- @Mock
- public List<DatabaseIf<? extends TableIf>>
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
- return dbs;
- }
-
- @Mock
- protected boolean canCollect() {
- return canCollectReturn[count++];
- }
-
- };
-
- StatisticsAutoCollector sac = new StatisticsAutoCollector();
- new Expectations(catalog1, db1) {{
- catalog1.enableAutoAnalyze();
- result = true;
- times = 1;
- db1.getFullName();
- times = 0;
- }};
-
- sac.analyzeAll();
+ public void testFetchOneJob() throws InterruptedException {
+ OlapTable table1 = new OlapTable();
+ OlapTable table2 = new OlapTable();
+ StatisticsAutoCollector collector = new StatisticsAutoCollector();
+ collector.appendToHighPriorityJobs(table1);
+ collector.appendToLowPriorityJobs(table2);
+ Pair<TableIf, JobPriority> jobPair = collector.fetchOneJob();
+ Assertions.assertSame(table1, jobPair.first);
+ Assertions.assertEquals(JobPriority.HIGH, jobPair.second);
+ jobPair = collector.fetchOneJob();
+ Assertions.assertSame(table2, jobPair.first);
+ Assertions.assertEquals(JobPriority.LOW, jobPair.second);
+ jobPair = collector.fetchOneJob();
+ Assertions.assertNull(jobPair.first);
}
@Test
- public void testCreateAnalyzeJobForTbl() {
+ public void testTableRowCountReported() {
StatisticsAutoCollector collector = new StatisticsAutoCollector();
- OlapTable table = new OlapTable();
+ ExternalTable externalTable = new ExternalTable();
+ Assertions.assertTrue(collector.tableRowCountReported(externalTable,
AnalysisMethod.SAMPLE));
+ OlapTable olapTable = new OlapTable();
+ Assertions.assertTrue(collector.tableRowCountReported(olapTable,
AnalysisMethod.FULL));
+ Assertions.assertTrue(collector.tableRowCountReported(externalTable,
AnalysisMethod.FULL));
new MockUp<OlapTable>() {
- @Mock
- public long getDataSize(boolean singleReplica) {
- return 100;
- }
-
@Mock
public long getRowCountForIndex(long indexId, boolean strict) {
- return -1;
- }
-
- @Mock
- public boolean isPartitionedTable() {
- return false;
+ return TableIf.UNKNOWN_ROW_COUNT;
}
};
- List<AnalysisInfo> infos = Lists.newArrayList();
- collector.createAnalyzeJobForTbl(null, infos, table);
- Assertions.assertEquals(0, infos.size());
+ Assertions.assertFalse(collector.tableRowCountReported(olapTable,
AnalysisMethod.SAMPLE));
new MockUp<OlapTable>() {
@Mock
public long getRowCountForIndex(long indexId, boolean strict) {
- return 100;
+ return TableIf.UNKNOWN_ROW_COUNT + 1;
}
};
- Assertions.assertThrows(NullPointerException.class, () -> collector.createAnalyzeJobForTbl(null, infos, table));
+ Assertions.assertTrue(collector.tableRowCountReported(olapTable, AnalysisMethod.SAMPLE));
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java
new file mode 100644
index 00000000000..f57791a1a5c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class StatisticsJobAppenderTest {
+ @Test
+ public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) {
+ new MockUp<OlapTable>() {
+
+ @Mock
+ public long getDataSize(boolean singleReplica) {
+ return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000;
+ }
+ };
+
+ new MockUp<AnalysisManager>() {
+
+ @Mock
+ public TableStatsMeta findTableStatsStatus(long tblId) {
+ return stats;
+ }
+ };
+ // A very huge table has been updated recently, so we should skip it this time
+ stats.updatedTime = System.currentTimeMillis() - 1000;
+ stats.newPartitionLoaded = new AtomicBoolean();
+ stats.newPartitionLoaded.set(true);
+ StatisticsJobAppender appender = new StatisticsJobAppender("appender");
+ // Test new partition loaded data for the first time. Not skip.
+ Assertions.assertFalse(appender.skip(olapTable));
+ stats.newPartitionLoaded.set(false);
+ // The update of this huge table is long time ago, so we shouldn't skip it this time
+ stats.updatedTime = System.currentTimeMillis()
+ - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000;
+ Assertions.assertFalse(appender.skip(olapTable));
+ new MockUp<AnalysisManager>() {
+
+ @Mock
+ public TableStatsMeta findTableStatsStatus(long tblId) {
+ return null;
+ }
+ };
+ // can't find table stats meta, which means this table never get analyzed, so we shouldn't skip it this time
+ Assertions.assertFalse(appender.skip(olapTable));
+ new MockUp<AnalysisManager>() {
+
+ @Mock
+ public TableStatsMeta findTableStatsStatus(long tblId) {
+ return stats;
+ }
+ };
+ stats.userInjected = true;
+ Assertions.assertTrue(appender.skip(olapTable));
+
+ // this is not olap table nor external table, so we should skip it this time
+ Assertions.assertTrue(appender.skip(anyOtherTable));
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
index 724e0363833..0e221673e9d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
@@ -17,10 +17,20 @@
package org.apache.doris.statistics.util;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.ColStatsMeta;
import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.TableStatsMeta;
import com.google.common.collect.Lists;
import mockit.Mock;
@@ -33,6 +43,8 @@ import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
class StatisticsUtilTest {
@Test
@@ -150,4 +162,77 @@ class StatisticsUtilTest {
// \\''""
Assertions.assertEquals("\\\\''\"", StatisticsUtil.escapeSQL(origin));
}
+
+ @Test
+ public void testTableNotAnalyzedForTooLong() throws InterruptedException {
+ TableStatsMeta tableMeta = new TableStatsMeta();
+ OlapTable olapTable = new OlapTable();
+ ExternalTable externalTable = new ExternalTable();
+
+ // Test table or stats is null
+ Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(null, tableMeta));
+ Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, null));
+
+ // Test user injected
+ tableMeta.userInjected = true;
+ Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+ // Test External table
+ tableMeta.userInjected = false;
+ Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(externalTable, tableMeta));
+
+ // Test config is 0
+ Config.auto_analyze_interval_seconds = 0;
+ Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+ // Test time not long enough
+ Config.auto_analyze_interval_seconds = 86400;
+ tableMeta.lastAnalyzeTime = System.currentTimeMillis();
+ Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+ // Test time long enough and update rows > 0
+ Config.auto_analyze_interval_seconds = 1;
+ tableMeta.lastAnalyzeTime = System.currentTimeMillis();
+ Thread.sleep(2000);
+ tableMeta.updatedRows.set(10);
+ Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+ // Test row count is not equal with last analyze
+ tableMeta.updatedRows.set(0);
+ tableMeta.rowCount = 10;
+ new MockUp<Table>() {
+ @Mock
+ public long getRowCount() {
+ return 100;
+ }
+ };
+ Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+ // Test visible version changed
+ new MockUp<OlapTable>() {
+ @Mock
+ public long getVisibleVersion() {
+ return 100;
+ }
+ };
+ new MockUp<Table>() {
+ @Mock
+ public long getRowCount() {
+ return 10;
+ }
+ };
+ ConcurrentMap<Pair<String, String>, ColStatsMeta> colToColStatsMeta = new ConcurrentHashMap<>();
+ ColStatsMeta col1Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 100);
+ ColStatsMeta col2Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 101);
+ colToColStatsMeta.put(Pair.of("index1", "col1"), col1Meta);
+ colToColStatsMeta.put(Pair.of("index2", "col2"), col2Meta);
+ tableMeta.setColToColStatsMeta(colToColStatsMeta);
+ Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+
+ // Test visible version unchanged.
+ col2Meta = new ColStatsMeta(0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 100);
+ colToColStatsMeta.put(Pair.of("index2", "col2"), col2Meta);
+ tableMeta.setColToColStatsMeta(colToColStatsMeta);
+ Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
+ }
}
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy b/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy
index 8a34bf9204f..56f5a0bd338 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_statistic_auto.groovy
@@ -36,13 +36,14 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc
logger.info("catalog " + catalog_name + " created")
// Test analyze table without init.
- sql """analyze database ${catalog_name}.statistics PROPERTIES("use.auto.analyzer"="true")"""
+ sql """analyze table ${catalog_name}.statistics.statistics PROPERTIES("use.auto.analyzer"="true")"""
sql """use ${catalog_name}.statistics"""
for (int i = 0; i < 10; i++) {
- Thread.sleep(1000)
+ Thread.sleep(2000)
def result = sql """show column stats `statistics` (lo_quantity)"""
- if (result.size <= 0) {
+ if (result.size() <= 0) {
+ sql """analyze table ${catalog_name}.statistics.statistics PROPERTIES("use.auto.analyzer"="true")"""
continue;
}
assertEquals(result.size(), 1)
@@ -56,7 +57,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc
assertEquals(result[0][8], "N/A")
result = sql """show column stats `statistics` (lo_orderkey)"""
- if (result.size <= 0) {
+ if (result.size() <= 0) {
continue;
}
assertEquals(result.size(), 1)
@@ -70,7 +71,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc
assertEquals(result[0][8], "N/A")
result = sql """show column stats `statistics` (lo_linenumber)"""
- if (result.size <= 0) {
+ if (result.size() <= 0) {
continue;
}
assertEquals(result.size(), 1)
@@ -82,6 +83,7 @@ suite("test_hive_statistic_auto", "p0,external,hive,external_docker,external_doc
assertEquals(result[0][6], "4.0")
assertEquals(result[0][7], "N/A")
assertEquals(result[0][8], "N/A")
+ break
}
sql """drop catalog ${catalog_name}"""
diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy
index b4edc5e9d7b..c518a15d7dd 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -2837,7 +2837,7 @@ PARTITION `p599` VALUES IN (599)
// Test auto analyze with job type SYSTEM
sql """drop stats trigger_test"""
- sql """analyze database trigger PROPERTIES("use.auto.analyzer"="true")"""
+ sql """analyze table trigger_test PROPERTIES("use.auto.analyzer"="true")"""
int i = 0;
for (0; i < 10; i++) {
result = sql """show column stats trigger_test"""
@@ -2941,7 +2941,38 @@ PARTITION `p599` VALUES IN (599)
new_part_result = sql """show column stats part(colint)"""
assertEquals("2.0", new_part_result[0][2])
-
sql """DROP DATABASE IF EXISTS trigger"""
+
+ // Test show last analyze table version
+ sql """create database if not exists test_version"""
+ sql """use test_version"""
+ sql """drop table if exists region"""
+ sql """
+ CREATE TABLE region (
+ r_regionkey int NOT NULL,
+ r_name VARCHAR(25) NOT NULL,
+ r_comment VARCHAR(152)
+ )ENGINE=OLAP
+ DUPLICATE KEY(`r_regionkey`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`r_regionkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql """analyze table region with sync"""
+ def versionResult = sql """show column stats region"""
+ assertEquals(versionResult[0][14], "1")
+ assertEquals(versionResult[1][14], "1")
+ assertEquals(versionResult[2][14], "1")
+
+ sql """insert into region values (1, "1", "1")"""
+ sql """analyze table region with sync"""
+ versionResult = sql """show column stats region"""
+ assertEquals(versionResult[0][14], "2")
+ assertEquals(versionResult[1][14], "2")
+ assertEquals(versionResult[2][14], "2")
+
+ sql """drop database if exists test_version"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]