luoyuxia commented on code in PR #22805: URL: https://github.com/apache/flink/pull/22805#discussion_r1247405915
########## flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java ##########

```diff
@@ -54,15 +59,31 @@ public class OrcFormatStatisticsReportUtil {
     public static TableStats getTableStatistics(
             List<Path> files, DataType producedDataType, Configuration hadoopConfig) {
+        return getTableStatistics(
+                files, producedDataType, hadoopConfig, Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            int statisticsThreadNum) {
         try {
             long rowCount = 0;
             Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+
+            ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);
```

Review Comment:

```java
Executors.newFixedThreadPool(
        statisticsThreadNum,
        new ExecutorThreadFactory("orc-get-table-statistic-worker"));
```
?

########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java ##########

```diff
@@ -373,13 +374,18 @@ private TableStats getMapRedInputFormatStatistics(
                         .toLowerCase();
         List<Path> files =
                 inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+        int statisticsThreadNum = flinkConf.get(TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM);
```

Review Comment:

Please check that the thread num is not less than 1.

########## flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java ##########

```diff
@@ -70,12 +74,34 @@ public static TableStats getTableStatistics(
             DataType producedDataType,
             Configuration hadoopConfig,
             boolean isUtcTimestamp) {
+        return getTableStatistics(
+                files,
+                producedDataType,
+                hadoopConfig,
+                isUtcTimestamp,
+                Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            boolean isUtcTimestamp,
+            int statisticsThreadNum) {
         try {
             Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+            ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);
```

Review Comment:

Ditto (use a named `ExecutorThreadFactory` here as well).

########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java ##########

```diff
@@ -134,6 +134,13 @@ public class HiveOptions {
                             + " custom: use policy class to create a commit policy."
                             + " Support to configure multiple policies: 'metastore,success-file'.");
 
+    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =
```

Review Comment:

Please remember to add the documentation for the newly added option.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
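Editor's note on the first review comment: the reviewer suggests passing Flink's internal `ExecutorThreadFactory` so the statistics worker threads carry a recognizable name in thread dumps, instead of the JDK default `pool-N-thread-M`. The sketch below illustrates the same idea with only JDK APIs; `namedFactory` is a hypothetical stand-in for `ExecutorThreadFactory`, and the pool name is taken from the reviewer's suggestion.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolSketch {

    // Hypothetical stand-in for Flink's ExecutorThreadFactory: gives each
    // worker thread a stable, descriptive name so the pool is identifiable
    // in thread dumps and logs.
    static ThreadFactory namedFactory(String poolName) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, poolName + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(2, namedFactory("orc-get-table-statistic-worker"));
        // Tasks submitted to the pool now run on named threads.
        String workerName = pool.submit(() -> Thread.currentThread().getName()).get();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(workerName.startsWith("orc-get-table-statistic-worker"));
    }
}
```

The same two-argument `Executors.newFixedThreadPool(int, ThreadFactory)` overload is what the reviewer's snippet relies on; only the factory implementation differs here.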
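Editor's note on the thread-num check requested for `HiveTableSource`: `Executors.newFixedThreadPool` throws `IllegalArgumentException` when `nThreads <= 0`, so a misconfigured option value must be validated before it reaches the pool. A minimal sketch of one possible guard, clamping to 1 (the helper name `sanitizeThreadNum` is hypothetical; the actual PR may instead reject the value outright):

```java
public class ThreadNumCheck {

    // Clamp a configured thread count to at least 1, so a value of 0 or a
    // negative number never reaches Executors.newFixedThreadPool.
    static int sanitizeThreadNum(int configured) {
        return Math.max(1, configured);
    }

    public static void main(String[] args) {
        System.out.println(sanitizeThreadNum(0));  // invalid value clamped to 1
        System.out.println(sanitizeThreadNum(8));  // valid values pass through
    }
}
```

An alternative, arguably stricter design is to fail fast with a descriptive error (e.g. via a precondition check) so the user learns the option is misconfigured rather than silently running single-threaded.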