luoyuxia commented on code in PR #22805:
URL: https://github.com/apache/flink/pull/22805#discussion_r1254328035
##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -206,6 +206,10 @@ Users can do some performance tuning by tuning the split's size with the follow
 - Currently, these configurations for tuning split size only works for the Hive table stored as ORC format.
 {{< /hint >}}
 
+### Read Table Statistics

Review Comment:
   Please don't forget to also update the Chinese doc.

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -134,6 +134,13 @@ public class HiveOptions {
                                     + " custom: use policy class to create a commit policy."
                                     + " Support to configure multiple policies: 'metastore,success-file'.");
 
+    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =
+            key("table.exec.hive.read-format-statistics.thread-num")

Review Comment:
   After rethinking it, I don't think we need to expose the word `format`, which may confuse users. So I'd like to rename it to `table.exec.hive.read-statistics.thread-num`. WDYT?
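   [Editor's note] To make the rename concrete, here is a minimal sketch of what the renamed option could look like. Only the key follows the suggestion above; the `intType()`, the placeholder default of 4, the description text, and the `HiveStatisticsOptionsSketch` class name are illustrative assumptions, not taken from the PR.

   import org.apache.flink.configuration.ConfigOption;

   import static org.apache.flink.configuration.ConfigOptions.key;

   public class HiveStatisticsOptionsSketch {

       // Hypothetical shape of the renamed option; only the key follows the review suggestion.
       public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM =
               key("table.exec.hive.read-statistics.thread-num")
                       .intType()
                       .defaultValue(4) // placeholder default, not from the PR
                       .withDescription(
                               "Number of threads used to gather statistics by reading files"
                                       + " when statistics are not available from the Hive metastore.");
   }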
##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -206,6 +206,10 @@ Users can do some performance tuning by tuning the split's size with the follow
 - Currently, these configurations for tuning split size only works for the Hive table stored as ORC format.
 {{< /hint >}}
 
+### Read Table Statistics

Review Comment:
   Please also specify why we may need to scan the table to get statistics: when the table statistics are not available from the Hive metastore, we will then try to get them by scanning the table.

##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java:
##########
@@ -70,13 +75,50 @@ public static TableStats getTableStatistics(
             DataType producedDataType,
             Configuration hadoopConfig,
             boolean isUtcTimestamp) {
+        return getTableStatistics(
+                files,
+                producedDataType,
+                hadoopConfig,
+                isUtcTimestamp,
+                Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            boolean isUtcTimestamp,
+            int statisticsThreadNum) {
         try {
             Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+            ExecutorService executorService =
+                    Executors.newFixedThreadPool(
+                            statisticsThreadNum,
+                            new ExecutorThreadFactory("parquet-get-table-statistic-worker"));
             long rowCount = 0;
+            List<Future<FileParquetStatistics>> fileRowCountFutures = new ArrayList<>();
             for (Path file : files) {
-                rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+                fileRowCountFutures.add(
+                        executorService.submit(
+                                new ParquetFileRowCountCalculator(
+                                        hadoopConfig, file, columnStatisticsMap)));
+            }
+            for (Future<FileParquetStatistics> fileCountFuture : fileRowCountFutures) {
+                FileParquetStatistics fileStatistics = fileCountFuture.get();
+                List<String> columns = fileStatistics.getColumns();
+                List<BlockMetaData> blocks = fileStatistics.blocks;
+                for (BlockMetaData block : blocks) {
+                    rowCount += block.getRowCount();
+                    for (int i = 0; i < columns.size(); ++i) {
+                        updateStatistics(
+                                block.getColumns().get(i).getStatistics(),
+                                columns.get(i),
+                                columnStatisticsMap);
+                    }
+                }
             }
+            executorService.shutdownNow();

Review Comment:
   Ditto (same `try {} finally {}` comment as for the ORC util below).

##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java:
##########
@@ -54,15 +60,42 @@ public class OrcFormatStatisticsReportUtil {
     public static TableStats getTableStatistics(
             List<Path> files, DataType producedDataType, Configuration hadoopConfig) {
+        return getTableStatistics(
+                files, producedDataType, hadoopConfig, Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            int statisticsThreadNum) {
         try {
             long rowCount = 0;
             Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+
+            ExecutorService executorService =
+                    Executors.newFixedThreadPool(
+                            statisticsThreadNum,
+                            new ExecutorThreadFactory("orc-get-table-statistic-worker"));
+            List<Future<FileOrcStatistics>> fileRowCountFutures = new ArrayList<>();
             for (Path file : files) {
-                rowCount +=
-                        updateStatistics(hadoopConfig, file, columnStatisticsMap, producedRowType);
+                fileRowCountFutures.add(
+                        executorService.submit(new OrcFileRowCountCalculator(hadoopConfig, file)));
             }
-
+            for (Future<FileOrcStatistics> fileCountFuture : fileRowCountFutures) {
+                FileOrcStatistics fileOrcStatistics = fileCountFuture.get();
+                rowCount += fileOrcStatistics.getRowCount();
+                for (String column : producedRowType.getFieldNames()) {
+                    int fieldIdx = fileOrcStatistics.getFieldNames().indexOf(column);
+                    if (fieldIdx >= 0) {
+                        int colId = fileOrcStatistics.getColumnTypes().get(fieldIdx).getId();
+                        ColumnStatistics statistic = fileOrcStatistics.getStatistics()[colId];
+                        updateStatistics(statistic, column, columnStatisticsMap);
+                    }
+                }
+            }
+            executorService.shutdownNow();

Review Comment:
   Please use try {} finally {} to shut down the `executorService`.
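   [Editor's note] To illustrate the suggestion, here is a minimal, self-contained sketch of the try {} finally {} shutdown pattern; it is not the PR's actual code. `FileStatsTask`, `StatisticsExecutorSketch`, and the row-count-only result are hypothetical stand-ins for `ParquetFileRowCountCalculator` / `OrcFileRowCountCalculator` and their statistics classes.

   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   public class StatisticsExecutorSketch {

       // Hypothetical per-file task standing in for the calculators in the diffs above.
       static final class FileStatsTask implements Callable<Long> {
           private final String file;

           FileStatsTask(String file) {
               this.file = file;
           }

           @Override
           public Long call() {
               // A real task would read the file footer and return its row count.
               return 0L;
           }
       }

       static long collectRowCount(List<String> files, int statisticsThreadNum) throws Exception {
           ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);
           try {
               List<Future<Long>> futures = new ArrayList<>();
               for (String file : files) {
                   futures.add(executorService.submit(new FileStatsTask(file)));
               }
               long rowCount = 0;
               for (Future<Long> future : futures) {
                   rowCount += future.get();
               }
               return rowCount;
           } finally {
               // Runs even if submit() or get() throws, so the fixed thread pool is never leaked.
               executorService.shutdownNow();
           }
       }
   }

   With the shutdown in a finally block, a failed future (for example, a corrupt file) no longer leaves the thread pool alive behind the statistics call.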