luoyuxia commented on code in PR #22805:
URL: https://github.com/apache/flink/pull/22805#discussion_r1254328035


##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -206,6 +206,10 @@ Users can do some performance tuning by tuning the split's size with the follow
 - Currently, these configurations for tuning split size only works for the Hive table stored as ORC format.
 {{< /hint >}}
 
+### Read Table Statistics

Review Comment:
   Please don't forget to also update the Chinese doc.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -134,6 +134,13 @@ public class HiveOptions {
                                     + " custom: use policy class to create a 
commit policy."
                                     + " Support to configure multiple 
policies: 'metastore,success-file'.");
 
+    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =
+            key("table.exec.hive.read-format-statistics.thread-num")

Review Comment:
   After rethinking it, I don't think we need to expose the word `format`, which may confuse users. So, I'd like to rename it to `table.exec.hive.read-statistics.thread-num`.
   WDYT?
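   For illustration, a minimal sketch of what the renamed option could look like (assuming flink-core on the classpath; the default value and description here are placeholders, not the PR's actual values):
   
   ```java
   import org.apache.flink.configuration.ConfigOption;
   
   import static org.apache.flink.configuration.ConfigOptions.key;
   
   public class HiveOptionsSketch {
   
       // Hypothetical default and description; only the key rename is the actual proposal.
       public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM =
               key("table.exec.hive.read-statistics.thread-num")
                       .intType()
                       .defaultValue(Runtime.getRuntime().availableProcessors())
                       .withDescription(
                               "Number of threads used to scan files for table statistics"
                                       + " when they are not available from the Hive metastore.");
   }
   ```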
   



##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -206,6 +206,10 @@ Users can do some performance tuning by tuning the split's size with the follow
 - Currently, these configurations for tuning split size only works for the Hive table stored as ORC format.
 {{< /hint >}}
 
+### Read Table Statistics

Review Comment:
   Please also specify why we may need to scan the table's files to get statistics.
   When the table statistics are not available from the Hive metastore, we will then try to get them by scanning the table.
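   For context, a runnable sketch of the fallback being described, with hypothetical helper methods standing in for the metastore lookup and the file scan:
   
   ```java
   import org.apache.flink.table.plan.stats.TableStats;
   
   public class StatisticsFallbackSketch {
   
       // Hypothetical helper: pretend the Hive metastore has no statistics.
       static TableStats statsFromMetastore() {
           return TableStats.UNKNOWN;
       }
   
       // Hypothetical helper: pretend we derived a row count by scanning file footers.
       static TableStats statsByScanningFiles() {
           return new TableStats(42);
       }
   
       public static void main(String[] args) {
           TableStats stats = statsFromMetastore();
           if (stats == TableStats.UNKNOWN) {
               // Only fall back to the (more expensive) file scan when the
               // metastore cannot provide statistics.
               stats = statsByScanningFiles();
           }
           System.out.println(stats);
       }
   }
   ```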



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java:
##########
@@ -70,13 +75,50 @@ public static TableStats getTableStatistics(
             DataType producedDataType,
             Configuration hadoopConfig,
             boolean isUtcTimestamp) {
+        return getTableStatistics(
+                files,
+                producedDataType,
+                hadoopConfig,
+                isUtcTimestamp,
+                Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            boolean isUtcTimestamp,
+            int statisticsThreadNum) {
         try {
             Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+            ExecutorService executorService =
+                    Executors.newFixedThreadPool(
+                            statisticsThreadNum,
+                            new ExecutorThreadFactory("parquet-get-table-statistic-worker"));
             long rowCount = 0;
+            List<Future<FileParquetStatistics>> fileRowCountFutures = new ArrayList<>();
             for (Path file : files) {
-                rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+                fileRowCountFutures.add(
+                        executorService.submit(
+                                new ParquetFileRowCountCalculator(
+                                        hadoopConfig, file, columnStatisticsMap)));
+            }
+            for (Future<FileParquetStatistics> fileCountFuture : fileRowCountFutures) {
+                FileParquetStatistics fileStatistics = fileCountFuture.get();
+                List<String> columns = fileStatistics.getColumns();
+                List<BlockMetaData> blocks = fileStatistics.blocks;
+                for (BlockMetaData block : blocks) {
+                    rowCount += block.getRowCount();
+                    for (int i = 0; i < columns.size(); ++i) {
+                        updateStatistics(
+                                block.getColumns().get(i).getStatistics(),
+                                columns.get(i),
+                                columnStatisticsMap);
+                    }
+                }
             }
+            executorService.shutdownNow();

Review Comment:
   Ditto: please use try {} finally {} to shut down the `executorService`, same as the comment on the ORC version below.



##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java:
##########
@@ -54,15 +60,42 @@ public class OrcFormatStatisticsReportUtil {
 
     public static TableStats getTableStatistics(
             List<Path> files, DataType producedDataType, Configuration hadoopConfig) {
+        return getTableStatistics(
+                files, producedDataType, hadoopConfig, Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            int statisticsThreadNum) {
         try {
             long rowCount = 0;
             Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+
+            ExecutorService executorService =
+                    Executors.newFixedThreadPool(
+                            statisticsThreadNum,
+                            new ExecutorThreadFactory("orc-get-table-statistic-worker"));
+            List<Future<FileOrcStatistics>> fileRowCountFutures = new ArrayList<>();
             for (Path file : files) {
-                rowCount +=
-                        updateStatistics(hadoopConfig, file, columnStatisticsMap, producedRowType);
+                fileRowCountFutures.add(
+                        executorService.submit(new OrcFileRowCountCalculator(hadoopConfig, file)));
             }
-
+            for (Future<FileOrcStatistics> fileCountFuture : fileRowCountFutures) {
+                FileOrcStatistics fileOrcStatistics = fileCountFuture.get();
+                rowCount += fileOrcStatistics.getRowCount();
+                for (String column : producedRowType.getFieldNames()) {
+                    int fieldIdx = fileOrcStatistics.getFieldNames().indexOf(column);
+                    if (fieldIdx >= 0) {
+                        int colId = fileOrcStatistics.getColumnTypes().get(fieldIdx).getId();
+                        ColumnStatistics statistic = fileOrcStatistics.getStatistics()[colId];
+                        updateStatistics(statistic, column, columnStatisticsMap);
+                    }
+                }
+            }
+            executorService.shutdownNow();

Review Comment:
   Please use try {} finally {} to shut down the `executorService`.
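   For illustration, a minimal self-contained sketch of that shape, with a hypothetical `FileStats` type and a placeholder task standing in for the real per-file footer read:
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   
   public class StatisticsScanSketch {
   
       // Hypothetical per-file result, standing in for FileOrcStatistics.
       static class FileStats {
           final long rowCount;
   
           FileStats(long rowCount) {
               this.rowCount = rowCount;
           }
       }
   
       static long gatherRowCount(List<String> files, int statisticsThreadNum) throws Exception {
           ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);
           long rowCount = 0;
           try {
               List<Future<FileStats>> futures = new ArrayList<>();
               for (String file : files) {
                   // Placeholder task; the real code reads the file's ORC footer here.
                   futures.add(executorService.submit(() -> new FileStats(file.length())));
               }
               for (Future<FileStats> future : futures) {
                   rowCount += future.get().rowCount;
               }
           } finally {
               // Runs even when future.get() throws, so worker threads are never leaked.
               executorService.shutdownNow();
           }
           return rowCount;
       }
   
       public static void main(String[] args) throws Exception {
           System.out.println(gatherRowCount(Arrays.asList("a.orc", "bb.orc"), 2));
       }
   }
   ```
   
   The same pattern would apply to the Parquet version above.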


