This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new f171496de2  [HUDI-4769] Option read.streaming.skip_compaction skips delta commit (#6848)

f171496de2 is described below

commit f171496de244992958fd3fd22fbcd2a7dc62c7a2
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Sun Oct 2 09:08:03 2022 +0800

    [HUDI-4769] Option read.streaming.skip_compaction skips delta commit (#6848)
---
 .../table/view/AbstractTableFileSystemView.java    |  50 ++++++
 .../apache/hudi/source/IncrementalInputSplits.java | 178 +++++++++------------
 .../hudi/source/StreamReadMonitoringFunction.java  |   5 +-
 .../apache/hudi/table/format/TestInputFormat.java  |  70 +++++++-
 4 files changed, 197 insertions(+), 106 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index ed4bfd7601..976217ae07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -713,6 +713,33 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     }
   }
 
+  /**
+   * Streams all "merged" file slices before or on an instant time
+   * for a MERGE_ON_READ table with an index that can index log files (which means it writes pure logs first).
+   *
+   * <p>In the streaming read scenario, for better reading efficiency, the user can choose to skip the
+   * base files that are produced by compaction. That is to say, we allow the users to consume only from
+   * these partitioned log files; the log files keep the record sequence just like a normal message queue.
+   *
+   * <p>NOTE: only the local view is supported.
+   *
+   * @param partitionStr   Partition Path
+   * @param maxInstantTime Max Instant Time
+   */
+  public final Stream<FileSlice> getAllLogsMergedFileSliceBeforeOrOn(String partitionStr, String maxInstantTime) {
+    try {
+      readLock.lock();
+      String partition = formatPartitionKey(partitionStr);
+      ensurePartitionLoadedCorrectly(partition);
+      return fetchAllStoredFileGroups(partition)
+          .filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxInstantTime))
+          .map(fileGroup -> fetchAllLogsMergedFileSlice(fileGroup, maxInstantTime))
+          .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public final Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
     try {
@@ -1076,6 +1103,29 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     return fileSlice;
   }
 
+  /**
+   * Returns the file slice with the log files of all the file slices merged.
+   *
+   * @param fileGroup      File group to which the file slices belong
+   * @param maxInstantTime The max instant time
+   */
+  private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup, String maxInstantTime) {
+    List<FileSlice> fileSlices = fileGroup.getAllFileSlicesBeforeOn(maxInstantTime).collect(Collectors.toList());
+    if (fileSlices.size() == 0) {
+      return Option.empty();
+    }
+    if (fileSlices.size() == 1) {
+      return Option.of(fileSlices.get(0));
+    }
+    final FileSlice latestSlice = fileSlices.get(0);
+    FileSlice merged = new FileSlice(latestSlice.getPartitionPath(), latestSlice.getBaseInstantTime(),
+        latestSlice.getFileId());
+
+    // add log files from the latest slice to the earliest
+    fileSlices.forEach(slice -> slice.getLogFiles().forEach(merged::addLogFile));
+    return Option.of(merged);
+  }
+
   /**
    * Default implementation for fetching latest base-file.
    *
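For context, a minimal usage sketch (not part of the commit) of the new view API; the partition path "par1" and the printing are illustrative, while the view construction and the log-file comparator mirror what IncrementalInputSplits already uses:

    // Sketch only: metaClient, commitTimeline, fileStatuses and endInstant are
    // assumed to be in scope, built the same way as in IncrementalInputSplits.
    HoodieTableFileSystemView fsView =
        new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
    fsView.getAllLogsMergedFileSliceBeforeOrOn("par1", endInstant) // "par1" is hypothetical
        .forEach(slice -> slice.getLogFiles()
            .sorted(HoodieLogFile.getLogFileComparator())
            .forEach(logFile -> System.out.println("log file: " + logFile.getPath())));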
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index dc10970b05..09f7054cd7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -20,6 +20,7 @@ package org.apache.hudi.source;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -220,7 +221,7 @@ public class IncrementalInputSplits implements Serializable {
         : instants.get(instants.size() - 1).getTimestamp();
     List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-        fileStatuses, readPartitions, endInstant, instantRange);
+        fileStatuses, readPartitions, endInstant, instantRange, false);
 
     return Result.instance(inputSplits, endInstant);
   }
@@ -235,8 +236,9 @@ public class IncrementalInputSplits implements Serializable {
    */
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
-      org.apache.hadoop.conf.Configuration hadoopConf,
-      String issuedInstant) {
+      @Nullable org.apache.hadoop.conf.Configuration hadoopConf,
+      String issuedInstant,
+      boolean cdcEnabled) {
     metaClient.reloadActiveTimeline();
     HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
     if (commitTimeline.empty()) {
@@ -248,90 +250,15 @@ public class IncrementalInputSplits implements Serializable {
     final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
     final InstantRange instantRange;
     if (instantToIssue != null) {
-      instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), false);
+      // when cdc is enabled, returns an instant range with a nullable boundary
+      // to filter the reading instants on the timeline
+      instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), cdcEnabled);
     } else {
       LOG.info("No new instant found for the table under path " + path + ", skip reading");
       return Result.EMPTY;
     }
 
-    String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-
-    Set<String> readPartitions;
-    final FileStatus[] fileStatuses;
-
-    if (instantRange == null) {
-      // reading from the earliest, scans the partitions and files directly.
-      FileIndex fileIndex = getFileIndex();
-      readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
-      if (readPartitions.size() == 0) {
-        LOG.warn("No partitions found for reading in user provided path.");
-        return Result.EMPTY;
-      }
-      fileStatuses = fileIndex.getFilesInPartitions();
-    } else {
-      List<HoodieCommitMetadata> activeMetadataList = instants.stream()
-          .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
-      List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
-      if (archivedMetadataList.size() > 0) {
-        LOG.warn("\n"
-            + "--------------------------------------------------------------------------------\n"
-            + "---------- caution: the reader has fall behind too much from the writer,\n"
-            + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
-            + "--------------------------------------------------------------------------------");
-      }
-      List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
-          // IMPORTANT: the merged metadata list must be in ascending order by instant time
-          ? mergeList(archivedMetadataList, activeMetadataList)
-          : activeMetadataList;
-
-      readPartitions = getReadPartitions(metadataList);
-      if (readPartitions.size() == 0) {
-        LOG.warn("No partitions found for reading in user provided path.");
-        return Result.EMPTY;
-      }
-      fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
-    }
-
-    if (fileStatuses.length == 0) {
-      LOG.warn("No files found for reading in user provided path.");
-      return Result.EMPTY;
-    }
-
-    final String endInstant = instantToIssue.getTimestamp();
-    List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-        fileStatuses, readPartitions, endInstant, instantRange);
-
-    return Result.instance(inputSplits, endInstant);
-  }
-
-  /**
-   * Returns the incremental cdc input splits.
-   *
-   * @param metaClient    The meta client
-   * @param issuedInstant The last issued instant, only valid in streaming read
-   * @return The list of incremental input splits or empty if there are no new instants
-   */
-  public Result inputSplitsCDC(
-      HoodieTableMetaClient metaClient,
-      String issuedInstant) {
-    metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
-    if (commitTimeline.empty()) {
-      LOG.warn("No splits found for the table under path " + path);
-      return Result.EMPTY;
-    }
-    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, issuedInstant);
-    // get the latest instant that satisfies condition
-    final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
-    final InstantRange instantRange;
-    if (instantToIssue != null) {
-      instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), true);
-    } else {
-      LOG.info("No new instant found for the table under path " + path + ", skip reading");
-      return Result.EMPTY;
-    }
-
-    Set<String> readPartitions;
+    final Set<String> readPartitions;
     final FileStatus[] fileStatuses;
 
     if (instantRange == null) {
@@ -339,38 +266,77 @@ public class IncrementalInputSplits implements Serializable {
       FileIndex fileIndex = getFileIndex();
       readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
       if (readPartitions.size() == 0) {
-        LOG.warn("No partitions found for reading in path: " + path);
+        LOG.warn("No partitions found for reading under path: " + path);
         return Result.EMPTY;
       }
       fileStatuses = fileIndex.getFilesInPartitions();
       if (fileStatuses.length == 0) {
-        LOG.warn("No files found for reading in path: " + path);
+        LOG.warn("No files found for reading under path: " + path);
         return Result.EMPTY;
       }
 
       final String endInstant = instantToIssue.getTimestamp();
       List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-          fileStatuses, readPartitions, endInstant, null);
+          fileStatuses, readPartitions, endInstant, null, false);
 
       return Result.instance(inputSplits, endInstant);
     } else {
-      HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
-      final String endInstant = instantToIssue.getTimestamp();
-      Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
+      // streaming read
+      if (cdcEnabled) {
+        // case1: cdc change log enabled
+        HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
+        final String endInstant = instantToIssue.getTimestamp();
+        Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
+
+        if (fileSplits.isEmpty()) {
+          LOG.warn("No change logs found for reading in path: " + path);
+          return Result.EMPTY;
+        }
 
-      if (fileSplits.isEmpty()) {
-        LOG.warn("No change logs found for reading in path: " + path);
-        return Result.EMPTY;
-      }
+        final AtomicInteger cnt = new AtomicInteger(0);
+        List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
+            .map(splits ->
+                new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
+                    splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+            .collect(Collectors.toList());
+        return Result.instance(inputSplits, endInstant);
+      } else {
+        // case2: normal streaming read
+        String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+        List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+        List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
+        if (archivedMetadataList.size() > 0) {
+          LOG.warn("\n"
+              + "--------------------------------------------------------------------------------\n"
+              + "---------- caution: the reader has fallen behind too much from the writer,\n"
+              + "---------- tweak the 'read.tasks' option to add parallelism for the read tasks.\n"
+              + "--------------------------------------------------------------------------------");
+        }
+        List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
+            // IMPORTANT: the merged metadata list must be in ascending order by instant time
+            ? mergeList(archivedMetadataList, activeMetadataList)
+            : activeMetadataList;
 
-      final AtomicInteger cnt = new AtomicInteger(0);
-      List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
-          .map(splits ->
-              new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
-                  splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
-          .collect(Collectors.toList());
-      return Result.instance(inputSplits, endInstant);
+        readPartitions = getReadPartitions(metadataList);
+        if (readPartitions.size() == 0) {
+          LOG.warn("No partitions found for reading under path: " + path);
+          return Result.EMPTY;
+        }
+        fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
+
+        if (fileStatuses.length == 0) {
+          LOG.warn("No files found for reading under path: " + path);
+          return Result.EMPTY;
+        }
+
+        final String endInstant = instantToIssue.getTimestamp();
+        List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
+            fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);
+
+        return Result.instance(inputSplits, endInstant);
+      }
     }
   }
@@ -401,12 +367,13 @@ public class IncrementalInputSplits implements Serializable {
       FileStatus[] fileStatuses,
       Set<String> readPartitions,
       String endInstant,
-      InstantRange instantRange) {
+      InstantRange instantRange,
+      boolean skipBaseFiles) {
     final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
     final AtomicInteger cnt = new AtomicInteger(0);
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     return readPartitions.stream()
-        .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
+        .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, endInstant, skipBaseFiles)
             .map(fileSlice -> {
               Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                   .sorted(HoodieLogFile.getLogFileComparator())
@@ -421,6 +388,15 @@ public class IncrementalInputSplits implements Serializable {
         .collect(Collectors.toList());
   }
 
+  private static Stream<FileSlice> getFileSlices(
+      HoodieTableFileSystemView fsView,
+      String relPartitionPath,
+      String endInstant,
+      boolean skipBaseFiles) {
+    return skipBaseFiles ? fsView.getAllLogsMergedFileSliceBeforeOrOn(relPartitionPath, endInstant)
+        : fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant);
+  }
+
   private FileIndex getFileIndex() {
     FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
     if (this.requiredPartitions != null) {
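To make the unified entry point concrete, here is a hedged usage sketch; the builder calls mirror the ones exercised by the tests further below, while the variable names and the null issuedInstant (meaning nothing consumed yet) are illustrative:

    // Sketch only: conf, metaClient and hadoopConf are assumed to be initialized.
    IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
        .rowType(TestConfigurations.ROW_TYPE)
        .conf(conf)
        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
        .skipCompaction(true) // consume only the log files, skipping compaction base files
        .build();
    // cdcEnabled = false selects the normal streaming-read branch
    IncrementalInputSplits.Result result =
        incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);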
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 1812262ab4..2ad312241e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -201,9 +201,8 @@ public class StreamReadMonitoringFunction
       // table does not exist
       return;
     }
-    IncrementalInputSplits.Result result = cdcEnabled
-        ? incrementalInputSplits.inputSplitsCDC(metaClient, this.issuedInstant)
-        : incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
+    IncrementalInputSplits.Result result =
+        incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant, this.cdcEnabled);
     if (result.isEmpty()) {
       // no new instants, returns early
       return;
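With the monitor function now passing cdcEnabled through, the read path is driven purely by job configuration. A minimal, hypothetical configuration sketch follows, assuming the existing FlinkOptions constants for 'read.streaming.enabled' and 'read.streaming.skip_compaction'; the table path is illustrative:

    // Sketch only: option constants are assumed from org.apache.hudi.configuration.FlinkOptions.
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hudi_table");            // illustrative table path
    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);           // streaming read
    conf.setBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true); // skip compaction base files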
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f066a0b702..458e024446 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -37,6 +37,7 @@ import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -63,6 +64,7 @@ import java.util.stream.Collectors;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /**
  * Test cases for MergeOnReadInputFormat and ParquetInputFormat.
@@ -341,7 +343,7 @@ public class TestInputFormat {
         .build();
 
     // default read the latest commit
-    IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplitsCDC(metaClient, null);
+    IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplits(metaClient, null, null, true);
 
     List<RowData> result = readData(inputFormat, splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -383,7 +385,7 @@ public class TestInputFormat {
         .build();
 
     // default read the latest commit
-    IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplitsCDC(metaClient, null);
+    IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplits(metaClient, null, null, true);
 
     List<RowData> result = readData(inputFormat, splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -398,6 +400,70 @@ public class TestInputFormat {
     assertThat(actual, is(expected));
   }
 
+  @Test
+  void testReadSkipCompaction() throws Exception {
+    beforeEach(HoodieTableType.MERGE_ON_READ);
+
+    org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+
+    // write the base files first with compaction
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(true);
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
+        .rowType(TestConfigurations.ROW_TYPE)
+        .conf(conf)
+        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+        .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", "par3", "par4")))
+        .skipCompaction(true)
+        .build();
+
+    // by default, read the latest commit;
+    // the compaction base files are skipped
+    IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits1.isEmpty());
+    List<RowData> result1 = readData(inputFormat, splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+    String actual1 = TestData.rowDataToString(result1);
+    String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
+    assertThat(actual1, is(expected1));
+
+    // write another commit using logs and read again
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    // read from the compaction commit
+    String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, false);
+    conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
+
+    IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits2.isEmpty());
+    List<RowData> result2 = readData(inputFormat, splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual2 = TestData.rowDataToString(result2);
+    String expected2 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual2, is(expected2));
+
+    // write another commit using logs with a separate partition
+    // so that the file group has only logs
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
+    // refresh the input format
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat(true);
+
+    IncrementalInputSplits.Result splits3 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits3.isEmpty());
+    List<RowData> result3 = readData(inputFormat, splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual3 = TestData.rowDataToString(result3);
+    String expected3 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual3, is(expected3));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {