nsivabalan commented on a change in pull request #3986: URL: https://github.com/apache/hudi/pull/3986#discussion_r749415355
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java ########## @@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng @Override protected List<SmallFile> getSmallFiles(String partitionPath) { - - // smallFiles only for partitionPath - List<SmallFile> smallFileLocations = new ArrayList<>(); - // Init here since this class (and member variables) might not have been initialized HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); - // Find out all eligible small file slices - if (!commitTimeline.empty()) { - HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - // find smallest file in partition and append to it - List<FileSlice> allSmallFileSlices = new ArrayList<>(); - // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to - // it. Doing this overtime for a partition, we ensure that we handle small file issues - if (!table.getIndex().canIndexLogFiles()) { - // TODO : choose last N small files since there can be multiple small files written to a single partition - // by different spark partitions in a single batch - Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView() - .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) - .filter( - fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config - .getParquetSmallFileLimit()) - .min((FileSlice left, FileSlice right) -> - left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1)); - if (smallFileSlice.isPresent()) { - allSmallFileSlices.add(smallFileSlice.get()); - } + if (commitTimeline.empty()) { + return Collections.emptyList(); + } + + HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + + // Find out all eligible small file slices, looking for + // smallest file in the partition to append to + List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime); + List<SmallFile> smallFileLocations = new ArrayList<>(); + + // Create SmallFiles from the eligible file slices + for (FileSlice smallFileSlice : smallFileSlicesCandidates) { + SmallFile sf = new SmallFile(); + if (smallFileSlice.getBaseFile().isPresent()) { + // TODO : Move logic of file name, file id, base commit time handling inside file slice + String filename = smallFileSlice.getBaseFile().get().getFileName(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); } else { - // If we can index log files, we can add more inserts to log files for fileIds NOT including those under - // pending compaction - List<FileSlice> allFileSlices = - table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) - .collect(Collectors.toList()); - for (FileSlice fileSlice : allFileSlices) { - if (isSmallFile(fileSlice)) { - allSmallFileSlices.add(fileSlice); - } - } - } - // Create SmallFiles from the eligible file slices - for (FileSlice smallFileSlice : allSmallFileSlices) { - SmallFile sf = new SmallFile(); - if (smallFileSlice.getBaseFile().isPresent()) { - // TODO : Move logic of file name, file id, base commit time handling inside file slice - String filename = smallFileSlice.getBaseFile().get().getFileName(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); - sf.sizeBytes = getTotalFileSize(smallFileSlice); - smallFileLocations.add(sf); - } else { - HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); - sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), - FSUtils.getFileIdFromLogPath(logFile.getPath())); - sf.sizeBytes = getTotalFileSize(smallFileSlice); - smallFileLocations.add(sf); - } + HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); + sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), + FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); } } return smallFileLocations; } + @Nonnull + private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) { Review comment: minor. getSmallFileCandidates to be in line with how we name other similar methods. ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java ########## @@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng @Override protected List<SmallFile> getSmallFiles(String partitionPath) { - - // smallFiles only for partitionPath - List<SmallFile> smallFileLocations = new ArrayList<>(); - // Init here since this class (and member variables) might not have been initialized HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); - // Find out all eligible small file slices - if (!commitTimeline.empty()) { - HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - // find smallest file in partition and append to it - List<FileSlice> allSmallFileSlices = new ArrayList<>(); - // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to - // it. Doing this overtime for a partition, we ensure that we handle small file issues - if (!table.getIndex().canIndexLogFiles()) { - // TODO : choose last N small files since there can be multiple small files written to a single partition - // by different spark partitions in a single batch - Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView() - .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) - .filter( - fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config - .getParquetSmallFileLimit()) - .min((FileSlice left, FileSlice right) -> - left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1)); - if (smallFileSlice.isPresent()) { - allSmallFileSlices.add(smallFileSlice.get()); - } + if (commitTimeline.empty()) { + return Collections.emptyList(); + } + + HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + + // Find out all eligible small file slices, looking for + // smallest file in the partition to append to + List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime); + List<SmallFile> smallFileLocations = new ArrayList<>(); + + // Create SmallFiles from the eligible file slices + for (FileSlice smallFileSlice : smallFileSlicesCandidates) { + SmallFile sf = new SmallFile(); + if (smallFileSlice.getBaseFile().isPresent()) { + // TODO : Move logic of file name, file id, base commit time handling inside file slice + String filename = smallFileSlice.getBaseFile().get().getFileName(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); } else { - // If we can index log files, we can add more inserts to log files for fileIds NOT including those under - // pending compaction - List<FileSlice> allFileSlices = - table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) - .collect(Collectors.toList()); - for (FileSlice fileSlice : allFileSlices) { - if (isSmallFile(fileSlice)) { - allSmallFileSlices.add(fileSlice); - } - } - } - // Create SmallFiles from the eligible file slices - for (FileSlice smallFileSlice : allSmallFileSlices) { - SmallFile sf = new SmallFile(); - if (smallFileSlice.getBaseFile().isPresent()) { - // TODO : Move logic of file name, file id, base commit time handling inside file slice - String filename = smallFileSlice.getBaseFile().get().getFileName(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); - sf.sizeBytes = getTotalFileSize(smallFileSlice); - smallFileLocations.add(sf); - } else { - HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); - sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), - FSUtils.getFileIdFromLogPath(logFile.getPath())); - sf.sizeBytes = getTotalFileSize(smallFileSlice); - smallFileLocations.add(sf); - } + HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); + sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), + FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); } } return smallFileLocations; } + @Nonnull + private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) { + // If we can index log files, we can add more inserts to log files for fileIds NOT including those under + // pending compaction + if (table.getIndex().canIndexLogFiles()) { + return table.getSliceView() + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false) + .filter(this::isSmallFile) + .collect(Collectors.toList()); + } + Review comment: can we add in else {} here so that its explicit. ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java ########## @@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng @Override protected List<SmallFile> getSmallFiles(String partitionPath) { - - // smallFiles only for partitionPath - List<SmallFile> smallFileLocations = new ArrayList<>(); - // Init here since this class (and member variables) might not have been initialized HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); - // Find out all eligible small file slices - if (!commitTimeline.empty()) { - HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - // find smallest file in partition and append to it - List<FileSlice> allSmallFileSlices = new ArrayList<>(); - // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to - // it. Doing this overtime for a partition, we ensure that we handle small file issues - if (!table.getIndex().canIndexLogFiles()) { - // TODO : choose last N small files since there can be multiple small files written to a single partition - // by different spark partitions in a single batch - Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView() - .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) - .filter( - fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config - .getParquetSmallFileLimit()) - .min((FileSlice left, FileSlice right) -> - left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1)); - if (smallFileSlice.isPresent()) { - allSmallFileSlices.add(smallFileSlice.get()); - } + if (commitTimeline.empty()) { + return Collections.emptyList(); + } + + HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + + // Find out all eligible small file slices, looking for + // smallest file in the partition to append to + List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime); + List<SmallFile> smallFileLocations = new ArrayList<>(); + + // Create SmallFiles from the eligible file slices + for (FileSlice smallFileSlice : smallFileSlicesCandidates) { + SmallFile sf = new SmallFile(); + if (smallFileSlice.getBaseFile().isPresent()) { + // TODO : Move logic of file name, file id, base commit time handling inside file slice + String filename = smallFileSlice.getBaseFile().get().getFileName(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); } else { - // If we can index log files, we can add more inserts to log files for fileIds NOT including those under - // pending compaction - List<FileSlice> allFileSlices = - table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) - .collect(Collectors.toList()); - for (FileSlice fileSlice : allFileSlices) { - if (isSmallFile(fileSlice)) { - allSmallFileSlices.add(fileSlice); - } - } - } - // Create SmallFiles from the eligible file slices - for (FileSlice smallFileSlice : allSmallFileSlices) { - SmallFile sf = new SmallFile(); - if (smallFileSlice.getBaseFile().isPresent()) { - // TODO : Move logic of file name, file id, base commit time handling inside file slice - String filename = smallFileSlice.getBaseFile().get().getFileName(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); - sf.sizeBytes = getTotalFileSize(smallFileSlice); - smallFileLocations.add(sf); - } else { - HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); - sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), - FSUtils.getFileIdFromLogPath(logFile.getPath())); - sf.sizeBytes = getTotalFileSize(smallFileSlice); - smallFileLocations.add(sf); - } + HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); + sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), + FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); } } return smallFileLocations; } + @Nonnull + private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) { + // If we can index log files, we can add more inserts to log files for fileIds NOT including those under + // pending compaction + if (table.getIndex().canIndexLogFiles()) { + return table.getSliceView() + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false) + .filter(this::isSmallFile) + .collect(Collectors.toList()); + } + + // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to + // it. Doing this overtime for a partition, we ensure that we handle small file issues + // TODO : choose last N small files since there can be multiple small files written to a single partition Review comment: why TODO ? I see we already limit by config.getSmallFileGroupCandidatesLimit() below. is there anything pending still ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org