alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798092122



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
##########
@@ -44,9 +44,7 @@
 
   private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
 
-  public HoodieRealtimeFileSplit() {
-    super();

Review comment:
       We don't need to remove it, but there's also no point in keeping it
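      For context, Hadoop deserializes `Writable` split implementations by reflectively creating an empty instance and then populating it via `readFields(...)`, which is the usual reason no-arg constructors like this exist. A minimal sketch of that convention, using a hypothetical `ExampleSplit` rather than the actual Hudi class:

```java
// Sketch only: illustrates the Writable no-arg constructor convention.
// ExampleSplit is hypothetical and not part of Hudi.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class ExampleSplit implements Writable {
  private String basePath = "";

  // The framework reflectively creates an empty instance first...
  public ExampleSplit() {}

  public ExampleSplit(String basePath) {
    this.basePath = basePath;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(basePath);
  }

  // ...and then fills it in from the serialized bytes.
  @Override
  public void readFields(DataInput in) throws IOException {
    basePath = in.readUTF();
  }
}
```

      Whether dropping the explicit constructor is safe therefore depends on whether the split is still created through that reflective path.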

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -144,28 +204,32 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  /**
+   * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
+   */
   // get IncrementalRealtimeSplits
-  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+        "All splits have to belong to incremental query");
+
     List<InputSplit> rtSplits = new ArrayList<>();
-    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
-    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
     // Pre process tableConfig from first partition to fetch virtual key info
     Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
     if (partitionSet.size() > 0) {
       hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
     }
     Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-    fileSplitList.stream().forEach(s -> {
+    fileSplits.stream().forEach(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
-          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
-          if (bs.getBelongToIncrementalSplit()) {
-            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
-          }
+          BaseFileWithLogsSplit bs = unsafeCast(s);
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
         } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
-          rtSplits.add(s);
+          RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);

Review comment:
       I see now. Makes sense
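      For readers following the diff: `unsafeCast` is a generic unchecked-cast helper. A minimal sketch of what such a utility typically looks like (signature inferred from the call sites above, not copied from Hudi):

```java
// Minimal sketch of an unchecked-cast helper with the signature implied
// by the call sites above (inferred, not copied from Hudi's TypeUtils).
public final class TypeUtils {
  private TypeUtils() {}

  @SuppressWarnings("unchecked")
  public static <T> T unsafeCast(Object o) {
    // The target type T is inferred at the call site; a mismatch only
    // surfaces as a ClassCastException when the result is actually used.
    return (T) o;
  }
}
```

      In the hunk above each cast is guarded by a preceding `instanceof` check, so the "unsafe" cast cannot actually fail there.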

##########
File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala
##########
@@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,
 
   refresh0()
 
+  /**
+   * Returns the latest completed instant as seen by this instance of the file-index
+   */
+  def latestCompletedInstant(): Option[HoodieInstant] =

Review comment:
      It's definitely closer to the former.
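      For context, the Java side of this PR resolves the same notion of "latest completed instant" from the active timeline (see the `getRealtimeSplits` hunk below). A sketch of the equivalent lookup, assuming a `HoodieTableMetaClient` is already in hand:

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

public class LatestInstantExample {
  // Sketch: the latest completed commit instant as seen by the timeline;
  // empty on a table that has no completed commits yet.
  public static Option<HoodieInstant> latestCompletedInstant(HoodieTableMetaClient metaClient) {
    return metaClient.getActiveTimeline()
        .getCommitsTimeline()
        .filterCompletedInstants()
        .lastInstant();
  }
}
```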

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+      .map(split -> {
+        // There are 4 types of splits we could have to handle here
+        //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+        //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+        //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+        //      and does have log files appended
+        //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file

Review comment:
      Apologies, I meant to say that this will still be handled by the `BaseFileWithLogsSplit` section.
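      To make the branching concrete, here is a simplified sketch of the dispatch described in the comment block, mirroring the `getIncrementalRealtimeSplits` hunk above (names and constructors as they appear in this diff; surrounding plumbing elided):

```java
// Simplified sketch of the per-split dispatch (mirrors the hunks above,
// not the exact PR code). Splits carrying log files become realtime splits.
static InputSplit toRealtimeSplit(FileSplit s,
                                  Option<HoodieVirtualKeyInfo> virtualKeyInfo) throws IOException {
  if (s instanceof BaseFileWithLogsSplit) {
    // Base file with log files appended (and no bootstrap file)
    BaseFileWithLogsSplit bs = unsafeCast(s);
    return new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(),
        bs.getMaxCommitTime(), virtualKeyInfo);
  } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
    // Bootstrap base file with log files appended; already realtime-aware
    return s;
  }
  // Remaining kinds (e.g. bootstrap base files without logs) are converted
  // elsewhere in the PR, per the comment block above.
  return s;
}
```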



