This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 976bc8d9f0 [Improvement](statistics)Support basic jdbc external table stats collection (#23965)
976bc8d9f0 is described below

commit 976bc8d9f011d2e86d53e8381345ecd91d674396
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Sep 6 20:24:02 2023 +0800

    [Improvement](statistics)Support basic jdbc external table stats collection (#23965)
---
 .../doris/catalog/external/JdbcExternalTable.java  |  34 ++++
 .../apache/doris/statistics/AnalysisManager.java   |   3 +-
 .../apache/doris/statistics/JdbcAnalysisTask.java  | 175 +++++++++++++++++++++
 .../jdbc/test_mysql_jdbc_statistics.groovy         |  64 ++++++++
 4 files changed, 275 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
index 5db69ac8f0..7dbb4a65fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
@@ -18,14 +18,20 @@
 package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.JdbcTable;
 import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.JdbcAnalysisTask;
+import org.apache.doris.statistics.TableStatistic;
 import org.apache.doris.thrift.TTableDescriptor;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Elasticsearch external table.
@@ -93,4 +99,32 @@ public class JdbcExternalTable extends ExternalTable {
         jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
         return jdbcTable;
     }
+
+    @Override
+    public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
+        makeSureInitialized();
+        return new JdbcAnalysisTask(info);
+    }
+
+    @Override
+    public long getRowCount() {
+        makeSureInitialized();
+        try {
+            Optional<TableStatistic> tableStatistics = Env.getCurrentEnv().getStatisticsCache().getTableStatistics(
+                    catalog.getId(), catalog.getDbOrAnalysisException(dbName).getId(), id);
+            if (tableStatistics.isPresent()) {
+                long rowCount = tableStatistics.get().rowCount;
+                LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);
+                return rowCount;
+            }
+        } catch (Exception e) {
+            LOG.warn("Fail to get row count for table {}", name, e);
+        }
+        return 1;
+    }
+
+    @Override
+    public long estimatedRowCount() {
+        return getRowCount();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index cde111af4b..5ac0c0cd52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.View;
+import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -578,7 +579,7 @@ public class AnalysisManager extends Daemon implements Writable {
             LOG.warn(e.getMessage());
             return;
         }
-        if (jobInfo.analysisType == AnalysisType.HISTOGRAM || table.getType() != TableType.HMS_EXTERNAL_TABLE) {
+        if (jobInfo.analysisType == AnalysisType.HISTOGRAM || !(table instanceof ExternalTable)) {
             return;
         }
         AnalysisInfoBuilder colTaskInfoBuilder = new AnalysisInfoBuilder(jobInfo);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
new file mode 100644
index 0000000000..25b9db0b2c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.external.JdbcExternalTable;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.InternalQueryResult;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcAnalysisTask extends BaseAnalysisTask {
+    private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class);
+
+    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " SELECT "
+            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+            + "${catalogId} AS catalog_id, "
+            + "${dbId} AS db_id, "
+            + "${tblId} AS tbl_id, "
+            + "${idxId} AS idx_id, "
+            + "'${colId}' AS col_id, "
+            + "NULL AS part_id, "
+            + "COUNT(1) AS row_count, "
+            + "NDV(`${colName}`) AS ndv, "
+            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+            + "MIN(`${colName}`) AS min, "
+            + "MAX(`${colName}`) AS max, "
+            + "${dataSizeFunction} AS data_size, "
+            + "NOW() "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+
+    private final boolean isTableLevelTask;
+    private JdbcExternalTable table;
+
+    public JdbcAnalysisTask(AnalysisInfo info) {
+        super(info);
+        isTableLevelTask = info.externalTableLevelTask;
+        table = (JdbcExternalTable) tbl;
+    }
+
+    public void doExecute() throws Exception {
+        if (isTableLevelTask) {
+            getTableStats();
+        } else {
+            getTableColumnStats();
+        }
+    }
+
+    /**
+     * Get table row count and store the result to metadata.
+     */
+    private void getTableStats() throws Exception {
+        Map<String, String> params = buildTableStatsParams(null);
+        List<InternalQueryResult.ResultRow> columnResult =
+                StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE));
+        String rowCount = columnResult.get(0).getColumnValue("rowCount");
+        params.put("rowCount", rowCount);
+        StatisticsRepository.persistTableStats(params);
+    }
+
+    /**
+     * Get column statistics and insert the result to __internal_schema.column_statistics
+     */
+    private void getTableColumnStats() throws Exception {
+        // An example sql for a column stats:
+        // INSERT INTO __internal_schema.column_statistics
+        //   SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
+        //   13002 AS catalog_id,
+        //   13038 AS db_id,
+        //   13055 AS tbl_id,
+        //   -1 AS idx_id,
+        //   'r_regionkey' AS col_id,
+        //   'NULL' AS part_id,
+        //   COUNT(1) AS row_count,
+        //   NDV(`r_regionkey`) AS ndv,
+        //   SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
+        //   MIN(`r_regionkey`) AS min,
+        //   MAX(`r_regionkey`) AS max,
+        //   0 AS data_size,
+        //   NOW() FROM `hive`.`tpch100`.`region`
+        StringBuilder sb = new StringBuilder();
+        sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
+        Map<String, String> params = buildTableStatsParams("NULL");
+        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+        params.put("colName", col.getName());
+        params.put("colId", info.colName);
+        params.put("dataSizeFunction", getDataSizeFunction(col));
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(sb.toString());
+        executeInsertSql(sql);
+    }
+
+    private void executeInsertSql(String sql) throws Exception {
+        long startTime = System.currentTimeMillis();
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+            r.connectContext.setExecutor(stmtExecutor);
+            this.stmtExecutor.execute();
+            QueryState queryState = r.connectContext.getState();
+            if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
+                LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
+                        info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+                throw new RuntimeException(queryState.getErrorMessage());
+            }
+            LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
+                    info.catalogName, info.dbName, info.colName, sql, (System.currentTimeMillis() - startTime)));
+        }
+    }
+
+    private Map<String, String> buildTableStatsParams(String partId) {
+        Map<String, String> commonParams = new HashMap<>();
+        String id = StatisticsUtil.constructId(tbl.getId(), -1);
+        if (partId == null) {
+            commonParams.put("partId", "NULL");
+        } else {
+            id = StatisticsUtil.constructId(id, partId);
+            commonParams.put("partId", "\'" + partId + "\'");
+        }
+        commonParams.put("id", id);
+        commonParams.put("catalogId", String.valueOf(catalog.getId()));
+        commonParams.put("dbId", String.valueOf(db.getId()));
+        commonParams.put("tblId", String.valueOf(tbl.getId()));
+        commonParams.put("indexId", "-1");
+        commonParams.put("idxId", "-1");
+        commonParams.put("catalogName", catalog.getName());
+        commonParams.put("dbName", db.getFullName());
+        commonParams.put("tblName", tbl.getName());
+        if (col != null) {
+            commonParams.put("type", col.getType().toString());
+        }
+        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
+        return commonParams;
+    }
+
+    @Override
+    protected void afterExecution() {
+        // Table level task doesn't need to sync any value to sync stats, it stores the value in metadata.
+        if (isTableLevelTask) {
+            return;
+        }
+        Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
+    }
+}
diff --git a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
new file mode 100644
index 0000000000..d6f0ca351d
--- /dev/null
+++ b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_docker_mysql") {
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+    String s3_endpoint = getS3Endpoint()
+    String bucket = getS3BucketName()
+    String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar";
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String catalog_name = "test_mysql_jdbc_statistics";
+
+        sql """create catalog if not exists ${catalog_name} properties(
+            "type"="jdbc",
+            "user"="root",
+            "password"="123456",
+            "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false&zeroDateTimeBehavior=convertToNull",
+            "driver_url" = "${driver_url}",
+            "driver_class" = "com.mysql.cj.jdbc.Driver"
+        );"""
+
+        sql """use ${catalog_name}.doris_test"""
+        sql """analyze table ex_tb0 with sync"""
+        def result = sql """show column stats ex_tb0 (name)"""
+        assertTrue(result.size() == 1)
+        assertTrue(result[0][0] == "name")
+        assertTrue(result[0][1] == "5.0")
+        assertTrue(result[0][2] == "5.0")
+        assertTrue(result[0][3] == "0.0")
+        assertTrue(result[0][4] == "18.0")
+        assertTrue(result[0][5] == "3.0")
+        assertTrue(result[0][6] == "'abc'")
+        assertTrue(result[0][7] == "'abg'")
+
+        result = sql """show column stats ex_tb0 (id)"""
+        assertTrue(result.size() == 1)
+        assertTrue(result[0][0] == "id")
+        assertTrue(result[0][1] == "5.0")
+        assertTrue(result[0][2] == "5.0")
+        assertTrue(result[0][3] == "0.0")
+        assertTrue(result[0][4] == "24.0")
+        assertTrue(result[0][5] == "4.0")
+        assertTrue(result[0][6] == "111")
+        assertTrue(result[0][7] == "115")
+
+        sql """drop catalog ${catalog_name}"""
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to