This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fd21d0f10 [HUDI-4044] When reading data from flink-hudi to external storage, the … (#5516)
6fd21d0f10 is described below

commit 6fd21d0f1043d0a06b93332d86e63d7b708fcbe8
Author: aliceyyan <104287562+alicey...@users.noreply.github.com>
AuthorDate: Tue May 10 10:25:13 2022 +0800

    [HUDI-4044] When reading data from flink-hudi to external storage, the … (#5516)
    
    
    Co-authored-by: aliceyyan <alicey...@tencent.com>
---
 .../java/org/apache/hudi/source/IncrementalInputSplits.java |  2 +-
 .../main/java/org/apache/hudi/table/HoodieTableSource.java  |  3 ++-
 .../apache/hudi/table/format/mor/MergeOnReadInputSplit.java | 13 ++++++++++++-
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 02e0e253cf..94eeefcd36 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -226,7 +226,7 @@ public class IncrementalInputSplits implements Serializable 
{
               String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
               return new MergeOnReadInputSplit(cnt.getAndAdd(1),
                   basePath, logPaths, endInstant,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, instantRange);
+                  metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, instantRange, fileSlice.getFileId());
             }).collect(Collectors.toList()))
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d00eb3e3ec..da4abf0a96 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -181,6 +181,7 @@ public class HoodieTableSource implements
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = 
execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
               .setParallelism(1)
+              .keyBy(inputSplit -> inputSplit.getFileId())
               .transform("split_reader", typeInfo, factory)
               .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
           return new DataStreamSource<>(source);
@@ -316,7 +317,7 @@ public class HoodieTableSource implements
                   .map(logFile -> logFile.getPath().toString())
                   .collect(Collectors.toList()));
               return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, 
logPaths, latestCommit,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, null);
+                  metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, null, fileSlice.getFileId());
             }).collect(Collectors.toList()))
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index 156622c303..cde646e41f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -43,6 +43,7 @@ public class MergeOnReadInputSplit implements InputSplit {
   private final long maxCompactionMemoryInBytes;
   private final String mergeType;
   private final Option<InstantRange> instantRange;
+  private String fileId;
 
   // for streaming reader to record the consumed offset,
   // which is the start of next round reading.
@@ -56,7 +57,8 @@ public class MergeOnReadInputSplit implements InputSplit {
       String tablePath,
       long maxCompactionMemoryInBytes,
       String mergeType,
-      @Nullable InstantRange instantRange) {
+      @Nullable InstantRange instantRange,
+      String fileId) {
     this.splitNum = splitNum;
     this.basePath = Option.ofNullable(basePath);
     this.logPaths = logPaths;
@@ -65,6 +67,15 @@ public class MergeOnReadInputSplit implements InputSplit {
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
     this.mergeType = mergeType;
     this.instantRange = Option.ofNullable(instantRange);
+    this.fileId = fileId;
+  }
+
+  public String getFileId() {
+    return fileId;
+  }
+
+  public void setFileId(String fileId) {
+    this.fileId = fileId;
   }
 
   public Option<String> getBasePath() {

Reply via email to