luoyuxia commented on code in PR #22805:
URL: https://github.com/apache/flink/pull/22805#discussion_r1247405915


##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java:
##########
@@ -54,15 +59,31 @@ public class OrcFormatStatisticsReportUtil {
 
     public static TableStats getTableStatistics(
            List<Path> files, DataType producedDataType, Configuration hadoopConfig) {
+        return getTableStatistics(
+                files, producedDataType, hadoopConfig, Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            int statisticsThreadNum) {
         try {
             long rowCount = 0;
            Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
            RowType producedRowType = (RowType) producedDataType.getLogicalType();
+
+            ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);

Review Comment:
   ```
   Executors.newFixedThreadPool(
                           statisticsThreadNum,
                        new ExecutorThreadFactory("orc-get-table-statistic-worker"));
   ```
   ?
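For context, a plain-JDK sketch of what the suggested named thread factory buys: Flink's `ExecutorThreadFactory` is not on the classpath here, so a hand-rolled `ThreadFactory` stands in, and the pool size is illustrative. The shutdown in `finally` is also worth checking in the PR, since an un-shut-down fixed pool leaks non-daemon threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolSketch {

    // Stand-in for Flink's ExecutorThreadFactory: names threads "<prefix>-<n>"
    // so they are identifiable in thread dumps instead of "pool-1-thread-1".
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService =
                Executors.newFixedThreadPool(2, namedFactory("orc-get-table-statistic-worker"));
        try {
            // Each worker thread now carries the descriptive prefix.
            String name = executorService.submit(() -> Thread.currentThread().getName()).get();
            System.out.println(name);
        } finally {
            // Release the pool's threads once statistics gathering is done.
            executorService.shutdown();
        }
    }
}
```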



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -373,13 +374,18 @@ private TableStats getMapRedInputFormatStatistics(
                         .toLowerCase();
        List<Path> files =
                inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+        int statisticsThreadNum = flinkConf.get(TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM);

Review Comment:
   Please check that the thread number is not less than 1.
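A minimal sketch of that validation; the helper name and message are hypothetical, not from the PR:

```java
public class ThreadNumCheck {

    // Hypothetical guard mirroring the review request: reject values below 1
    // before using them to size the statistics thread pool.
    static int checkStatisticsThreadNum(int statisticsThreadNum) {
        if (statisticsThreadNum < 1) {
            throw new IllegalArgumentException(
                    "The statistics thread number must be at least 1, but was "
                            + statisticsThreadNum);
        }
        return statisticsThreadNum;
    }

    public static void main(String[] args) {
        System.out.println(checkStatisticsThreadNum(4));
    }
}
```

Inside the connector, Flink's `Preconditions.checkArgument` would be the idiomatic way to express the same guard.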



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java:
##########
@@ -70,12 +74,34 @@ public static TableStats getTableStatistics(
             DataType producedDataType,
             Configuration hadoopConfig,
             boolean isUtcTimestamp) {
+        return getTableStatistics(
+                files,
+                producedDataType,
+                hadoopConfig,
+                isUtcTimestamp,
+                Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            boolean isUtcTimestamp,
+            int statisticsThreadNum) {
         try {
             Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
            RowType producedRowType = (RowType) producedDataType.getLogicalType();
+            ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);

Review Comment:
   Ditto: use an `ExecutorThreadFactory` with a descriptive name here as well.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -134,6 +134,13 @@ public class HiveOptions {
                                    + " custom: use policy class to create a commit policy."
                                    + " Support to configure multiple policies: 'metastore,success-file'.");
 
+    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =

Review Comment:
   Please remember to add documentation for the newly added option.
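For illustration only, a complete option definition with the `withDescription(...)` text that feeds the generated docs might look like the fragment below. The diff truncates the actual definition, so the key, default, and wording here are placeholders, not the PR's values:

```java
// Placeholder sketch -- the real key, default, and description live in the PR.
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =
        key("table.exec.hive.read-statistics-thread-num") // hypothetical key
                .intType()
                .defaultValue(Runtime.getRuntime().availableProcessors())
                .withDescription(
                        "The number of threads used to gather file format statistics"
                                + " when reporting table statistics for Hive tables.");
```

In Flink, the description string is what ends up on the generated configuration documentation page, so the option is not documented until it has one.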



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
