This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f2256ec94ccf7c6fc68b49ddd351f5b2aaa8fc35
Author: shaoxiong.zhan <shaoxiong0...@gmail.com>
AuthorDate: Tue Dec 6 20:15:49 2022 +0800

    exclude hudi-kafka-connect & add some api to support FLIP-27 source
---
 .../apache/hudi/configuration/FlinkOptions.java    |  6 ++++
 .../org/apache/hudi/table/HoodieTableSource.java   | 42 +++++++++++++---------
 .../table/format/mor/MergeOnReadInputFormat.java   | 29 +++++++++++++++
 .../table/format/mor/MergeOnReadInputSplit.java    |  8 ++++-
 4 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index aa1e3297bd..df2c96c8a9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -200,6 +200,12 @@ public class FlinkOptions extends HoodieConfig {
       .noDefaultValue()
       .withDescription("Parallelism of tasks that do actual read, default is the parallelism of the execution environment");
 
+  public static final ConfigOption<Integer> NUM_RECORDS_PER_BATCH = ConfigOptions
+      .key("num.records_per.batch")
+      .intType()
+      .defaultValue(10000)
+      .withDescription("num records per batch in single split");
+
   public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
       .key("source.avro-schema.path")
       .stringType()
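A note on the new option: the key num.records_per.batch caps how many rows the new readBatch() method (added below in MergeOnReadInputFormat) buffers per call. A minimal sketch of overriding the default before handing the Configuration to the source; the value 500 and the standalone class are illustrative only, not part of the commit:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    // Illustrative override of the batch-size option (default is 10000).
    public class BatchSizeExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(FlinkOptions.NUM_RECORDS_PER_BATCH, 500);
        // conf is then passed to HoodieTableSource / MergeOnReadInputFormat as usual.
        System.out.println(conf.get(FlinkOptions.NUM_RECORDS_PER_BATCH));
      }
    }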
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 6fac5e4b88..ab270f89b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -115,8 +115,9 @@ public class HoodieTableSource implements
   private final transient HoodieTableMetaClient metaClient;
   private final long maxCompactionMemoryInBytes;
 
-  private final ResolvedSchema schema;
   private final RowType tableRowType;
+  private final String[] schemaFieldNames;
+  private final DataType[] schemaTypes;
   private final Path path;
   private final List<String> partitionKeys;
   private final String defaultPartName;
@@ -135,34 +136,43 @@ public class HoodieTableSource implements
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf) {
-    this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null);
+
+    this(schema.getColumnNames().toArray(new String[0]),
+        schema.getColumnDataTypes().toArray(new DataType[0]),
+        (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType(),
+        path, partitionKeys, defaultPartName, conf, null, null, null, null, null);
   }
 
   public HoodieTableSource(
-      ResolvedSchema schema,
+      String[] schemaFieldNames,
+      DataType[] schemaTypes,
+      RowType rowType,
       Path path,
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf,
+      @Nullable FileIndex fileIndex,
       @Nullable List<Map<String, String>> requiredPartitions,
      @Nullable int[] requiredPos,
       @Nullable Long limit,
-      @Nullable List<ResolvedExpression> filters) {
-    this.schema = schema;
-    this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
+      @Nullable HoodieTableMetaClient metaClient) {
+    this.schemaFieldNames = schemaFieldNames;
+    this.schemaTypes = schemaTypes;
+    this.tableRowType = rowType;
     this.path = path;
     this.partitionKeys = partitionKeys;
     this.defaultPartName = defaultPartName;
     this.conf = conf;
+    this.fileIndex = fileIndex == null
+        ? FileIndex.instance(this.path, this.conf, this.tableRowType)
+        : fileIndex;
     this.requiredPartitions = requiredPartitions;
     this.requiredPos = requiredPos == null
         ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray()
         : requiredPos;
     this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
-    this.filters = filters == null ? Collections.emptyList() : filters;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-    this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
-    this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
+    this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient;
     this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
   }
 
@@ -210,8 +220,8 @@ public class HoodieTableSource implements
   @Override
   public DynamicTableSource copy() {
-    return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, requiredPartitions, requiredPos, limit, filters);
+    return new HoodieTableSource(schemaFieldNames, schemaTypes, tableRowType, path, partitionKeys, defaultPartName,
+        conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient);
   }
 
   @Override
@@ -256,8 +266,8 @@ public class HoodieTableSource implements
   }
 
   private DataType getProducedDataType() {
-    String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
-    DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);
+    String[] schemaFieldNames = this.schemaFieldNames;
+    DataType[] schemaTypes = this.schemaTypes;
 
     return DataTypes.ROW(Arrays.stream(this.requiredPos)
         .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
@@ -266,7 +276,7 @@ public class HoodieTableSource implements
 
   private String getSourceOperatorName(String operatorName) {
-    String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
+    String[] schemaFieldNames = this.schemaFieldNames;
     List<String> fields = Arrays.stream(this.requiredPos)
         .mapToObj(i -> schemaFieldNames[i])
         .collect(Collectors.toList());
@@ -450,8 +460,8 @@ public class HoodieTableSource implements
 
     return new CopyOnWriteInputFormat(
         FilePathUtils.toFlinkPaths(paths),
-        this.schema.getColumnNames().toArray(new String[0]),
-        this.schema.getColumnDataTypes().toArray(new DataType[0]),
+        this.schemaFieldNames,
+        this.schemaTypes,
         this.requiredPos,
         this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
         this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
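For readers wondering why ResolvedSchema was replaced by plain arrays plus a RowType: ResolvedSchema does not implement Serializable, while FLIP-27 source components must ship to task managers, so the source now keeps the serializable pieces it actually needs. A minimal sketch of the decomposition the convenience constructor performs; the wrapper class SchemaPieces is hypothetical:

    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    // Sketch of the schema decomposition shown standalone. DataType and
    // LogicalType are Serializable; ResolvedSchema itself is not.
    public final class SchemaPieces {
      final String[] fieldNames;
      final DataType[] fieldTypes;
      final RowType rowType;

      SchemaPieces(ResolvedSchema schema) {
        this.fieldNames = schema.getColumnNames().toArray(new String[0]);
        this.fieldTypes = schema.getColumnDataTypes().toArray(new DataType[0]);
        this.rowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
      }
    }

The added @Nullable fileIndex and metaClient parameters serve the same goal: copy() can now round-trip the already-built state instead of rebuilding it on every copy.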
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index c9b6561bde..a0026d54ca 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -132,6 +133,8 @@ public class MergeOnReadInputFormat
    */
   private boolean emitDelete;
 
+  private final int numRecordsPerBatch;
+
   /**
    * Flag saying whether the input format has been closed.
    */
@@ -153,6 +156,7 @@ public class MergeOnReadInputFormat
     // because we need to
     this.requiredPos = tableState.getRequiredPositions();
     this.limit = limit;
+    this.numRecordsPerBatch = conf.get(FlinkOptions.NUM_RECORDS_PER_BATCH);
     this.emitDelete = emitDelete;
   }
 
@@ -408,6 +412,31 @@ public class MergeOnReadInputFormat
     };
   }
 
+  public Iterator<RowData> readBatch() throws IOException {
+    List<RowData> result = new ArrayList<>(this.numRecordsPerBatch);
+    int remaining = this.numRecordsPerBatch;
+    RowData next = null;
+    while (!this.isClosed() && remaining-- > 0) {
+      if (!this.reachedEnd()) {
+        next = nextRecord(null);
+        result.add(next);
+      } else {
+        close();
+        break;
+      }
+    }
+
+    if (result.isEmpty()) {
+      return null;
+    }
+    return result.iterator();
+  }
+
+  public MergeOnReadInputFormat copy() {
+    return new MergeOnReadInputFormat(this.conf, this.tableState, this.fieldTypes,
+        this.defaultPartName, this.limit, this.emitDelete);
+  }
+
   private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
     final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
     final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
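The new readBatch() drains up to num.records_per.batch rows per call, closes the format once the split is exhausted, and returns null when there is nothing left, which matches the pull-based fetch loop a FLIP-27 split reader runs. A minimal sketch under that reading; BatchDriver is hypothetical and not part of the commit, and the lifecycle is simplified (a real caller also handles configure()/openInputFormat()):

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
    import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

    // Hypothetical driver showing the intended contract: open a split,
    // drain it batch by batch, stop when readBatch() returns null.
    public class BatchDriver {
      public static long drain(MergeOnReadInputFormat format, MergeOnReadInputSplit split)
          throws IOException {
        long count = 0;
        format.open(split);
        Iterator<RowData> batch;
        while ((batch = format.readBatch()) != null) {
          while (batch.hasNext()) {
            batch.next();
            count++;
          }
        }
        return count;
      }
    }

Note that a partial final batch is still returned; only the following call yields null, so the loop above terminates cleanly. The copy() method exists for the same reason as the HoodieTableSource changes: each FLIP-27 reader needs its own format instance.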
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index cde646e41f..f3bc3361a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.format.mor;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.core.io.InputSplit;
 
 import javax.annotation.Nullable;
@@ -30,7 +31,7 @@ import java.util.List;
 /**
  * Represents an input split of source, actually a data bucket.
  */
-public class MergeOnReadInputSplit implements InputSplit {
+public class MergeOnReadInputSplit implements InputSplit, SourceSplit {
 
   private static final long serialVersionUID = 1L;
   private static final long NUM_NO_CONSUMPTION = 0L;
@@ -78,6 +79,11 @@ public class MergeOnReadInputSplit implements InputSplit {
     this.fileId = fileId;
   }
 
+  @Override
+  public String splitId() {
+    return getFileId();
+  }
+
   public Option<String> getBasePath() {
     return basePath;
   }
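SourceSplit requires only splitId(), implemented here as the Hudi file group id. A sketch of how a FLIP-27 enumerator might key pending splits by that id; SplitRegistry is hypothetical, and a design to be aware of is that any two splits sharing a file id would collide in such a map:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

    // Sketch: enumerators commonly track unassigned splits by splitId(),
    // which with this change means by Hudi file group id.
    public class SplitRegistry {
      private final Map<String, MergeOnReadInputSplit> pending = new HashMap<>();

      public void register(MergeOnReadInputSplit split) {
        pending.put(split.splitId(), split);
      }

      public MergeOnReadInputSplit take(String splitId) {
        return pending.remove(splitId);
      }
    }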