This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 976bc8d9f0 [Improvement](statistics)Support basic jdbc external table
stats collection (#23965)
976bc8d9f0 is described below
commit 976bc8d9f011d2e86d53e8381345ecd91d674396
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Sep 6 20:24:02 2023 +0800
[Improvement](statistics)Support basic jdbc external table stats collection
(#23965)
---
.../doris/catalog/external/JdbcExternalTable.java | 34 ++++
.../apache/doris/statistics/AnalysisManager.java | 3 +-
.../apache/doris/statistics/JdbcAnalysisTask.java | 175 +++++++++++++++++++++
.../jdbc/test_mysql_jdbc_statistics.groovy | 64 ++++++++
4 files changed, 275 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
index 5db69ac8f0..7dbb4a65fb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
@@ -18,14 +18,20 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.JdbcAnalysisTask;
+import org.apache.doris.statistics.TableStatistic;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Optional;
/**
* Jdbc external table.
@@ -93,4 +99,32 @@ public class JdbcExternalTable extends ExternalTable {
jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
return jdbcTable;
}
+
+    /**
+     * Creates the task that collects statistics for this JDBC table.
+     * The table metadata is initialized before the task is built.
+     */
+    @Override
+    public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
+        makeSureInitialized();
+        BaseAnalysisTask task = new JdbcAnalysisTask(info);
+        return task;
+    }
+
+    /**
+     * Returns the row count recorded in the statistics cache for this table.
+     *
+     * @return the cached row count, or 1 when no table statistic is cached or the lookup fails
+     */
+    @Override
+    public long getRowCount() {
+        makeSureInitialized();
+        // Fallback used when no statistic is available; lookup failures are logged, not propagated.
+        long estimate = 1;
+        try {
+            Optional<TableStatistic> cached = Env.getCurrentEnv().getStatisticsCache().getTableStatistics(
+                    catalog.getId(), catalog.getDbOrAnalysisException(dbName).getId(), id);
+            if (cached.isPresent()) {
+                estimate = cached.get().rowCount;
+                LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, estimate);
+            }
+        } catch (Exception e) {
+            LOG.warn("Fail to get row count for table {}", name, e);
+        }
+        return estimate;
+    }
+
+    /**
+     * Returns the same value as {@link #getRowCount()}, which reads the statistics cache.
+     */
+    @Override
+    public long estimatedRowCount() {
+        long rows = getRowCount();
+        return rows;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index cde111af4b..5ac0c0cd52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.View;
+import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -578,7 +579,7 @@ public class AnalysisManager extends Daemon implements
Writable {
LOG.warn(e.getMessage());
return;
}
- if (jobInfo.analysisType == AnalysisType.HISTOGRAM || table.getType()
!= TableType.HMS_EXTERNAL_TABLE) {
+ if (jobInfo.analysisType == AnalysisType.HISTOGRAM || !(table
instanceof ExternalTable)) {
return;
}
AnalysisInfoBuilder colTaskInfoBuilder = new
AnalysisInfoBuilder(jobInfo);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
new file mode 100644
index 0000000000..25b9db0b2c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.external.JdbcExternalTable;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.InternalQueryResult;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Analysis task for tables of a JDBC external catalog.
+ *
+ * <p>A table-level task counts the rows of the table and persists the count as table statistics.
+ * A column-level task aggregates one column's statistics with an INSERT ... SELECT into the internal
+ * column statistics table.
+ */
+public class JdbcAnalysisTask extends BaseAnalysisTask {
+    private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class);
+
+    /**
+     * Aggregates the statistics of one column over the whole external table and inserts a single
+     * row into the internal column statistics table. Placeholders are filled by StringSubstitutor.
+     */
+    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " SELECT "
+            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+            + "${catalogId} AS catalog_id, "
+            + "${dbId} AS db_id, "
+            + "${tblId} AS tbl_id, "
+            + "${idxId} AS idx_id, "
+            + "'${colId}' AS col_id, "
+            + "NULL AS part_id, "
+            + "COUNT(1) AS row_count, "
+            + "NDV(`${colName}`) AS ndv, "
+            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+            + "MIN(`${colName}`) AS min, "
+            + "MAX(`${colName}`) AS max, "
+            + "${dataSizeFunction} AS data_size, "
+            + "NOW() "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+
+    /** Counts the rows of the external table; the result column is read back as "rowCount". */
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+
+    // True: collect the table row count. False: collect statistics for the column in `col`.
+    private final boolean isTableLevelTask;
+    // Not read yet; the cast in the constructor fails fast if the task targets a non-JDBC table.
+    private final JdbcExternalTable table;
+
+    public JdbcAnalysisTask(AnalysisInfo info) {
+        super(info);
+        isTableLevelTask = info.externalTableLevelTask;
+        table = (JdbcExternalTable) tbl;
+    }
+
+    /**
+     * Runs the table-level row count or the column statistics collection, depending on the task type.
+     */
+    public void doExecute() throws Exception {
+        if (isTableLevelTask) {
+            getTableStats();
+        } else {
+            getTableColumnStats();
+        }
+    }
+
+    /**
+     * Get table row count and store the result to metadata.
+     *
+     * @throws IllegalStateException if the count query returns no row
+     */
+    private void getTableStats() throws Exception {
+        Map<String, String> params = buildTableStatsParams(null);
+        String sql = new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE);
+        List<InternalQueryResult.ResultRow> columnResult = StatisticsUtil.execStatisticQuery(sql);
+        // Guard the get(0) below so a missing result is reported with context instead of
+        // surfacing as a bare IndexOutOfBoundsException / NullPointerException.
+        if (columnResult == null || columnResult.isEmpty()) {
+            throw new IllegalStateException("Row count query returned no result for table "
+                    + catalog.getName() + "." + db.getFullName() + "." + tbl.getName() + ", sql: " + sql);
+        }
+        String rowCount = columnResult.get(0).getColumnValue("rowCount");
+        params.put("rowCount", rowCount);
+        StatisticsRepository.persistTableStats(params);
+    }
+
+    /**
+     * Get column statistics and insert the result to __internal_schema.column_statistics
+     */
+    private void getTableColumnStats() throws Exception {
+        // An example sql for a column stats:
+        // INSERT INTO __internal_schema.column_statistics
+        //   SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
+        //   13002 AS catalog_id,
+        //   13038 AS db_id,
+        //   13055 AS tbl_id,
+        //   -1 AS idx_id,
+        //   'r_regionkey' AS col_id,
+        //   NULL AS part_id,
+        //   COUNT(1) AS row_count,
+        //   NDV(`r_regionkey`) AS ndv,
+        //   SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
+        //   MIN(`r_regionkey`) AS min,
+        //   MAX(`r_regionkey`) AS max,
+        //   0 AS data_size,
+        //   NOW() FROM `hive`.`tpch100`.`region`
+        Map<String, String> params = buildTableStatsParams("NULL");
+        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+        // NOTE(review): catalog/db/table/column names are substituted into backtick- and
+        // single-quoted positions without escaping; a name containing ` or ' would break this SQL.
+        params.put("colName", col.getName());
+        params.put("colId", info.colName);
+        params.put("dataSizeFunction", getDataSizeFunction(col));
+        String sql = new StringSubstitutor(params).replace(ANALYZE_SQL_TABLE_TEMPLATE);
+        executeInsertSql(sql);
+    }
+
+    /**
+     * Executes the given statement in a dedicated internal connect context.
+     *
+     * @throws RuntimeException carrying the query error message if the statement ends in ERR state
+     */
+    private void executeInsertSql(String sql) throws Exception {
+        long startTime = System.currentTimeMillis();
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+            r.connectContext.setExecutor(stmtExecutor);
+            this.stmtExecutor.execute();
+            QueryState queryState = r.connectContext.getState();
+            if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
+                LOG.warn(String.format("Failed to analyze %s.%s.%s.%s, sql: [%s], error: [%s]",
+                        info.catalogName, info.dbName, tbl.getName(), info.colName, sql,
+                        queryState.getErrorMessage()));
+                throw new RuntimeException(queryState.getErrorMessage());
+            }
+            // Avoid building the (potentially long) message when debug logging is off.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format("Analyze %s.%s.%s.%s done. SQL: [%s]. Cost %d ms.",
+                        info.catalogName, info.dbName, tbl.getName(), info.colName, sql,
+                        (System.currentTimeMillis() - startTime)));
+            }
+        }
+    }
+
+    /**
+     * Builds the placeholder values shared by the table and column statistics templates.
+     *
+     * @param partId partition id to append to the row id and quote as "partId", or null for none
+     * @return a mutable map the caller may extend with task-specific placeholders
+     */
+    private Map<String, String> buildTableStatsParams(String partId) {
+        Map<String, String> commonParams = new HashMap<>();
+        String id = StatisticsUtil.constructId(tbl.getId(), -1);
+        if (partId == null) {
+            commonParams.put("partId", "NULL");
+        } else {
+            id = StatisticsUtil.constructId(id, partId);
+            commonParams.put("partId", "'" + partId + "'");
+        }
+        commonParams.put("id", id);
+        commonParams.put("catalogId", String.valueOf(catalog.getId()));
+        commonParams.put("dbId", String.valueOf(db.getId()));
+        commonParams.put("tblId", String.valueOf(tbl.getId()));
+        commonParams.put("indexId", "-1");
+        commonParams.put("idxId", "-1");
+        commonParams.put("catalogName", catalog.getName());
+        commonParams.put("dbName", db.getFullName());
+        commonParams.put("tblName", tbl.getName());
+        if (col != null) {
+            commonParams.put("type", col.getType().toString());
+        }
+        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
+        return commonParams;
+    }
+
+    @Override
+    protected void afterExecution() {
+        // A table-level task has nothing to sync into the statistics cache; its result is stored in metadata.
+        if (isTableLevelTask) {
+            return;
+        }
+        Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
+    }
+}
diff --git
a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
new file mode 100644
index 0000000000..d6f0ca351d
--- /dev/null
+++
b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_docker_mysql") {
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+    String s3_endpoint = getS3Endpoint()
+    String bucket = getS3BucketName()
+    String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String catalog_name = "test_mysql_jdbc_statistics";
+
+        // Remove a catalog left over from an earlier (possibly failed) run, so that
+        // "if not exists" below cannot silently reuse stale properties.
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog if not exists ${catalog_name} properties(
+            "type"="jdbc",
+            "user"="root",
+            "password"="123456",
+            "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false&zeroDateTimeBehavior=convertToNull",
+            "driver_url" = "${driver_url}",
+            "driver_class" = "com.mysql.cj.jdbc.Driver"
+        );"""
+
+        // Asserts that `show column stats` returns exactly one row for `column` and that its
+        // leading fields equal `expected`, position by position.
+        def checkColumnStats = { String column, List<String> expected ->
+            def result = sql """show column stats ex_tb0 (${column})"""
+            assertTrue(result.size() == 1)
+            for (int i = 0; i < expected.size(); i++) {
+                assertTrue(result[0][i] == expected[i])
+            }
+        }
+
+        try {
+            sql """use ${catalog_name}.doris_test"""
+            sql """analyze table ex_tb0 with sync"""
+            checkColumnStats("name", ["name", "5.0", "5.0", "0.0", "18.0", "3.0", "'abc'", "'abg'"])
+            checkColumnStats("id", ["id", "5.0", "5.0", "0.0", "24.0", "4.0", "111", "115"])
+        } finally {
+            // Always clean up, even when an assertion above fails.
+            sql """drop catalog if exists ${catalog_name}"""
+        }
+    }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]