This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 6fd21d0f10 [HUDI-4044] When reading data from flink-hudi to external storage, the … (#5516) 6fd21d0f10 is described below commit 6fd21d0f1043d0a06b93332d86e63d7b708fcbe8 Author: aliceyyan <104287562+alicey...@users.noreply.github.com> AuthorDate: Tue May 10 10:25:13 2022 +0800 [HUDI-4044] When reading data from flink-hudi to external storage, the … (#5516) Co-authored-by: aliceyyan <alicey...@tencent.com> --- .../java/org/apache/hudi/source/IncrementalInputSplits.java | 2 +- .../main/java/org/apache/hudi/table/HoodieTableSource.java | 3 ++- .../apache/hudi/table/format/mor/MergeOnReadInputSplit.java | 13 ++++++++++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 02e0e253cf..94eeefcd36 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -226,7 +226,7 @@ public class IncrementalInputSplits implements Serializable { String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, endInstant, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange); + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId()); }).collect(Collectors.toList())) .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index d00eb3e3ec..da4abf0a96 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -181,6 +181,7 @@ public class HoodieTableSource implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) .setParallelism(1) + .keyBy(inputSplit -> inputSplit.getFileId()) .transform("split_reader", typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource<>(source); @@ -316,7 +317,7 @@ public class HoodieTableSource implements .map(logFile -> logFile.getPath().toString()) .collect(Collectors.toList())); return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null); + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId()); }).collect(Collectors.toList())) .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java index 156622c303..cde646e41f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java @@ -43,6 +43,7 @@ public class MergeOnReadInputSplit implements InputSplit { private final long maxCompactionMemoryInBytes; private final String mergeType; private final Option<InstantRange> instantRange; + private String fileId; // for streaming reader to record the consumed offset, // which is the start of next round reading. @@ -56,7 +57,8 @@ public class MergeOnReadInputSplit implements InputSplit { String tablePath, long maxCompactionMemoryInBytes, String mergeType, - @Nullable InstantRange instantRange) { + @Nullable InstantRange instantRange, + String fileId) { this.splitNum = splitNum; this.basePath = Option.ofNullable(basePath); this.logPaths = logPaths; @@ -65,6 +67,15 @@ public class MergeOnReadInputSplit implements InputSplit { this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; this.mergeType = mergeType; this.instantRange = Option.ofNullable(instantRange); + this.fileId = fileId; + } + + public String getFileId() { + return fileId; + } + + public void setFileId(String fileId) { + this.fileId = fileId; } public Option<String> getBasePath() {