vinothchandar commented on a change in pull request #3122: URL: https://github.com/apache/hudi/pull/3122#discussion_r664267191
########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java ########## @@ -209,6 +210,12 @@ public Configuration getConf() { colNamesWithTypesForExternal.size(), true); } + } else if (split instanceof RealtimeSplit) { Review comment: can we do this at the sub class? Not sure if we want to make this class aware of RealtimeSplit ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java ########## @@ -253,4 +260,34 @@ private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus throw new HoodieIOException(e.getMessage(), e); } } + + /** + * Dummy record reader that outputs nothing. + */ + public static class DummyRecordReader implements RecordReader<NullWritable, ArrayWritable> { Review comment: rename: NoopRecordReader ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ########## @@ -102,28 +105,37 @@ HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); latestFileSlices.forEach(fileSlice -> { + List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); - dataFileSplits.forEach(split -> { - try { - List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); - if (split instanceof BootstrapBaseFileSplit) { - BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split; - String[] hosts = split.getLocationInfo() != null ? 
Arrays.stream(split.getLocationInfo()) - .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0]; - String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) - .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0]; - FileSplit baseSplit = new FileSplit(eSplit.getPath(), eSplit.getStart(), eSplit.getLength(), - hosts, inMemoryHosts); - rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(), - logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit())); - } else { - rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime)); + if (dataFileSplits != null) { + dataFileSplits.forEach(split -> { + try { + if (split instanceof BootstrapBaseFileSplit) { + BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split; + String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) + .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0]; + String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) + .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0]; + FileSplit baseSplit = new FileSplit(eSplit.getPath(), eSplit.getStart(), eSplit.getLength(), + hosts, inMemoryHosts); + rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(), + logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit())); + } else { + rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime)); + } + } catch (IOException e) { + throw new HoodieIOException("Error creating hoodie real time split ", e); } + }); + } else { + // the file group has only logs (say the index is global). 
+ try { + rtSplits.add(new HoodieRealtimeFileSplit(DummyInputSplit.INSTANCE, metaClient.getBasePath(), logFilePaths, maxCommitTime)); Review comment: This code seems intended solely to avoid reading this split if it only has logs? Can't we just skip this fileId/file slice, instead of adding a dummy split and record reader? They add quite a bit of complexity here. I'm wondering if we can fix it in a localized manner, if all we want to do is avoid the NPE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org