alexeykudinkin commented on a change in pull request #4556: URL: https://github.com/apache/hudi/pull/4556#discussion_r798092122
########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java ########## @@ -44,9 +44,7 @@ private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty(); - public HoodieRealtimeFileSplit() { - super(); Review comment: We don't need to remove it, but there's also no point in keeping it ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ########## @@ -144,28 +204,32 @@ return rtSplits.toArray(new InputSplit[0]); } + /** + * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)} + */ // get IncrementalRealtimeSplits - public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException { + public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException { + checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery), + "All splits have to belong to incremental query"); + List<InputSplit> rtSplits = new ArrayList<>(); - List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList()); - Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); + Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); // Pre process tableConfig from first partition to fetch virtual key info Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty(); if (partitionSet.size() > 0) { hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next())); } Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; - fileSplitList.stream().forEach(s -> { + fileSplits.stream().forEach(s -> { // deal with 
incremental query. try { if (s instanceof BaseFileWithLogsSplit) { - BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; - if (bs.getBelongToIncrementalSplit()) { - rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); - } + BaseFileWithLogsSplit bs = unsafeCast(s); + rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); } else if (s instanceof RealtimeBootstrapBaseFileSplit) { - rtSplits.add(s); + RealtimeBootstrapBaseFileSplit bs = unsafeCast(s); Review comment: I see now. Makes sense ########## File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala ########## @@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext, refresh0() + /** + * Returns latest completed instant as seen by this instance of the file-index + */ + def latestCompletedInstant(): Option[HoodieInstant] = Review comment: It's definitely closer to the former ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ########## @@ -65,11 +65,71 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.TypeUtils.unsafeCast; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class); - public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) { + public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException { + if (fileSplits.isEmpty()) { + return new InputSplit[0]; + } + + FileSplit fileSplit = fileSplits.get(0); + + // Pre-process table-config to fetch virtual key info + Path partitionPath = 
fileSplit.getPath().getParent(); + HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath); + + Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient); + + // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase} + HoodieInstant latestCommitInstant = + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get(); + + InputSplit[] finalSplits = fileSplits.stream() + .map(split -> { + // There are 4 types of splits we could have to handle here + // - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file, + // but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit}) + // - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file + // and does have log files appended + // - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file Review comment: Apologies, meant to say that this will still be handled by `BaseFileWithLogsSplit` section -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org