This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2d1f597413 [Fix](statistics)Fix hive table statistic bug (#19365)
2d1f597413 is described below
commit 2d1f597413ecb70597cc54436dab5c8a2eaf55b8
Author: Jibing-Li <[email protected]>
AuthorDate: Thu May 11 07:48:58 2023 +0800
[Fix](statistics)Fix hive table statistic bug (#19365)
Fix Hive table statistics bug. Collect table/partition level statistics.
---
.../doris/catalog/external/HMSExternalTable.java | 15 ++++
.../apache/doris/statistics/AnalysisManager.java | 26 +++++++
.../apache/doris/statistics/AnalysisTaskInfo.java | 7 +-
.../doris/statistics/AnalysisTaskInfoBuilder.java | 12 +++-
.../apache/doris/statistics/BaseAnalysisTask.java | 4 ++
.../apache/doris/statistics/ColumnStatistic.java | 7 +-
.../apache/doris/statistics/HMSAnalysisTask.java | 8 +--
.../apache/doris/statistics/HiveAnalysisTask.java | 84 +++++++++++++++++-----
.../doris/statistics/IcebergAnalysisTask.java | 2 +-
9 files changed, 134 insertions(+), 31 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 8aecb96bb3..b94804ac61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -18,12 +18,15 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.HiveAnalysisTask;
import org.apache.doris.statistics.IcebergAnalysisTask;
import org.apache.doris.thrift.THiveTable;
@@ -322,6 +325,18 @@ public class HMSExternalTable extends ExternalTable {
return columns;
}
+ @Override
+ public long estimatedRowCount() {
+ ColumnStatistic cache = Config.enable_stats
+ ? Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(id, "")
+ : ColumnStatistic.UNKNOWN;
+ if (cache == ColumnStatistic.UNKNOWN) {
+ return 1;
+ } else {
+ return (long) cache.count;
+ }
+ }
+
private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(this);
Schema schema = icebergTable.schema();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 688924949d..d12baf0bb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -124,6 +124,7 @@ public class AnalysisManager {
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
createTaskForMVIdx(jobInfo, analysisTaskInfos, isSync);
+ createTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
ConnectContext ctx = ConnectContext.get();
if (!isSync || ctx.getSessionVariable().enableSaveStatisticsSyncJob) {
@@ -425,6 +426,31 @@ public class AnalysisManager {
}
}
+ private void createTaskForExternalTable(AnalysisTaskInfo jobInfo,
+ Map<Long, BaseAnalysisTask> analysisTasks,
+ boolean isSync) throws DdlException {
+ TableIf table;
+ try {
+ table = StatisticsUtil.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
+ } catch (Throwable e) {
+ LOG.warn(e.getMessage());
+ return;
+ }
+ if (jobInfo.analysisType == AnalysisType.HISTOGRAM || table.getType() != TableType.HMS_EXTERNAL_TABLE) {
+ return;
+ }
+ AnalysisTaskInfoBuilder colTaskInfoBuilder = new AnalysisTaskInfoBuilder(jobInfo);
+ long taskId = Env.getCurrentEnv().getNextId();
+ AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setIndexId(-1L)
+ .setTaskId(taskId).setExternalTableLevelTask(true).build();
+ analysisTasks.put(taskId, createTask(analysisTaskInfo));
+ try {
+ StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+ } catch (Exception e) {
+ throw new DdlException("Failed to create analysis task", e);
+ }
+ }
+
public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
if (analysisJobIdToTaskMap.get(info.jobId) == null) {
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
index 8690682ea2..2860a472c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
@@ -104,11 +104,15 @@ public class AnalysisTaskInfo {
public String message;
+ // True means this task is a table level task for an external table.
+ // This kind of task mainly collects the number of rows of a table.
+ public boolean externalTableLevelTask;
+
public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
Map<String, Set<String>> colToPartitions, String colName, Long indexId, JobType jobType,
AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
- long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) {
+ long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask) {
this.jobId = jobId;
this.taskId = taskId;
this.catalogName = catalogName;
@@ -129,6 +133,7 @@ public class AnalysisTaskInfo {
this.lastExecTimeInMs = lastExecTimeInMs;
this.state = state;
this.scheduleType = scheduleType;
+ this.externalTableLevelTask = isExternalTableLevelTask;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
index 5a6e6b41ad..acaae0baab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
@@ -47,6 +47,7 @@ public class AnalysisTaskInfoBuilder {
private AnalysisState state;
private ScheduleType scheduleType;
private String message;
+ private boolean externalTableLevelTask;
public AnalysisTaskInfoBuilder() {
}
@@ -174,10 +175,16 @@ public class AnalysisTaskInfoBuilder {
return this;
}
+ public AnalysisTaskInfoBuilder setExternalTableLevelTask(boolean isTableLevel) {
+ this.externalTableLevelTask = isTableLevel;
+ return this;
+ }
+
public AnalysisTaskInfo build() {
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
- sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType);
+ sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType,
+ externalTableLevelTask);
}
public AnalysisTaskInfoBuilder copy() {
@@ -201,6 +208,7 @@ public class AnalysisTaskInfoBuilder {
.setMessage(message)
.setLastExecTimeInMs(lastExecTimeInMs)
.setState(state)
- .setScheduleType(scheduleType);
+ .setScheduleType(scheduleType)
+ .setExternalTableLevelTask(false);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 0f62a39396..a528229012 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -142,6 +142,10 @@ public abstract class BaseAnalysisTask {
info, AnalysisState.FAILED,
String.format("Table with name %s not exists",
info.tblName), System.currentTimeMillis());
}
+ // External table level tasks don't contain a column, so there is no need to do the column related analysis.
+ if (info.externalTableLevelTask) {
+ return;
+ }
if (info.analysisType != null && (info.analysisType.equals(AnalysisType.COLUMN)
|| info.analysisType.equals(AnalysisType.HISTOGRAM))) {
col = tbl.getColumn(info.colName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index ef5da59d81..3dfc66adbd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -138,10 +138,9 @@ public class ColumnStatistic {
String colName = resultRow.getColumnValue("col_id");
Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName);
if (col == null) {
- LOG.warn("Failed to deserialize column statistics, ctlId: {}
dbId: {}"
- + "tblId: {} column: {} not exists",
- catalogId, dbID, tblId, colName);
- return ColumnStatistic.UNKNOWN;
+ // A null col indicates that this is external table level info,
+ // which doesn't have a column.
+ return columnStatisticBuilder.build();
}
String min = resultRow.getColumnValue("min");
String max = resultRow.getColumnValue("max");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index 5651f2617a..8bfaa1ca8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -34,7 +34,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
/**
* Collect the column level stats for external table through metadata.
*/
- protected void getColumnStatsByMeta() throws Exception {
+ protected void getStatsByMeta() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@@ -42,16 +42,16 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
* Collect the stats for external table through sql.
* @return ColumnStatistics
*/
- protected void getColumnStatsBySql() {
+ protected void getStatsBySql() {
throw new NotImplementedException("getColumnStatsBySql is not implemented");
}
@Override
public void execute() throws Exception {
if (Config.collect_external_table_stats_by_sql) {
- getColumnStatsBySql();
+ getStatsBySql();
} else {
- getColumnStatsByMeta();
+ getStatsByMeta();
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
index 3f96c1bccc..76f14a5a68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
@@ -57,21 +57,63 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
public static final String TIMESTAMP = "transient_lastDdlTime";
public static final String DELIMITER = "-";
+ private final boolean isTableLevelTask;
+
public HiveAnalysisTask(AnalysisTaskInfo info) {
super(info);
+ isTableLevelTask = info.externalTableLevelTask;
}
- private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
+ private static final String ANALYZE_TABLE_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+ + "${internalDB}.${columnStatTbl}"
+ + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1',
'${colId}', NULL, "
+ + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize},
'${update_time}')";
+
+ private static final String ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1',
'${colId}', '${partId}', "
+ "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize},
'${update_time}')";
private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
- + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1',
'${colId}', NULL, "
- + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize},
'${update_time}')";
+ + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1',
'', NULL, "
+ + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')";
@Override
+ protected void getStatsByMeta() throws Exception {
+ if (isTableLevelTask) {
+ getTableStatsByMeta();
+ } else {
+ getColumnStatsByMeta();
+ }
+ }
+
+ protected void getTableStatsByMeta() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+ params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+ params.put("catalogId", String.valueOf(catalog.getId()));
+ params.put("dbId", String.valueOf(db.getId()));
+ params.put("tblId", String.valueOf(tbl.getId()));
+ params.put("colId", "");
+
+ // Get table level information.
+ Map<String, String> parameters = table.getRemoteTable().getParameters();
+ // Collect table level row count, null number and timestamp.
+ setParameterData(parameters, params);
+ if (parameters.containsKey(TOTAL_SIZE)) {
+ params.put("dataSize", parameters.get(TOTAL_SIZE));
+ }
+ params.put("id", genColumnStatId(tbl.getId(), -1, "", null));
+ StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+ String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
+ try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+ r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+ this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+ this.stmtExecutor.execute();
+ }
+ }
+
protected void getColumnStatsByMeta() throws Exception {
List<String> columns = new ArrayList<>();
columns.add(col.getName());
@@ -89,16 +131,17 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
setParameterData(parameters, params);
params.put("id", genColumnStatId(tbl.getId(), -1, col.getName(),
null));
List<ColumnStatisticsObj> tableStats = table.getHiveTableColumnStats(columns);
+ long rowCount = parameters.containsKey(NUM_ROWS) ? Long.parseLong(parameters.get(NUM_ROWS)) : 0;
// Collect table level ndv, nulls, min and max. tableStats contains at most 1 item;
for (ColumnStatisticsObj tableStat : tableStats) {
if (!tableStat.isSetStatsData()) {
continue;
}
ColumnStatisticsData data = tableStat.getStatsData();
- getStatData(data, params);
+ getStatData(data, params, rowCount);
}
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
- String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
+ String sql = stringSubstitutor.replace(ANALYZE_TABLE_COLUMN_SQL_TEMPLATE);
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
@@ -128,11 +171,12 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
if (!stat.isSetStatsData()) {
continue;
}
+ rowCount = parameters.containsKey(NUM_ROWS) ? Long.parseLong(parameters.get(NUM_ROWS)) : 0;
// Collect ndv, nulls, min and max for different data type.
ColumnStatisticsData data = stat.getStatsData();
- getStatData(data, params);
+ getStatData(data, params, rowCount);
stringSubstitutor = new StringSubstitutor(params);
- partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
+ partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE));
}
// Update partition level stats for this column.
for (String partitionSql : partitionAnalysisSQLs) {
@@ -145,11 +189,15 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
}
- private void getStatData(ColumnStatisticsData data, Map<String, String> params) {
+ private void getStatData(ColumnStatisticsData data, Map<String, String> params, long rowCount) {
long ndv = 0;
long nulls = 0;
String min = "";
String max = "";
+ long colSize = 0;
+ if (!data.isSetStringStats()) {
+ colSize = rowCount * col.getType().getSlotSize();
+ }
// Collect ndv, nulls, min and max for different data type.
if (data.isSetLongStats()) {
LongColumnStatsData longStats = data.getLongStats();
@@ -161,6 +209,8 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
StringColumnStatsData stringStats = data.getStringStats();
ndv = stringStats.getNumDVs();
nulls = stringStats.getNumNulls();
+ double avgColLen = stringStats.getAvgColLen();
+ colSize = Math.round(avgColLen * rowCount);
} else if (data.isSetDecimalStats()) {
DecimalColumnStatsData decimalStats = data.getDecimalStats();
ndv = decimalStats.getNumDVs();
@@ -211,25 +261,21 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
params.put("nulls", String.valueOf(nulls));
params.put("min", min);
params.put("max", max);
+ params.put("dataSize", String.valueOf(colSize));
}
private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
- long numRows = 0;
- long timestamp = 0;
- long dataSize = 0;
+ String numRows = "";
+ String timestamp = "";
if (parameters.containsKey(NUM_ROWS)) {
- numRows = Long.parseLong(parameters.get(NUM_ROWS));
+ numRows = parameters.get(NUM_ROWS);
}
if (parameters.containsKey(TIMESTAMP)) {
- timestamp = Long.parseLong(parameters.get(TIMESTAMP));
- }
- if (parameters.containsKey(TOTAL_SIZE)) {
- dataSize = Long.parseLong(parameters.get(TOTAL_SIZE));
+ timestamp = parameters.get(TIMESTAMP);
}
- params.put("dataSize", String.valueOf(dataSize));
- params.put("numRows", String.valueOf(numRows));
+ params.put("numRows", numRows);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- params.put("update_time", sdf.format(new Date(timestamp * 1000)));
+ params.put("update_time", sdf.format(new
Date(Long.parseLong(timestamp) * 1000)));
}
private String genColumnStatId(long tableId, long indexId, String columnName, String partitionName) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
index 7c41954dc0..61b3f6ea1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
@@ -56,7 +56,7 @@ public class IcebergAnalysisTask extends HMSAnalysisTask {
@Override
- protected void getColumnStatsByMeta() throws Exception {
+ protected void getStatsByMeta() throws Exception {
Table icebergTable = getIcebergTable();
TableScan tableScan = icebergTable.newScan().includeColumnStats();
for (FileScanTask task : tableScan.planFiles()) {
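
For illustration only, not part of the commit: a minimal, self-contained sketch of how the new table-level path in HiveAnalysisTask.getTableStatsByMeta() expands ANALYZE_TABLE_SQL_TEMPLATE with org.apache.commons.text.StringSubstitutor before handing the resulting SQL to StmtExecutor. Every concrete value below (the ids, "__internal_schema", "column_statistics", the stat id string, and the timestamp) is a made-up placeholder standing in for FeConstants.INTERNAL_DB_NAME, StatisticConstants.STATISTIC_TBL_NAME, genColumnStatId(...), and the HMS table parameters; they are not the real Doris constants.

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.text.StringSubstitutor;

public class HiveTableStatsSqlSketch {

    // Template copied from the patch: only row count and total size are filled in;
    // ndv and null count are written as 0 and min/max as empty strings.
    private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '', NULL, "
            + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')";

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("internalDB", "__internal_schema");     // placeholder for FeConstants.INTERNAL_DB_NAME
        params.put("columnStatTbl", "column_statistics");  // placeholder for StatisticConstants.STATISTIC_TBL_NAME
        params.put("id", "10021--1-");                     // placeholder for genColumnStatId(tblId, -1, "", null)
        params.put("catalogId", "10001");
        params.put("dbId", "10011");
        params.put("tblId", "10021");
        params.put("numRows", "42");                       // from the HMS table parameter "numRows"
        params.put("dataSize", "1024");                    // from the HMS table parameter "totalSize"
        params.put("update_time", "2023-05-11 07:48:58");  // formatted from "transient_lastDdlTime"

        // Expands the template into the INSERT statement that the task would execute.
        String sql = new StringSubstitutor(params).replace(ANALYZE_TABLE_SQL_TEMPLATE);
        System.out.println(sql);
    }
}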
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]