This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 0f2ee60c277 [FLINK-32365][hive] Allow Hive source to scan table to get statistics in parallel (#22805)
0f2ee60c277 is described below

commit 0f2ee60c2774c7b03c20ae0d495c51c35df48789
Author: baiwuchang <41145695+baibaiwu...@users.noreply.github.com>
AuthorDate: Tue Jul 18 09:15:19 2023 +0800

    [FLINK-32365][hive] Allow Hive source to scan table to get statistics in parallel (#22805)
---
 .../docs/connectors/table/hive/hive_read_write.md |   4 +
 .../docs/connectors/table/hive/hive_read_write.md |   5 +
 .../apache/flink/connectors/hive/HiveOptions.java |   7 +
 .../flink/connectors/hive/HiveTableSource.java    |  13 +-
 .../orc/util/OrcFormatStatisticsReportUtil.java   | 133 ++++++++++++++-----
 .../utils/ParquetFormatStatisticsReportUtil.java  | 144 +++++++++++++++------
 6 files changed, 236 insertions(+), 70 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index c54efe4c37f..bdd3ac4f06d 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -190,6 +190,10 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig`
 - 目前上述参数仅适用于 ORC 格式的 Hive 表。
 {{< /hint >}}
 
+### 读取表统计信息
+
+当 Hive metastore 中没有表的统计信息时,Flink 会尝试扫描表来获取统计信息,从而生成合适的执行计划。此过程可能会比较耗时,你可以使用 `table.exec.hive.read-statistics.thread-num` 去配置使用多少个线程去扫描表,默认值是当前系统可用处理器数,配置的值应该大于0。
+
 ### 加载分区切片
 
 Flink 使用多个线程并发将 Hive 分区切分成多个 split 进行读取。你可以使用 `table.exec.hive.load-partition-splits.thread-num` 去配置线程数。默认值是3,你配置的值应该大于0。
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index c5bc91edb11..98742a92be7 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -206,6 +206,11 @@ Users can do some performance tuning by tuning the split's size with the follow
 - Currently, these configurations for tuning split size only works for the Hive table stored as ORC format.
 {{< /hint >}}
 
+### Read Table Statistics
+
+When table statistics are not available from the Hive metastore, Flink will try to scan the table to collect the statistics and generate a better execution plan. Collecting the statistics may take some time. To speed it up, you can use `table.exec.hive.read-statistics.thread-num` to configure how many threads are used to scan the table.
+The default value is the number of available processors in the current system, and the configured value should be bigger than 0.
+
 ### Load Partition Splits
 
 Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0.
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index 2cba3dae4fc..01e1493681d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -134,6 +134,13 @@ public class HiveOptions {
                                     + " custom: use policy class to create a commit policy."
+ " Support to configure multiple policies: 'metastore,success-file'."); + public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM = + key("table.exec.hive.read-statistics.thread-num") + .intType() + .defaultValue(Runtime.getRuntime().availableProcessors()) + .withDescription( + "The thread number to read input format statistics. It should be bigger than 0."); + public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE = key("compaction.small-files.avg-size") .memoryType() diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index e5acc1d8ca5..a1dac332c2d 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -85,6 +85,7 @@ import java.util.stream.Collectors; import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER; import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET; import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE; +import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM; import static org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions; /** A TableSource implementation to read data from Hive tables. */ @@ -373,13 +374,21 @@ public class HiveTableSource .toLowerCase(); List<Path> files = inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList()); + int statisticsThreadNum = flinkConf.get(TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM); + Preconditions.checkArgument( + statisticsThreadNum >= 1, + TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM.key() + " cannot be less than 1"); // Now we only support Parquet, Orc formats. if (serializationLib.contains("parquet")) { return ParquetFormatStatisticsReportUtil.getTableStatistics( - files, producedDataType, jobConf, hiveVersion.startsWith("3")); + files, + producedDataType, + jobConf, + hiveVersion.startsWith("3"), + statisticsThreadNum); } else if (serializationLib.contains("orc")) { return OrcFormatStatisticsReportUtil.getTableStatistics( - files, producedDataType, jobConf); + files, producedDataType, jobConf, statisticsThreadNum); } else { // Now, only support Orc and Parquet Formats. 
             LOG.info(
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
index a4abfd63ad2..e183f9bf394 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.orc.ColumnStatistics;
@@ -43,9 +44,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Date;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /** Utils for Orc format statistics report. */
 public class OrcFormatStatisticsReportUtil {
@@ -54,15 +60,41 @@
 
     public static TableStats getTableStatistics(
             List<Path> files, DataType producedDataType, Configuration hadoopConfig) {
+        return getTableStatistics(
+                files, producedDataType, hadoopConfig, Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            int statisticsThreadNum) {
+        ExecutorService executorService = null;
         try {
             long rowCount = 0;
             Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+            executorService =
+                    Executors.newFixedThreadPool(
+                            statisticsThreadNum,
+                            new ExecutorThreadFactory("orc-get-table-statistic-worker"));
+            List<Future<FileOrcStatistics>> fileRowCountFutures = new ArrayList<>();
             for (Path file : files) {
-                rowCount +=
-                        updateStatistics(hadoopConfig, file, columnStatisticsMap, producedRowType);
+                fileRowCountFutures.add(
+                        executorService.submit(new OrcFileRowCountCalculator(hadoopConfig, file)));
+            }
+            for (Future<FileOrcStatistics> fileCountFuture : fileRowCountFutures) {
+                FileOrcStatistics fileOrcStatistics = fileCountFuture.get();
+                rowCount += fileOrcStatistics.getRowCount();
+                for (String column : producedRowType.getFieldNames()) {
+                    int fieldIdx = fileOrcStatistics.getFieldNames().indexOf(column);
+                    if (fieldIdx >= 0) {
+                        int colId = fileOrcStatistics.getColumnTypes().get(fieldIdx).getId();
+                        ColumnStatistics statistic = fileOrcStatistics.getStatistics()[colId];
+                        updateStatistics(statistic, column, columnStatisticsMap);
+                    }
+                }
             }
-
             Map<String, ColumnStats> columnStatsMap =
                     convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
@@ -70,35 +102,11 @@
         } catch (Exception e) {
             LOG.warn("Reporting statistics failed for Orc format: {}", e.getMessage());
             return TableStats.UNKNOWN;
-        }
-    }
-
-    private static long updateStatistics(
-            Configuration hadoopConf,
-            Path file,
-            Map<String, ColumnStatistics> columnStatisticsMap,
-            RowType producedRowType)
-            throws IOException {
-        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
-        Reader reader =
-                OrcFile.createReader(
-                        path,
-                        OrcFile.readerOptions(hadoopConf)
-                                .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
-        ColumnStatistics[] statistics = reader.getStatistics();
-        TypeDescription schema = reader.getSchema();
-        List<String> fieldNames = schema.getFieldNames();
-        List<TypeDescription> columnTypes = schema.getChildren();
-        for (String column : producedRowType.getFieldNames()) {
-            int fieldIdx = fieldNames.indexOf(column);
-            if (fieldIdx >= 0) {
-                int colId = columnTypes.get(fieldIdx).getId();
-                ColumnStatistics statistic = statistics[colId];
-                updateStatistics(statistic, column, columnStatisticsMap);
+        } finally {
+            if (executorService != null) {
+                executorService.shutdownNow();
             }
         }
-
-        return reader.getNumberOfRows();
     }
 
     private static void updateStatistics(
@@ -217,4 +225,69 @@
         }
         return builder.build();
     }
+
+    private static class FileOrcStatistics {
+        private final Long rowCount;
+
+        private final List<String> fieldNames;
+
+        private final ColumnStatistics[] statistics;
+
+        private final List<TypeDescription> columnTypes;
+
+        public FileOrcStatistics(
+                Long rowCount,
+                List<String> fieldNames,
+                ColumnStatistics[] statistics,
+                List<TypeDescription> columnTypes) {
+            this.rowCount = rowCount;
+            this.fieldNames = fieldNames;
+            this.statistics = statistics;
+            this.columnTypes = columnTypes;
+        }
+
+        public Long getRowCount() {
+            return rowCount;
+        }
+
+        public List<String> getFieldNames() {
+            return fieldNames;
+        }
+
+        public ColumnStatistics[] getStatistics() {
+            return statistics;
+        }
+
+        public List<TypeDescription> getColumnTypes() {
+            return columnTypes;
+        }
+    }
+
+    private static class OrcFileRowCountCalculator implements Callable<FileOrcStatistics> {
+
+        private final Configuration hadoopConf;
+        private final Path file;
+
+        public OrcFileRowCountCalculator(Configuration hadoopConf, Path file) {
+            this.hadoopConf = hadoopConf;
+            this.file = file;
+        }
+
+        @Override
+        public FileOrcStatistics call() throws IOException {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+
+            return new FileOrcStatistics(
+                    reader.getNumberOfRows(), fieldNames, statistics, columnTypes);
+        }
+    }
 }
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
index 7e0aad6ce28..a40784db312 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.Preconditions;
@@ -46,7 +47,6 @@ import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -54,9 +54,14 @@ import java.nio.ByteOrder;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 /** Utils for Parquet format statistics report. */
@@ -70,12 +75,49 @@ public class ParquetFormatStatisticsReportUtil {
             DataType producedDataType,
             Configuration hadoopConfig,
             boolean isUtcTimestamp) {
+        return getTableStatistics(
+                files,
+                producedDataType,
+                hadoopConfig,
+                isUtcTimestamp,
+                Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            boolean isUtcTimestamp,
+            int statisticsThreadNum) {
+        ExecutorService executorService = null;
         try {
             Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+            executorService =
+                    Executors.newFixedThreadPool(
+                            statisticsThreadNum,
+                            new ExecutorThreadFactory("parquet-get-table-statistic-worker"));
             long rowCount = 0;
+            List<Future<FileParquetStatistics>> fileRowCountFutures = new ArrayList<>();
             for (Path file : files) {
-                rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+                fileRowCountFutures.add(
+                        executorService.submit(
+                                new ParquetFileRowCountCalculator(
+                                        hadoopConfig, file, columnStatisticsMap)));
+            }
+            for (Future<FileParquetStatistics> fileCountFuture : fileRowCountFutures) {
+                FileParquetStatistics fileStatistics = fileCountFuture.get();
+                List<String> columns = fileStatistics.getColumns();
+                List<BlockMetaData> blocks = fileStatistics.blocks;
+                for (BlockMetaData block : blocks) {
+                    rowCount += block.getRowCount();
+                    for (int i = 0; i < columns.size(); ++i) {
+                        updateStatistics(
+                                block.getColumns().get(i).getStatistics(),
+                                columns.get(i),
+                                columnStatisticsMap);
+                    }
+                }
             }
             Map<String, ColumnStats> columnStatsMap =
                     convertToColumnStats(columnStatisticsMap, producedRowType, isUtcTimestamp);
@@ -83,6 +125,22 @@
         } catch (Exception e) {
             LOG.warn("Reporting statistics failed for Parquet format", e);
             return TableStats.UNKNOWN;
+        } finally {
+            if (executorService != null) {
+                executorService.shutdownNow();
+            }
+        }
+    }
+
+    private static void updateStatistics(
+            Statistics<?> statistics,
+            String column,
+            Map<String, Statistics<?>> columnStatisticsMap) {
+        Statistics<?> previousStatistics = columnStatisticsMap.get(column);
+        if (previousStatistics == null) {
+            columnStatisticsMap.put(column, statistics);
+        } else {
+            previousStatistics.mergeStatistics(statistics);
         }
     }
 
@@ -252,42 +310,6 @@
         return builder.build();
     }
 
-    private static long updateStatistics(
-            Configuration hadoopConfig, Path file, Map<String, Statistics<?>> columnStatisticsMap)
-            throws IOException {
-        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
-        ParquetMetadata metadata = ParquetFileReader.readFooter(hadoopConfig, hadoopPath);
-        MessageType schema = metadata.getFileMetaData().getSchema();
-        List<String> columns =
-                schema.asGroupType().getFields().stream()
-                        .map(Type::getName)
-                        .collect(Collectors.toList());
-        List<BlockMetaData> blocks = metadata.getBlocks();
-        long rowCount = 0;
-        for (BlockMetaData block : blocks) {
-            rowCount += block.getRowCount();
-            for (int i = 0; i < columns.size(); ++i) {
-                updateStatistics(
-                        block.getColumns().get(i).getStatistics(),
-                        columns.get(i),
-                        columnStatisticsMap);
-            }
-        }
-        return rowCount;
-    }
-
-    private static void updateStatistics(
-            Statistics<?> statistics,
-            String column,
-            Map<String, Statistics<?>> columnStatisticsMap) {
-        Statistics<?> previousStatistics = columnStatisticsMap.get(column);
-        if (previousStatistics == null) {
-            columnStatisticsMap.put(column, statistics);
-        } else {
-            previousStatistics.mergeStatistics(statistics);
-        }
-    }
-
     private static BigDecimal binaryToDecimal(Binary decimal, int scale) {
         BigInteger bigInteger = new BigInteger(decimal.getBytesUnsafe());
         return new BigDecimal(bigInteger, scale);
@@ -304,4 +326,50 @@
                 TimestampColumnReader.int96ToTimestamp(utcTimestamp, timeOfDayNanos, julianDay);
         return timestampData.toTimestamp();
     }
+
+    private static class FileParquetStatistics {
+
+        private final List<String> columns;
+
+        private final List<BlockMetaData> blocks;
+
+        public FileParquetStatistics(List<String> columns, List<BlockMetaData> blocks) {
+            this.columns = columns;
+            this.blocks = blocks;
+        }
+
+        public List<String> getColumns() {
+            return columns;
+        }
+
+        public List<BlockMetaData> getBlocks() {
+            return blocks;
+        }
+    }
+
+    private static class ParquetFileRowCountCalculator implements Callable<FileParquetStatistics> {
+        private final Configuration hadoopConfig;
+        private final Path file;
+
+        public ParquetFileRowCountCalculator(
+                Configuration hadoopConfig,
+                Path file,
+                Map<String, Statistics<?>> columnStatisticsMap) {
+            this.hadoopConfig = hadoopConfig;
+            this.file = file;
+        }
+
+        @Override
+        public FileParquetStatistics call() throws Exception {
+            org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
+            ParquetMetadata metadata = ParquetFileReader.readFooter(hadoopConfig, hadoopPath);
+            MessageType schema = metadata.getFileMetaData().getSchema();
+            List<String> columns =
+                    schema.asGroupType().getFields().stream()
+                            .map(Type::getName)
+                            .collect(Collectors.toList());
+            List<BlockMetaData> blocks = metadata.getBlocks();
+            return new FileParquetStatistics(columns, blocks);
+        }
+    }
 }
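
For illustration only (not part of this commit), the following is a minimal sketch of how the new option can be set from the Table API. The catalog name, Hive conf directory, and table name are placeholders; only the option key `table.exec.hive.read-statistics.thread-num`, its lower bound of 1, and its default (the number of available processors) come from the change above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveReadStatisticsThreadNumExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Placeholder catalog settings; adjust to your environment.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        tEnv.registerCatalog("myhive", hiveCatalog);
        tEnv.useCatalog("myhive");

        // Scan ORC/Parquet footers with 8 threads when the metastore has no table
        // statistics; the value must be at least 1 (default: available processors).
        tEnv.getConfig().set("table.exec.hive.read-statistics.thread-num", "8");

        // Statistics are collected when the planner asks the Hive source to report
        // statistics for a table that has none in the metastore.
        tEnv.executeSql("SELECT COUNT(*) FROM some_orc_or_parquet_table").print();
    }
}

The same option can also be set per session in the SQL client: SET 'table.exec.hive.read-statistics.thread-num' = '8';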