This is an automated email from the ASF dual-hosted git repository. vinoyang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 528f4ca [HUDI-1880] Support streaming read with compaction and cleaning (#2921) 528f4ca is described below commit 528f4ca988209cbc9e519bd42b87b896aea992b6 Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Fri May 7 20:04:35 2021 +0800 [HUDI-1880] Support streaming read with compaction and cleaning (#2921) --- .../java/org/apache/hudi/io/FlinkMergeHandle.java | 14 ++++----- .../org/apache/hudi/sink/StreamWriteFunction.java | 9 ++---- .../hudi/source/StreamReadMonitoringFunction.java | 33 ++++++++++++---------- .../table/format/mor/MergeOnReadInputFormat.java | 11 ++++++++ .../sink/utils/StreamWriteFunctionWrapper.java | 1 - 5 files changed, 36 insertions(+), 32 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 4da6404..7fffd6b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -87,8 +87,9 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O> * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. */ protected String generatesDataFileNameWithRollover() { - final String fileID = this.fileId + "-" + rollNumber; - return FSUtils.makeDataFileName(instantTime, writeToken, fileID, hoodieTable.getBaseFileExtension()); + // make the intermediate file as hidden + return FSUtils.makeDataFileName("." + instantTime, + writeToken + "-" + rollNumber, this.fileId, hoodieTable.getBaseFileExtension()); } public boolean shouldRollover() { @@ -193,13 +194,8 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O> throw new HoodieIOException("Error when clean the temporary roll file: " + path, e); } } - Path lastPath = rolloverPaths.size() > 0 - ? rolloverPaths.get(rolloverPaths.size() - 1) - : newFilePath; - String newFileName = generatesDataFileName(); - String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") - + newFileName).toString(); - final Path desiredPath = new Path(config.getBasePath(), relativePath); + final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1); + final Path desiredPath = rolloverPaths.get(0); try { fs.rename(lastPath, desiredPath); } catch (IOException e) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 8244226..5c3cbb7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -33,7 +33,6 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -92,7 +91,7 @@ import java.util.function.BiFunction; */ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O> - implements CheckpointedFunction, CheckpointListener { + implements CheckpointedFunction { private static final long serialVersionUID = 1L; @@ -181,11 +180,6 @@ public class StreamWriteFunction<K, I, O> } } - @Override - public void notifyCheckpointComplete(long checkpointId) { - this.writeClient.cleanHandles(); - } - /** * End input action for batch source. */ @@ -390,6 +384,7 @@ public class StreamWriteFunction<K, I, O> .build(); this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); + this.writeClient.cleanHandles(); this.currentInstant = ""; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index f508726..b6eb397 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -112,8 +112,6 @@ public class StreamReadMonitoringFunction private final long maxCompactionMemoryInBytes; - private final boolean isDelta; - public StreamReadMonitoringFunction( Configuration conf, Path path, @@ -124,7 +122,6 @@ public class StreamReadMonitoringFunction this.metaClient = metaClient; this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL); this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; - this.isDelta = conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ); } @Override @@ -189,15 +186,12 @@ public class StreamReadMonitoringFunction @VisibleForTesting public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) { metaClient.reloadActiveTimeline(); - HoodieTimeline commitTimeline = isDelta - // if is delta, exclude the parquet files from compaction - ? metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() - : metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); if (commitTimeline.empty()) { LOG.warn("No splits found for the table under path " + path); return; } - List<HoodieInstant> instants = getUncompactedInstants(commitTimeline, this.issuedInstant); + List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline, this.issuedInstant); // get the latest instant that satisfies condition final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1); final InstantRange instantRange; @@ -303,29 +297,26 @@ public class StreamReadMonitoringFunction } /** - * Returns the uncompacted instants with a given issuedInstant to start from. + * Returns the instants with a given issuedInstant to start from. * * @param commitTimeline The completed commits timeline * @param issuedInstant The last issued instant that has already been delivered to downstream * @return the filtered hoodie instants */ - private List<HoodieInstant> getUncompactedInstants( + private List<HoodieInstant> filterInstantsWithStart( HoodieTimeline commitTimeline, final String issuedInstant) { if (issuedInstant != null) { return commitTimeline.getInstants() - .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant)) .collect(Collectors.toList()); } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) { String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT); return commitTimeline.getInstants() - .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit)) .collect(Collectors.toList()); } else { return commitTimeline.getInstants() - .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) .collect(Collectors.toList()); } } @@ -357,14 +348,26 @@ public class StreamReadMonitoringFunction private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) { return metadata.getFileIdAndFullPaths(path.toString()).values().stream() - .map(path -> { + .map(org.apache.hadoop.fs.Path::new) + // filter out the file paths that does not exist, some files may be cleaned by + // the cleaner. + .filter(path -> { + try { + return fs.exists(path); + } catch (IOException e) { + LOG.error("Checking exists of path: {} error", path); + throw new HoodieException(e); + } + }).map(path -> { try { - return fs.getFileStatus(new org.apache.hadoop.fs.Path(path)); + return fs.getFileStatus(path); } catch (IOException e) { LOG.error("Get write status of path: {} error", path); throw new HoodieException(e); } }) + // filter out crushed files + .filter(fileStatus -> fileStatus.getLen() > 0) .collect(Collectors.toList()); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 595bf5a..f0f4f41 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -536,6 +536,9 @@ public class MergeOnReadInputFormat private final GenericRecordBuilder recordBuilder; private final RowDataProjection projection; + + private final InstantRange instantRange; + // add the flag because the flink ParquetColumnarRowSplitReader is buggy: // method #reachedEnd() returns false after it returns true. // refactor it out once FLINK-22370 is resolved. @@ -564,12 +567,20 @@ public class MergeOnReadInputFormat this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); this.projection = RowDataProjection.instance(requiredRowType, requiredPos); + this.instantRange = split.getInstantRange().orElse(null); } @Override public boolean reachedEnd() throws IOException { if (!readLogs && !this.reader.reachedEnd()) { currentRecord = this.reader.nextRecord(); + if (instantRange != null) { + boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); + if (!isInRange) { + // filter base file by instant range + return reachedEnd(); + } + } final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString(); if (logRecords.containsKey(curKey)) { keyToSkip.add(curKey); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index a4b6c16..5050109 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -163,7 +163,6 @@ public class StreamWriteFunctionWrapper<I> { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.notifyCheckpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); - this.writeFunction.notifyCheckpointComplete(checkpointId); if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) { try { compactFunctionWrapper.compact(checkpointId);