This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 528f4ca  [HUDI-1880] Support streaming read with compaction and cleaning (#2921)
528f4ca is described below

commit 528f4ca988209cbc9e519bd42b87b896aea992b6
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Fri May 7 20:04:35 2021 +0800

    [HUDI-1880] Support streaming read with compaction and cleaning (#2921)
---
 .../java/org/apache/hudi/io/FlinkMergeHandle.java  | 14 ++++-----
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  9 ++----
 .../hudi/source/StreamReadMonitoringFunction.java  | 33 ++++++++++++----------
 .../table/format/mor/MergeOnReadInputFormat.java   | 11 ++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |  1 -
 5 files changed, 36 insertions(+), 32 deletions(-)
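
The change to MergeOnReadInputFormat below makes the base-file reader honor the instant range issued by StreamReadMonitoringFunction, so a streaming read only emits rows whose commit time falls inside the incremental range even after compaction rewrites the base files. A minimal standalone sketch of that idea follows; SimpleInstantRange and BaseFileRow are hypothetical simplifications of Hudi's InstantRange and Flink's RowData, not APIs from this patch.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class InstantRangeFilterSketch {

  /** Hypothetical stand-in for Hudi's InstantRange: (start, end] on commit timestamps. */
  static final class SimpleInstantRange {
    private final String startInstant; // exclusive
    private final String endInstant;   // inclusive

    SimpleInstantRange(String startInstant, String endInstant) {
      this.startInstant = startInstant;
      this.endInstant = endInstant;
    }

    boolean isInRange(String instant) {
      return instant.compareTo(startInstant) > 0 && instant.compareTo(endInstant) <= 0;
    }
  }

  /** Hypothetical stand-in for a base-file row carrying the _hoodie_commit_time column. */
  static final class BaseFileRow {
    final String commitTime;
    final String recordKey;

    BaseFileRow(String commitTime, String recordKey) {
      this.commitTime = commitTime;
      this.recordKey = recordKey;
    }
  }

  /** Keep only rows whose commit time falls inside the issued instant range. */
  static List<BaseFileRow> filterByInstantRange(List<BaseFileRow> rows, SimpleInstantRange range) {
    return rows.stream()
        .filter(row -> range == null || range.isInRange(row.commitTime))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<BaseFileRow> rows = Arrays.asList(
        new BaseFileRow("20210507200000", "key1"),   // older commit, already consumed downstream
        new BaseFileRow("20210507201500", "key2"));  // new commit, inside the issued range
    SimpleInstantRange range = new SimpleInstantRange("20210507200000", "20210507202000");
    filterByInstantRange(rows, range)
        .forEach(row -> System.out.println(row.recordKey + " @ " + row.commitTime));
    // expected output: key2 @ 20210507201500
  }
}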

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 4da6404..7fffd6b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -87,8 +87,9 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
    * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
    */
   protected String generatesDataFileNameWithRollover() {
-    final String fileID = this.fileId + "-" + rollNumber;
-    return FSUtils.makeDataFileName(instantTime, writeToken, fileID, hoodieTable.getBaseFileExtension());
+    // make the intermediate file hidden
+    return FSUtils.makeDataFileName("." + instantTime,
+        writeToken + "-" + rollNumber, this.fileId, hoodieTable.getBaseFileExtension());
   }
 
   public boolean shouldRollover() {
@@ -193,13 +194,8 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
         throw new HoodieIOException("Error when clean the temporary roll file: " + path, e);
       }
     }
-    Path lastPath = rolloverPaths.size() > 0
-        ? rolloverPaths.get(rolloverPaths.size() - 1)
-        : newFilePath;
-    String newFileName = generatesDataFileName();
-    String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
-        + newFileName).toString();
-    final Path desiredPath = new Path(config.getBasePath(), relativePath);
+    final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1);
+    final Path desiredPath = rolloverPaths.get(0);
     try {
       fs.rename(lastPath, desiredPath);
     } catch (IOException e) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 8244226..5c3cbb7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -33,7 +33,6 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -92,7 +91,7 @@ import java.util.function.BiFunction;
  */
 public class StreamWriteFunction<K, I, O>
     extends KeyedProcessFunction<K, I, O>
-    implements CheckpointedFunction, CheckpointListener {
+    implements CheckpointedFunction {
 
   private static final long serialVersionUID = 1L;
 
@@ -181,11 +180,6 @@ public class StreamWriteFunction<K, I, O>
     }
   }
 
-  @Override
-  public void notifyCheckpointComplete(long checkpointId) {
-    this.writeClient.cleanHandles();
-  }
-
   /**
    * End input action for batch source.
    */
@@ -390,6 +384,7 @@ public class StreamWriteFunction<K, I, O>
         .build();
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
+    this.writeClient.cleanHandles();
     this.currentInstant = "";
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index f508726..b6eb397 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -112,8 +112,6 @@ public class StreamReadMonitoringFunction
 
   private final long maxCompactionMemoryInBytes;
 
-  private final boolean isDelta;
-
   public StreamReadMonitoringFunction(
       Configuration conf,
       Path path,
@@ -124,7 +122,6 @@ public class StreamReadMonitoringFunction
     this.metaClient = metaClient;
     this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
-    this.isDelta = conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
   }
 
   @Override
@@ -189,15 +186,12 @@ public class StreamReadMonitoringFunction
   @VisibleForTesting
   public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = isDelta
-        // if is delta, exclude the parquet files from compaction
-        ? metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
-        : metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return;
     }
-    List<HoodieInstant> instants = getUncompactedInstants(commitTimeline, this.issuedInstant);
+    List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline, this.issuedInstant);
     // get the latest instant that satisfies condition
     final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
     final InstantRange instantRange;
@@ -303,29 +297,26 @@ public class StreamReadMonitoringFunction
   }
 
   /**
-   * Returns the uncompacted instants with a given issuedInstant to start from.
+   * Returns the instants with a given issuedInstant to start from.
    *
    * @param commitTimeline The completed commits timeline
    * @param issuedInstant  The last issued instant that has already been delivered to downstream
    * @return the filtered hoodie instants
    */
-  private List<HoodieInstant> getUncompactedInstants(
+  private List<HoodieInstant> filterInstantsWithStart(
       HoodieTimeline commitTimeline,
       final String issuedInstant) {
     if (issuedInstant != null) {
       return commitTimeline.getInstants()
-          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
     } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
       String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
       return commitTimeline.getInstants()
-          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
           .collect(Collectors.toList());
     } else {
       return commitTimeline.getInstants()
-          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
           .collect(Collectors.toList());
     }
   }
@@ -357,14 +348,26 @@ public class StreamReadMonitoringFunction
 
   private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) {
     return metadata.getFileIdAndFullPaths(path.toString()).values().stream()
-        .map(path -> {
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out the file paths that do not exist; some files may have been
+        // cleaned by the cleaner.
+        .filter(path -> {
+          try {
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Checking exists of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
           try {
-            return fs.getFileStatus(new org.apache.hadoop.fs.Path(path));
+            return fs.getFileStatus(path);
           } catch (IOException e) {
             LOG.error("Get write status of path: {} error", path);
             throw new HoodieException(e);
           }
         })
+        // filter out corrupted (zero-length) files
+        .filter(fileStatus -> fileStatus.getLen() > 0)
         .collect(Collectors.toList());
   }
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 595bf5a..f0f4f41 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -536,6 +536,9 @@ public class MergeOnReadInputFormat
     private final GenericRecordBuilder recordBuilder;
 
     private final RowDataProjection projection;
+
+    private final InstantRange instantRange;
+
     // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
     // method #reachedEnd() returns false after it returns true.
     // refactor it out once FLINK-22370 is resolved.
@@ -564,12 +567,20 @@ public class MergeOnReadInputFormat
       this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
       this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
       this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
+      this.instantRange = split.getInstantRange().orElse(null);
     }
 
     @Override
     public boolean reachedEnd() throws IOException {
       if (!readLogs && !this.reader.reachedEnd()) {
         currentRecord = this.reader.nextRecord();
+        if (instantRange != null) {
+          boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+          if (!isInRange) {
+            // filter base file by instant range
+            return reachedEnd();
+          }
+        }
         final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
         if (logRecords.containsKey(curKey)) {
           keyToSkip.add(curKey);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a4b6c16..5050109 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -163,7 +163,6 @@ public class StreamWriteFunctionWrapper<I> {
     functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
     coordinator.notifyCheckpointComplete(checkpointId);
     this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
-    this.writeFunction.notifyCheckpointComplete(checkpointId);
     if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
       try {
         compactFunctionWrapper.compact(checkpointId);
