danny0405 commented on a change in pull request #3134: URL: https://github.com/apache/hudi/pull/3134#discussion_r657581213
########## File path: hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java ########## @@ -197,10 +219,70 @@ private void loadRecords(String partitionPath, Collector<O> out) { } } } + } - long cost = System.currentTimeMillis() - start; - LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.", - this.getClass().getSimpleName(), taskID, partitionPath, cost); + @SuppressWarnings("unchecked") + private void loadLogFiles(String partitionPath, Collector<O> out) throws Exception { + Option<HoodieInstant> latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline() + .filterCompletedInstants().lastInstant(); + if (!latestCommitTime.isPresent()) { + return; + } + + List<HoodieLogFile> logFiles = this.hoodieTable.getSliceView() + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true) + .map(fileSlice -> fileSlice.getLogFiles().collect(toList())) Review comment: To simplify the logic, fetch the fileslice directly and send the parquet and log record keys, the file group id is solid, which is recorded by the fileslice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org