This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b72c71dec0 [fix](stats) Analysis jobs didn't get persisted properly
(#18602)
b72c71dec0 is described below
commit b72c71dec06f47c1a4a4200eef3a873d6e6c7841
Author: AKIRA <[email protected]>
AuthorDate: Thu Apr 13 17:36:06 2023 +0900
[fix](stats) Analysis jobs didn't get persisted properly (#18602)
In the previous implementation, Doris would only persist one task to track
analysis job status. After this PR, each task of column analysis will be
persisted. Additionally, a record whose task_id is -1 is stored as the job of
the user-submitted AnalyzeStmt.
AnalyzeStmt <---1-1---> AnalysisJob
AnalysisJob <---1-n---> AnalysisTask
---
.../main/java/org/apache/doris/catalog/Env.java | 2 +-
.../doris/catalog/InternalSchemaInitializer.java | 4 +-
.../apache/doris/statistics/AnalysisManager.java | 126 +++++++++++++--------
.../apache/doris/statistics/AnalysisTaskInfo.java | 4 +
.../doris/statistics/StatisticsRepository.java | 4 +-
5 files changed, 90 insertions(+), 50 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5bdf74f1fe..0c79634103 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5350,7 +5350,7 @@ public class Env {
// 1. handle partition level analysis statement properly
// 2. support sample job
// 3. support period job
- public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+ public void createAnalysisJob(AnalyzeStmt analyzeStmt) throws DdlException
{
analysisManager.createAnalysisJob(analyzeStmt);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 6feec883a7..3f421d5a86 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -191,11 +191,11 @@ public class InternalSchemaInitializer extends Thread {
columnDefs.add(new ColumnDef("schedule_type",
TypeDef.createVarchar(32)));
String engineName = "olap";
KeysDesc keysDesc = new KeysDesc(KeysType.UNIQUE_KEYS,
- Lists.newArrayList("job_id"));
+ Lists.newArrayList("job_id", "task_id"));
DistributionDesc distributionDesc = new HashDistributionDesc(
StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT,
- Lists.newArrayList("job_id"));
+ Lists.newArrayList("job_id", "task_id"));
Map<String, String> properties = new HashMap<String, String>() {
{
put("replication_num",
String.valueOf(Config.statistic_internal_table_replica_num));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 8eba9eb8fb..ee3c15eae4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -62,7 +62,7 @@ public class AnalysisManager {
private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
+ FeConstants.INTERNAL_DB_NAME + "." +
StatisticConstants.ANALYSIS_JOB_TABLE + " "
- + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE
job_id = ${jobId}";
+ + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE
job_id = ${jobId} and task_id=${taskId}";
private static final String SHOW_JOB_STATE_SQL_TEMPLATE = "SELECT "
+ "job_id, catalog_name, db_name, tbl_name, col_name, job_type, "
@@ -90,7 +90,8 @@ public class AnalysisManager {
return statisticsCache;
}
- public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+ // Each analyze stmt corresponding to an analysis job.
+ public void createAnalysisJob(AnalyzeStmt analyzeStmt) throws DdlException
{
String catalogName = analyzeStmt.getCatalogName();
String db = analyzeStmt.getDBName();
TableName tbl = analyzeStmt.getTblName();
@@ -99,7 +100,6 @@ public class AnalysisManager {
Set<String> partitionNames = analyzeStmt.getPartitionNames();
Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
long jobId = Env.getCurrentEnv().getNextId();
-
// If the analysis is not incremental, need to delete existing
statistics.
// we cannot collect histograms incrementally and do not support it
if (!analyzeStmt.isIncrement && !analyzeStmt.isHistogram) {
@@ -112,58 +112,87 @@ public class AnalysisManager {
StatisticsRepository.dropStatistics(dbId, tblIds, colNames,
partIds);
}
- if (colNames != null) {
- for (String colName : colNames) {
+ createTaskForEachColumns(analyzeStmt, catalogName, db, tbl, colNames,
partitionNames, analysisTaskInfos, jobId);
+ createTaskForMVIdx(analyzeStmt, catalogName, db, tbl, partitionNames,
analysisTaskInfos, jobId);
+ persistAnalysisJob(catalogName, db, tbl, jobId);
+
+ if (analyzeStmt.isSync()) {
+ syncExecute(analysisTaskInfos.values());
+ return;
+ }
+
+ analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
+ analysisTaskInfos.values().forEach(taskScheduler::schedule);
+ }
+
+ private void persistAnalysisJob(String catalogName, String db, TableName
tbl,
+ long jobId) throws DdlException {
+ try {
+ AnalysisTaskInfo analysisTaskInfo = new
AnalysisTaskInfoBuilder().setJobId(
+ jobId).setTaskId(-1)
+ .setCatalogName(catalogName).setDbName(db)
+ .setTblName(tbl.getTbl())
+ .setJobType(JobType.MANUAL)
+
.setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
+ .setScheduleType(ScheduleType.ONCE).build();
+ StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+ } catch (Throwable t) {
+ throw new DdlException(t.getMessage(), t);
+ }
+ }
+
+ private void createTaskForMVIdx(AnalyzeStmt analyzeStmt, String
catalogName, String db, TableName tbl,
+ Set<String> partitionNames, Map<Long, AnalysisTaskInfo>
analysisTaskInfos, long jobId) throws DdlException {
+ if (!(analyzeStmt.isWholeTbl &&
analyzeStmt.getTable().getType().equals(TableType.OLAP))) {
+ return;
+ }
+ OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+ try {
+ olapTable.readLock();
+ for (MaterializedIndexMeta meta :
olapTable.getIndexIdToMeta().values()) {
+ if (meta.getDefineStmt() == null) {
+ continue;
+ }
long taskId = Env.getCurrentEnv().getNextId();
- AnalysisType analType = analyzeStmt.isHistogram ?
AnalysisType.HISTOGRAM : AnalysisType.COLUMN;
- AnalysisTaskInfo analysisTaskInfo = new
AnalysisTaskInfoBuilder().setJobId(jobId)
-
.setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
- .setTblName(tbl.getTbl()).setColName(colName)
-
.setPartitionNames(partitionNames).setJobType(JobType.MANUAL)
-
.setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType)
- .setState(AnalysisState.PENDING)
+ AnalysisTaskInfo analysisTaskInfo = new
AnalysisTaskInfoBuilder().setJobId(
+ jobId).setTaskId(taskId)
+ .setCatalogName(catalogName).setDbName(db)
+
.setTblName(tbl.getTbl()).setPartitionNames(partitionNames)
+
.setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
+
.setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
.setScheduleType(ScheduleType.ONCE).build();
try {
- StatisticsRepository.createAnalysisTask(analysisTaskInfo);
+ StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
} catch (Exception e) {
- throw new RuntimeException("Failed to create analysis
job", e);
+ throw new DdlException("Failed to create analysis task",
e);
}
analysisTaskInfos.put(taskId, analysisTaskInfo);
}
+ } finally {
+ olapTable.readUnlock();
}
- if (analyzeStmt.isWholeTbl &&
analyzeStmt.getTable().getType().equals(TableType.OLAP)) {
- OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+ }
+
+ private void createTaskForEachColumns(AnalyzeStmt analyzeStmt, String
catalogName, String db, TableName tbl,
+ Set<String> colNames, Set<String> partitionNames, Map<Long,
AnalysisTaskInfo> analysisTaskInfos,
+ long jobId) throws DdlException {
+ for (String colName : colNames) {
+ long taskId = Env.getCurrentEnv().getNextId();
+ AnalysisType analType = analyzeStmt.isHistogram ?
AnalysisType.HISTOGRAM : AnalysisType.COLUMN;
+ AnalysisTaskInfo analysisTaskInfo = new
AnalysisTaskInfoBuilder().setJobId(jobId)
+
.setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
+ .setTblName(tbl.getTbl()).setColName(colName)
+
.setPartitionNames(partitionNames).setJobType(JobType.MANUAL)
+
.setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType)
+ .setState(AnalysisState.PENDING)
+ .setScheduleType(ScheduleType.ONCE).build();
try {
- olapTable.readLock();
- for (MaterializedIndexMeta meta :
olapTable.getIndexIdToMeta().values()) {
- if (meta.getDefineStmt() == null) {
- continue;
- }
- long taskId = Env.getCurrentEnv().getNextId();
- AnalysisTaskInfo analysisTaskInfo = new
AnalysisTaskInfoBuilder().setJobId(
- jobId).setTaskId(taskId)
- .setCatalogName(catalogName).setDbName(db)
-
.setTblName(tbl.getTbl()).setPartitionNames(partitionNames)
-
.setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
-
.setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
- .setScheduleType(ScheduleType.ONCE).build();
- try {
-
StatisticsRepository.createAnalysisTask(analysisTaskInfo);
- } catch (Exception e) {
- throw new RuntimeException("Failed to create analysis
job", e);
- }
- analysisTaskInfos.put(taskId, analysisTaskInfo);
- }
- } finally {
- olapTable.readUnlock();
+ StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+ } catch (Exception e) {
+ throw new DdlException("Failed to create analysis task", e);
}
+ analysisTaskInfos.put(taskId, analysisTaskInfo);
}
- if (analyzeStmt.isSync()) {
- syncExecute(analysisTaskInfos.values());
- return;
- }
- analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
- analysisTaskInfos.values().forEach(taskScheduler::schedule);
}
public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState
jobState, String message, long time) {
@@ -172,16 +201,23 @@ public class AnalysisManager {
params.put("message", StringUtils.isNotEmpty(message) ?
String.format(", message = '%s'", message) : "");
params.put("updateExecTime", time == -1 ? "" : ",
last_exec_time_in_ms=" + time);
params.put("jobId", String.valueOf(info.jobId));
+ params.put("taskId", String.valueOf(info.taskId));
try {
StatisticsUtil.execUpdate(new
StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
} catch (Exception e) {
- LOG.warn(String.format("Failed to update state for job: %s",
info.jobId), e);
+ LOG.warn(String.format("Failed to update state for task: %d, %d",
info.jobId, info.taskId), e);
} finally {
info.state = jobState;
if (analysisJobIdToTaskMap.get(info.jobId).values()
.stream().allMatch(i -> i.state != null
&& i.state != AnalysisState.PENDING && i.state !=
AnalysisState.RUNNING)) {
analysisJobIdToTaskMap.remove(info.jobId);
+ params.put("taskId", String.valueOf(-1));
+ try {
+ StatisticsUtil.execUpdate(new
StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to update state for job:
%s", info.jobId), e);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
index 9f5f608229..def16de41c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
@@ -127,4 +127,8 @@ public class AnalysisTaskInfo {
public AnalysisState getState() {
return state;
}
+
+ public boolean isJob() {
+ return taskId == -1;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index 9199c74b25..c18cb38d0a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -198,14 +198,14 @@ public class StatisticsRepository {
predicate.append(partPredicate);
}
- public static void createAnalysisTask(AnalysisTaskInfo analysisTaskInfo)
throws Exception {
+ public static void persistAnalysisTask(AnalysisTaskInfo analysisTaskInfo)
throws Exception {
Map<String, String> params = new HashMap<>();
params.put("jobId", String.valueOf(analysisTaskInfo.jobId));
params.put("taskId", String.valueOf(analysisTaskInfo.taskId));
params.put("catalogName", analysisTaskInfo.catalogName);
params.put("dbName", analysisTaskInfo.dbName);
params.put("tblName", analysisTaskInfo.tblName);
- params.put("colName", analysisTaskInfo.colName);
+ params.put("colName", analysisTaskInfo.colName == null ? "" :
analysisTaskInfo.colName);
params.put("indexId", analysisTaskInfo.indexId == null ? "-1" :
String.valueOf(analysisTaskInfo.indexId));
params.put("jobType", analysisTaskInfo.jobType.toString());
params.put("analysisType", analysisTaskInfo.analysisMethod.toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]