[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r740695703 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +148,82 @@ return rtSplits.toArray(new InputSplit[0]); } + // get IncrementalRealtimeSplits + public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream fileSplits) throws IOException { +List rtSplits = new ArrayList<>(); +List fileSplitList = fileSplits.collect(Collectors.toList()); +Set partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); +Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); +// Pre process tableConfig from first partition to fetch virtual key info +Option hoodieVirtualKeyInfo = Option.empty(); +if (partitionSet.size() > 0) { + hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next())); +} +Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; +fileSplitList.stream().forEach(s -> { + // deal with incremental query. 
+ try { +if (s instanceof BaseFileWithLogsSplit) { + BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; + if (bs.getBelongToIncrementalSplit()) { +rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); + } +} else if (s instanceof RealtimeBootstrapBaseFileSplit) { + rtSplits.add(s); +} + } catch (IOException e) { +throw new HoodieIOException("Error creating hoodie real time split ", e); + } +}); +LOG.info("Returning a total splits of " + rtSplits.size()); +return rtSplits.toArray(new InputSplit[0]); + } + + public static Option getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) { +HoodieTableConfig tableConfig = metaClient.getTableConfig(); +if (!tableConfig.populateMetaFields()) { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + try { +MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); +return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), +tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), +parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp(; + } catch (Exception exception) { +throw new HoodieException("Fetching table schema failed with exception ", exception); + } +} +return Option.empty(); + } + + public static boolean isIncrementalQuerySplits(List fileSplits) { +if (fileSplits == null || fileSplits.size() == 0) { + return false; +} +return fileSplits.stream().anyMatch(s -> { + if (s instanceof BaseFileWithLogsSplit) { +BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; +return bs.getBelongToIncrementalSplit(); + } else { +return s instanceof RealtimeBootstrapBaseFileSplit; + } +}); + } + + public static RealtimeBootstrapBaseFileSplit createRealimeBootstrapBaseFileSplit( Review comment: ok ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java ## @@ -61,9 +85,183 @@ 
@Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { -Stream fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is); +List fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList()); + +boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits); + +return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream()); + } + + /** + * Keep the logic of mor_incr_view as same as spark datasource. + * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1). + * Step2: Get list of affected files status for these affected file status. + * Step3: Construct HoodieTableFileSystemView based on those affected file status. + *a. Filter affected partitions based on inputPaths. + *b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups. + * Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to + *this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step. + * Step5: Find candidate fileStatus, since when
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r740695703 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +148,82 @@ return rtSplits.toArray(new InputSplit[0]); } + // get IncrementalRealtimeSplits + public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream fileSplits) throws IOException { +List rtSplits = new ArrayList<>(); +List fileSplitList = fileSplits.collect(Collectors.toList()); +Set partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); +Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); +// Pre process tableConfig from first partition to fetch virtual key info +Option hoodieVirtualKeyInfo = Option.empty(); +if (partitionSet.size() > 0) { + hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next())); +} +Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; +fileSplitList.stream().forEach(s -> { + // deal with incremental query. 
+ try { +if (s instanceof BaseFileWithLogsSplit) { + BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; + if (bs.getBelongToIncrementalSplit()) { +rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); + } +} else if (s instanceof RealtimeBootstrapBaseFileSplit) { + rtSplits.add(s); +} + } catch (IOException e) { +throw new HoodieIOException("Error creating hoodie real time split ", e); + } +}); +LOG.info("Returning a total splits of " + rtSplits.size()); +return rtSplits.toArray(new InputSplit[0]); + } + + public static Option getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) { +HoodieTableConfig tableConfig = metaClient.getTableConfig(); +if (!tableConfig.populateMetaFields()) { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + try { +MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); +return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), +tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), +parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp(; + } catch (Exception exception) { +throw new HoodieException("Fetching table schema failed with exception ", exception); + } +} +return Option.empty(); + } + + public static boolean isIncrementalQuerySplits(List fileSplits) { +if (fileSplits == null || fileSplits.size() == 0) { + return false; +} +return fileSplits.stream().anyMatch(s -> { + if (s instanceof BaseFileWithLogsSplit) { +BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; +return bs.getBelongToIncrementalSplit(); + } else { +return s instanceof RealtimeBootstrapBaseFileSplit; + } +}); + } + + public static RealtimeBootstrapBaseFileSplit createRealimeBootstrapBaseFileSplit( Review comment: ok ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java ## @@ -61,9 +85,183 @@ 
@Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { -Stream fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is); +List fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList()); + +boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits); + +return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream()); + } + + /** + * Keep the logic of mor_incr_view as same as spark datasource. + * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1). + * Step2: Get list of affected files status for these affected file status. + * Step3: Construct HoodieTableFileSystemView based on those affected file status. + *a. Filter affected partitions based on inputPaths. + *b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups. + * Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to + *this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step. + * Step5: Find candidate fileStatus, since when
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r740697734 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java ## @@ -61,9 +85,183 @@ @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { -Stream fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is); +List fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList()); + +boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits); + +return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream()); + } + + /** + * Keep the logic of mor_incr_view as same as spark datasource. + * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1). + * Step2: Get list of affected files status for these affected file status. + * Step3: Construct HoodieTableFileSystemView based on those affected file status. + *a. Filter affected partitions based on inputPaths. + *b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups. + * Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to + *this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step. + * Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView, + *the BaseFileStatus will missing file size information. + *We should use candidate fileStatus to update the size information for BaseFileStatus. + * Step6: For every file group from step3(b) + *Get 1st available base file from all file slices. 
then we use candidate file status to update the baseFileStatus, + *and construct RealTimeFileStatus and add it to result along with log files. + *If file group just has log files, construct RealTimeFileStatus and add it to result. + * TODO: unify the incremental view code between hive/spark-sql and spark datasource + */ + @Override + protected List listStatusForIncrementalMode( + JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { +List result = new ArrayList<>(); +String tableName = tableMetaClient.getTableConfig().getTableName(); +Job jobContext = Job.getInstance(job); + +// step1 +Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); +if (!timeline.isPresent()) { + return result; +} +String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName); +// Total number of commits to return in this batch. Set this to -1 to get all the commits. +Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName); +HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits); +Option> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList())); +if (!commitsToCheck.isPresent()) { + return result; +} +// step2 +commitsToCheck.get().sort(HoodieInstant::compareTo); +List metadataList = commitsToCheck +.get().stream().map(instant -> { + try { +return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn); + } catch (IOException e) { +throw new HoodieException(String.format("cannot get metadata for instant: %s", instant)); + } +}).collect(Collectors.toList()); + +// build fileGroup from fsView +List affectedFileStatus = Arrays.asList(HoodieInputFormatUtils +.listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList)); +// step3 +HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, 
affectedFileStatus.toArray(new FileStatus[0])); +// build fileGroup from fsView +Path basePath = new Path(tableMetaClient.getBasePath()); +// filter affectedPartition by inputPaths +List affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream() +.filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList()); +if (affectedPartition.isEmpty()) { + return result; +} +List fileGroups = affectedPartition.stream() Review comment: L148 has already added all files in HoodieTableFileSystemView, and partitionToFileGroupsMap will be filled and cached。 L1
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r740696192 ## File path: hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java ## @@ -535,4 +548,211 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true) ); } + + @Test + public void testIncremetalWithOnlylog() throws Exception { +// initial commit +Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); +HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); +String instantTime = "100"; +final int numRecords = 1000; +File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime, +HoodieTableType.MERGE_ON_READ); +//FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); +createDeltaCommitFile(basePath, instantTime,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0"); +// Add the paths +FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); + +// insert new records to log file +try { + String newCommitTime = "102"; + HoodieLogFormat.Writer writer = + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, + numRecords, numRecords, 0); + writer.close(); + createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0"); + + InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1); + + HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); + inputFormat.setConf(baseJobConf); + InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); + assertTrue(splits.length == 1); + JobConf newJobConf = new JobConf(baseJobConf); + List fields = schema.getFields(); + setHiveColumnNameProps(fields, newJobConf, false); + RecordReader reader = 
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); + // use reader to read log file. + NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + while (reader.next(key, value)) { +Writable[] values = value.get(); +// check if the record written is with latest commit, here "101" +assertEquals(newCommitTime, values[0].toString()); +key = reader.createKey(); +value = reader.createValue(); + } + reader.close(); +} catch (IOException e) { + throw new HoodieException(e.getMessage(), e); +} + } + + @Test + public void testIncremetalWithReplace() throws Exception { Review comment: yes will add test case for compaction, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r740695822 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java ## @@ -61,9 +85,183 @@ @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { -Stream fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is); +List fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList()); + +boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits); + +return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream()); + } + + /** + * Keep the logic of mor_incr_view as same as spark datasource. + * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1). + * Step2: Get list of affected files status for these affected file status. + * Step3: Construct HoodieTableFileSystemView based on those affected file status. + *a. Filter affected partitions based on inputPaths. + *b. Get list of fileGroups based on affected partitions by fsView.getAllFileGroups. + * Step4: Set input paths based on filtered affected partition paths. changes that amony original input paths passed to + *this method. some partitions did not have commits as part of the trimmed down list of commits and hence we need this step. + * Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView, + *the BaseFileStatus will missing file size information. + *We should use candidate fileStatus to update the size information for BaseFileStatus. + * Step6: For every file group from step3(b) + *Get 1st available base file from all file slices. 
then we use candidate file status to update the baseFileStatus, + *and construct RealTimeFileStatus and add it to result along with log files. + *If file group just has log files, construct RealTimeFileStatus and add it to result. + * TODO: unify the incremental view code between hive/spark-sql and spark datasource + */ + @Override + protected List listStatusForIncrementalMode( + JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { +List result = new ArrayList<>(); +String tableName = tableMetaClient.getTableConfig().getTableName(); +Job jobContext = Job.getInstance(job); + +// step1 +Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); +if (!timeline.isPresent()) { + return result; +} +String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName); +// Total number of commits to return in this batch. Set this to -1 to get all the commits. +Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName); Review comment: good suggest, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r740695703 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +148,82 @@ return rtSplits.toArray(new InputSplit[0]); } + // get IncrementalRealtimeSplits + public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream fileSplits) throws IOException { +List rtSplits = new ArrayList<>(); +List fileSplitList = fileSplits.collect(Collectors.toList()); +Set partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); +Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); +// Pre process tableConfig from first partition to fetch virtual key info +Option hoodieVirtualKeyInfo = Option.empty(); +if (partitionSet.size() > 0) { + hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next())); +} +Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; +fileSplitList.stream().forEach(s -> { + // deal with incremental query. 
+ try { +if (s instanceof BaseFileWithLogsSplit) { + BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; + if (bs.getBelongToIncrementalSplit()) { +rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); + } +} else if (s instanceof RealtimeBootstrapBaseFileSplit) { + rtSplits.add(s); +} + } catch (IOException e) { +throw new HoodieIOException("Error creating hoodie real time split ", e); + } +}); +LOG.info("Returning a total splits of " + rtSplits.size()); +return rtSplits.toArray(new InputSplit[0]); + } + + public static Option getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) { +HoodieTableConfig tableConfig = metaClient.getTableConfig(); +if (!tableConfig.populateMetaFields()) { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + try { +MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); +return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), +tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), +parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp(; + } catch (Exception exception) { +throw new HoodieException("Fetching table schema failed with exception ", exception); + } +} +return Option.empty(); + } + + public static boolean isIncrementalQuerySplits(List fileSplits) { +if (fileSplits == null || fileSplits.size() == 0) { + return false; +} +return fileSplits.stream().anyMatch(s -> { + if (s instanceof BaseFileWithLogsSplit) { +BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; +return bs.getBelongToIncrementalSplit(); + } else { +return s instanceof RealtimeBootstrapBaseFileSplit; + } +}); + } + + public static RealtimeBootstrapBaseFileSplit createRealimeBootstrapBaseFileSplit( Review comment: ok -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r738013103 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +148,110 @@ return rtSplits.toArray(new InputSplit[0]); } + // get IncrementalRealtimeSplits + public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream fileSplits) throws IOException { +List rtSplits = new ArrayList<>(); +List fileSplitList = fileSplits.collect(Collectors.toList()); +Set partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); +Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); +// Pre process tableConfig from first partition to fetch virtual key info +Option hoodieVirtualKeyInfo = Option.empty(); +if (partitionSet.size() > 0) { + hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next())); +} +Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; +fileSplitList.stream().forEach(s -> { + // deal with incremental query. 
+ try { +if (s instanceof BaseFileWithLogsSplit) { + BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; + if (bs.getBelongToIncrementalSplit()) { +rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); + } +} else if (s instanceof RealtimeBootstrapBaseFileSplit) { + rtSplits.add(s); +} + } catch (IOException e) { +throw new HoodieIOException("Error creating hoodie real time split ", e); + } +}); +LOG.info("Returning a total splits of " + rtSplits.size()); +return rtSplits.toArray(new InputSplit[0]); + } + + public static Option getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) { +HoodieTableConfig tableConfig = metaClient.getTableConfig(); +if (!tableConfig.populateMetaFields()) { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + try { +MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); +return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), +tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), +parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp(; + } catch (Exception exception) { +throw new HoodieException("Fetching table schema failed with exception ", exception); + } +} +return Option.empty(); + } + + public static boolean isIncrementalQuerySplits(List fileSplits) { +if (fileSplits == null || fileSplits.size() == 0) { + return false; +} +return fileSplits.stream().anyMatch(s -> { + if (s instanceof BaseFileWithLogsSplit) { +BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; +if (bs.getBelongToIncrementalSplit()) { + return true; +} + } else if (s instanceof RealtimeBootstrapBaseFileSplit) { +return true; + } + return false; +}); + } + + // Pick all incremental files and add them to rtSplits then filter out those files. 
+ private static Map> filterOutIncrementalSplits( + List fileSplitList, Review comment: Sorry, will update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r736332305 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +162,46 @@ return rtSplits.toArray(new InputSplit[0]); } + // pick all incremental files and add them to rtSplits then filter out those files. + private static Map> filterOutIncrementalSplits( + List fileSplitList, Review comment: good, let me try it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r733521589 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +162,46 @@ return rtSplits.toArray(new InputSplit[0]); } + // pick all incremental files and add them to rtSplits then filter out those files. + private static Map> filterOutIncrementalSplits( + List fileSplitList, Review comment: The key point is not the number of commits; the key point is that at commit3, file3 replaces file1 and file2. We can set the start commit a little smaller than commit2 and the max commits to 1: set hoodie.testlog.consume.max.commits=1 set hoodie.testlog.consume.start.timestamp=commit2-1 Then commit2 will be picked. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r733458463 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +162,46 @@ return rtSplits.toArray(new InputSplit[0]); } + // pick all incremental files and add them to rtSplits then filter out those files. + private static Map> filterOutIncrementalSplits( + List fileSplitList, Review comment: @danny0405 I remembered why I did this: otherwise we cannot read incremental data from before a replacecommit. Line 122 returns the latest fileSlices; line 134 uses those latest fileSlices to filter the queried inputSplits. Suppose our table now has 3 commits: commit1 file: (file1) commit2 file: (file2) replacecommit file: (file3). Now we want to query the incremental data of commit2 (file2 will be picked as the inputSplit). Line 122 will give us the wrong fileSlices (these fileSlices only contain file3). At line 134, when we filter using those fileSlices, all the incremental inputSplits will be excluded, since the incremental inputSplit is file2 but the fileSlices only contain file3. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r732502115 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +162,46 @@ return rtSplits.toArray(new InputSplit[0]); } + // pick all incremental files and add them to rtSplits then filter out those files. + private static Map> filterOutIncrementalSplits( + List fileSplitList, Review comment: @danny0405 Thanks. I will try and update the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r728890684 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -161,6 +162,51 @@ return rtSplits.toArray(new InputSplit[0]); } + // pick all incremental files and add them to rtSplits then filter out those files. + private static Map> filterOutIncrementalSplits( + List fileSplitList, + List rtSplits, + final Option finalHoodieVirtualKeyInfo) { +return fileSplitList.stream().filter(s -> { + // deal with incremental query. + try { +if (s instanceof BaseFileWithLogsSplit) { + BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; + if (bs.getBelongToIncrementalSplit()) { +rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); + } +} else if (s instanceof RealtimeBootstrapBaseFileSplit) { + rtSplits.add(s); +} + } catch (IOException e) { +throw new HoodieIOException("Error creating hoodie real time split ", e); + } + // filter the snapshot split. + if (s instanceof RealtimeBootstrapBaseFileSplit) { +return false; + } else if ((s instanceof BaseFileWithLogsSplit) && ((BaseFileWithLogsSplit) s).getBelongToIncrementalSplit()) { Review comment: I just wanted to separate the logic of the incremental query from the logic of the snapshot query. OK, I will change it, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r728743555 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java ## @@ -119,6 +307,11 @@ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf job addProjectionToJobConf(realtimeSplit, jobConf); LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); + +// for log only split, we no need parquet reader, set it to empty +if (FSUtils.isLogFile(realtimeSplit.getPath())) { Review comment: agree ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -69,17 +70,18 @@ private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class); public static InputSplit[] getRealtimeSplits(Configuration conf, Stream fileSplits) throws IOException { -Map> partitionsToParquetSplits = -fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent())); +// for all unique split parents, obtain all delta files based on delta commit timeline, +// grouped on file id +List rtSplits = new ArrayList<>(); +List candidateFileSplits = fileSplits.collect(Collectors.toList()); +Map> partitionsToParquetSplits = candidateFileSplits Review comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r728743157 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieEmptyRecordReader.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; + +import java.io.IOException; + +/** + * dummy record for log only realtime split. + */ Review comment: agree -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r728741908 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java ## @@ -66,6 +90,170 @@ return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); } + /** + * Keep the logical of mor_incr_view as same as spark datasource. + * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1). Review comment: agree -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r726838452 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ## @@ -99,7 +101,32 @@ } } Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; - partitionsToParquetSplits.keySet().forEach(partitionPath -> { + // deal with incremental query + candidateFileSplits.stream().forEach(s -> { +try { Review comment: Thanks. If we do it in one iteration, the logic for the incremental query and the logic for the snapshot query will be mixed together; are you sure we need to do that? OK, I will try to do it in one iteration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r726802648 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class BaseFileWithLogsSplit extends FileSplit { Review comment: agree -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r726732125 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * we need to encode additional information in split to track matching log file and base files. Review comment: This is a bridge class; every BaseFileWithLogsSplit is eventually converted to a HoodieRealtimeFileSplit. I think we should not add too many things to HoodieRealtimeFileSplit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r726731332 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * we need to encode additional information in split to track matching log file and base files. + * Hence, this weird looking class which tracks an log/base file split + */ +public class BaseFileWithLogsSplit extends FileSplit { + // a flag to mark this split is produced by incremental query or not. + private boolean belongToIncrementalSplit = false; Review comment: agree, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view
xiarixiaoyao commented on a change in pull request #3203: URL: https://github.com/apache/hudi/pull/3203#discussion_r725781031 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java ## @@ -66,6 +91,139 @@ return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); } + /** + * keep the logical of mor_incr_view as same as spark datasource. + * to do: unify the incremental view code between hive/spark-sql and spark datasource + */ + @Override + protected List listStatusForIncrementalMode( + JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { +List result = new ArrayList<>(); +String tableName = tableMetaClient.getTableConfig().getTableName(); +Job jobContext = Job.getInstance(job); + +Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); +if (!timeline.isPresent()) { + return result; +} +String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName); +// Total number of commits to return in this batch. Set this to -1 to get all the commits. 
+Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName); +HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits); +Option> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList())); +if (!commitsToCheck.isPresent()) { + return result; +} +Map> partitionsWithFileStatus = HoodieInputFormatUtils +.listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), commitsToCheck.get(), commitsTimelineToReturn); +// build fileGroup from fsView +List affectedFileStatus = new ArrayList<>(); +partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> affectedFileStatus.add(v))); +HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0])); +// build fileGroup from fsView +String basePath = tableMetaClient.getBasePath(); +// filter affectedPartition by inputPaths +List affectedPartition = partitionsWithFileStatus.keySet().stream() +.filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList()); +if (affectedPartition.isEmpty()) { + return result; +} +List fileGroups = affectedPartition.stream() +.flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList()); +setInputPaths(job, affectedPartition.stream() +.map(p -> p.isEmpty() ? 
basePath : new Path(basePath, p).toUri().toString()).collect(Collectors.joining(","))); + +// find all file status in current partitionPath +FileStatus[] fileStatuses = getStatus(job); +Map candidateFileStatus = new HashMap<>(); +for (int i = 0; i < fileStatuses.length; i++) { + String key = fileStatuses[i].getPath().toString(); + candidateFileStatus.put(key, fileStatuses[i]); +} + +String maxCommitTime = fsView.getLastInstant().get().getTimestamp(); +fileGroups.stream().forEach(f -> { + try { +List baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList()); +if (!baseFiles.isEmpty()) { + FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get()); + String baseFilePath = baseFileStatus.getPath().toUri().toString(); + if (!candidateFileStatus.containsKey(baseFilePath)) { +throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath); + } + RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath)); + fileStatus.setMaxCommitTime(maxCommitTime); + fileStatus.setBelongToIncrementalFileStatus(true); + fileStatus.setBasePath(basePath); + fileStatus.setBaseFilePath(baseFilePath); + fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList())); Review comment: // Ensure get the base file when there is a pending compaction, which means the base file // won't be in the latest file slice. see the doc of MergeOnReadIncrementalRelation https://github.com/apache/hudi/blob/ceace1c653a3ce3c97e6ee5a244d71ff1806be4f/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala#L203 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For querie