vinothchandar commented on a change in pull request #3122:
URL: https://github.com/apache/hudi/pull/3122#discussion_r664267191



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
##########
@@ -209,6 +210,12 @@ public Configuration getConf() {
             colNamesWithTypesForExternal.size(),
             true);
       }
+    } else if (split instanceof RealtimeSplit) {

Review comment:
       Can we do this in the subclass instead? Not sure we want to make this class 
aware of RealtimeSplit.
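
       Rough sketch of what I mean (illustrative only; the hook name and wiring 
are made up, and only RealtimeSplit comes from the PR): the base class exposes 
an overridable no-op hook, and only the realtime subclass knows about 
RealtimeSplit.

           // In HoodieParquetInputFormat (base class stays unaware of RealtimeSplit);
           // assumes the usual InputSplit / RealtimeSplit imports.
           protected boolean isRealtimeSplit(InputSplit split) {
             // default: the base input format never treats a split as realtime
             return false;
           }

           // In the realtime subclass (e.g. HoodieParquetRealtimeInputFormat):
           @Override
           protected boolean isRealtimeSplit(InputSplit split) {
             return split instanceof RealtimeSplit;
           }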

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
##########
@@ -253,4 +260,34 @@ private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus
       throw new HoodieIOException(e.getMessage(), e);
     }
   }
+
+  /**
+   * Dummy record reader that outputs nothing.
+   */
+  public static class DummyRecordReader implements RecordReader<NullWritable, ArrayWritable> {

Review comment:
       rename: NoopRecordReader
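
       For example, roughly (just a sketch against the old mapred RecordReader 
interface; assumes the NullWritable/ArrayWritable imports from hadoop-common):

           public static class NoopRecordReader implements RecordReader<NullWritable, ArrayWritable> {

             @Override
             public boolean next(NullWritable key, ArrayWritable value) {
               // never produces a record
               return false;
             }

             @Override
             public NullWritable createKey() {
               return NullWritable.get();
             }

             @Override
             public ArrayWritable createValue() {
               return new ArrayWritable(new String[0]);
             }

             @Override
             public long getPos() {
               return 0;
             }

             @Override
             public void close() {
               // nothing to release
             }

             @Override
             public float getProgress() {
               return 1.0f; // reports as already complete
             }
           }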

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -102,28 +105,37 @@
             HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
             .filterCompletedInstants().lastInstant().get().getTimestamp();
         latestFileSlices.forEach(fileSlice -> {
+          List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+              .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
           List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
-          dataFileSplits.forEach(split -> {
-            try {
-              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
-              if (split instanceof BootstrapBaseFileSplit) {
-                BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
-                String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
-                    .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
-                String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
-                    .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
-                FileSplit baseSplit = new FileSplit(eSplit.getPath(), eSplit.getStart(), eSplit.getLength(),
-                    hosts, inMemoryHosts);
-                rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(),
-                    logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit()));
-              } else {
-                rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
+          if (dataFileSplits != null) {
+            dataFileSplits.forEach(split -> {
+              try {
+                if (split instanceof BootstrapBaseFileSplit) {
+                  BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
+                  String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
+                      .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
+                  String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
+                      .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
+                  FileSplit baseSplit = new FileSplit(eSplit.getPath(), eSplit.getStart(), eSplit.getLength(),
+                      hosts, inMemoryHosts);
+                  rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(),
+                      logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit()));
+                } else {
+                  rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
+                }
+              } catch (IOException e) {
+                throw new HoodieIOException("Error creating hoodie real time split ", e);
               }
+            });
+          } else {
+            // the file group has only logs (say the index is global).
+            try {
+              rtSplits.add(new HoodieRealtimeFileSplit(DummyInputSplit.INSTANCE, metaClient.getBasePath(), logFilePaths, maxCommitTime));

Review comment:
       This code seems intended solely to avoid reading this split when it only 
has logs? Can't we just skip this fileId/fileSlice instead of adding a dummy 
split and record reader? They do add quite a bit of complexity here. Wondering 
if we can fix this in a more localized manner, if all we want to do is avoid 
the NPE.
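
       i.e. something along these lines inside the latestFileSlices.forEach(...) 
lambda (sketch only, reusing the variable names from the diff; whether log-only 
file slices can simply be dropped is the open question):

           List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
           if (dataFileSplits == null || dataFileSplits.isEmpty()) {
             // log-only file slice (e.g. global index case): skip it to avoid the NPE,
             // instead of wiring in a dummy split + no-op record reader
             return;
           }
           // ... existing split construction for dataFileSplits stays unchanged ...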




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
