nsivabalan commented on a change in pull request #3986:
URL: https://github.com/apache/hudi/pull/3986#discussion_r749415355



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       }
     }
     return smallFileLocations;
   }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {

Review comment:
       minor: rename to getSmallFileCandidates, to be in line with how we name
       other similar methods.
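
       For illustration, only the signature would change (a sketch of the
       suggested rename; the body stays exactly as in the diff above):

           @Nonnull
           private List<FileSlice> getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
             // ... unchanged body of pickSmallFileCandidates ...
           }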

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
      }
    }
    return smallFileLocations;
  }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+              .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
+              .filter(this::isSmallFile)
+              .collect(Collectors.toList());
+    }
+

Review comment:
       can we add an else {} here, so that it's explicit?
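
       i.e. roughly the following shape (a sketch only; the else body is the
       smallest-parquet-file fallback shown further down in the diff, stubbed
       out here behind a hypothetical helper):

           if (table.getIndex().canIndexLogFiles()) {
             // log files are indexable, so any small file slice qualifies
             return table.getSliceView()
                 .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
                 .filter(this::isSmallFile)
                 .collect(Collectors.toList());
           } else {
             // hypothetical helper standing in for the smallest-parquet-file selection
             return getSmallestParquetFileCandidates(partitionPath, latestCommitInstant);
           }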

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
      }
    }
    return smallFileLocations;
  }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+              .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
+              .filter(this::isSmallFile)
+              .collect(Collectors.toList());
+    }
+
+    // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
+    // it. Doing this overtime for a partition, we ensure that we handle small file issues
+    // TODO : choose last N small files since there can be multiple small files written to a single partition

Review comment:
       why the TODO? I see we already limit by
       config.getSmallFileGroupCandidatesLimit() below. Is there anything
       still pending?
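
       (For reference, I'd expect the selection below to look roughly like
       this; the exact pipeline isn't quoted in this hunk, so the sort/limit
       shape here is an assumption:)

           return table.getSliceView()
               .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
               .filter(fileSlice -> fileSlice.getLogFiles().count() < 1
                   && fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
               // smallest base files first, capped by the configured candidate limit
               .sorted(Comparator.comparingLong((FileSlice fileSlice) -> fileSlice.getBaseFile().get().getFileSize()))
               .limit(config.getSmallFileGroupCandidatesLimit())
               .collect(Collectors.toList());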




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
