vinothchandar commented on a change in pull request #1870:
URL: https://github.com/apache/hudi/pull/1870#discussion_r467944196



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
##########
@@ -82,40 +83,45 @@ HoodieCleanerPlan requestClean(JavaSparkContext jsc) {
       LOG.info("Using cleanerParallelism: " + cleanerParallelism);
 
      jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
-      Map<String, List<String>> cleanOps = jsc
+      Map<String, List<HoodieCleanFileInfo>> cleanOps = jsc
           .parallelize(partitionsToClean, cleanerParallelism)
           .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
           .collect().stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(Pair::getKey,

Review comment:
       stylistic: in general, a stream nested within a stream is a bit hard to read. `flatMap()` first? but I guess this is a map. Extracting the inner stream into a named lambda/helper function would probably help.
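
   For illustration, a minimal sketch of that suggestion, assuming a hypothetical helper named `toCleanFileInfoList` (the `CleanFileInfo::toHoodieFileCleanInfo` reference is taken from the diff above):

   ```
   // Hypothetical named helper that hides the inner stream.
   private static List<HoodieCleanFileInfo> toCleanFileInfoList(List<CleanFileInfo> cleanFileInfoList) {
     return cleanFileInfoList.stream()
         .map(CleanFileInfo::toHoodieFileCleanInfo)
         .collect(Collectors.toList());
   }

   // The toMap() call then stays single-level:
   Map<String, List<HoodieCleanFileInfo>> cleanOps = jsc
       .parallelize(partitionsToClean, cleanerParallelism)
       .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
       .collect().stream()
       .collect(Collectors.toMap(Pair::getKey, pair -> toCleanFileInfoList(pair.getValue())));
   ```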

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -52,6 +52,8 @@
   public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
   public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
+  // Set to true to clean bootstrap source files when necessary
+  public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = "hoodie.cleaner.bootstrap.base.file";

Review comment:
       rename: `hoodie.cleaner.delete.bootstrap.base.file`?
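
   For reference, a sketch of the constant if the rename were adopted (the Java identifier is left unchanged here; only the property key differs):

   ```
   // Set to true to also delete bootstrap source files when cleaning
   public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = "hoodie.cleaner.delete.bootstrap.base.file";
   ```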

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java
##########
@@ -39,17 +40,34 @@
   private final List<String> successDeleteFiles;
   // Files that could not be deleted
   private final List<String> failedDeleteFiles;
+  // Bootstrap base path patterns that were generated for the delete operation
+  private final List<String> deleteBootstrapBasePathPatterns;
+  private final List<String> successDeleteBootstrapBaseFiles;
+  // Bootstrap base files that could not be deleted
+  private final List<String> failedDeleteBootstrapBaseFiles;
   // Earliest commit that was retained in this clean
   private final String earliestCommitToRetain;
 
   public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
       List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain) {
+    this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain,
+        new ArrayList<>(), new ArrayList<>(), new ArrayList<>());

Review comment:
       CollectionUtils.emptyList or something? 
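
   A sketch of the delegating call with `java.util.Collections.emptyList()`; whether Hudi's own `CollectionUtils` exposes an equivalent is an assumption to verify:

   ```
   // Avoids three ArrayList allocations; safe as long as the target
   // constructor never mutates these lists.
   this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain,
       Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
   ```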

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
##########
@@ -82,40 +83,45 @@ HoodieCleanerPlan requestClean(JavaSparkContext jsc) {
       LOG.info("Using cleanerParallelism: " + cleanerParallelism);
 
      jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
-      Map<String, List<String>> cleanOps = jsc
+      Map<String, List<HoodieCleanFileInfo>> cleanOps = jsc
           .parallelize(partitionsToClean, cleanerParallelism)
           .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
           .collect().stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(Pair::getKey,
+            (y) -> y.getValue().stream().map(CleanFileInfo::toHoodieFileCleanInfo).collect(Collectors.toList())));
 
       return new HoodieCleanerPlan(earliestInstant
           .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
-          config.getCleanerPolicy().name(), cleanOps, 1);
+          config.getCleanerPolicy().name(), null, CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to schedule clean operation", e);
     }
   }
 
-  private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(
-      HoodieTable table) {
-    return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> {
+  private static PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>
+        deleteFilesFunc(HoodieTable table) {
+    return (PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>) iter -> {
       Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
-
       FileSystem fs = table.getMetaClient().getFs();
-      Path basePath = new Path(table.getMetaClient().getBasePath());
       while (iter.hasNext()) {
-        Tuple2<String, String> partitionDelFileTuple = iter.next();
+        Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
         String partitionPath = partitionDelFileTuple._1();
-        String delFileName = partitionDelFileTuple._2();
-        Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
+        Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
         String deletePathStr = deletePath.toString();
         Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
         if (!partitionCleanStatMap.containsKey(partitionPath)) {
           partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
         }
+        boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
         PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
-        partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
-        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
+        if (isBootstrapBasePathFile) {

Review comment:
       this is the same as
   
   ```
   partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), isBootstrapBasePathFile);
   partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, isBootstrapBasePathFile);
   ```
   right?
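
   For context, a sketch of how `PartitionCleanStat` could dispatch on the flag internally so the caller needs no branch; the field names here are illustrative, not the actual PR code:

   ```
   void addDeleteFilePatterns(String path, boolean isBootstrapBaseFile) {
     (isBootstrapBaseFile ? deleteBootstrapBasePathPatterns : deletePathPatterns).add(path);
   }

   void addDeletedFileResult(String path, boolean deletedFileResult, boolean isBootstrapBaseFile) {
     if (deletedFileResult) {
       (isBootstrapBaseFile ? successDeleteBootstrapBaseFiles : successDeleteFiles).add(path);
     } else {
       (isBootstrapBaseFile ? failedDeleteBootstrapBaseFiles : failedDeleteFiles).add(path);
     }
   }
   ```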

##########
File path: hudi-common/src/main/avro/HoodieCleanMetadata.avsc
##########
@@ -24,23 +24,22 @@
      {"name": "totalFilesDeleted", "type": "int"},
      {"name": "earliestCommitToRetain", "type": "string"},
      {"name": "partitionMetadata", "type": {
-     "type" : "map", "values" : {

Review comment:
       do we know that this change would be backwards compatible?
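
   One way to answer this empirically is Avro's built-in compatibility checker; a test sketch, with loading of the old and new `HoodieCleanMetadata.avsc` schemas assumed:

   ```
   import org.apache.avro.Schema;
   import org.apache.avro.SchemaCompatibility;

   // oldSchema = previous HoodieCleanMetadata.avsc, newSchema = this PR's version.
   // Backwards compatible here means: data written with the old schema can be
   // read with the new one.
   SchemaCompatibility.SchemaPairCompatibility compat =
       SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
   boolean backwardsCompatible =
       compat.getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
   ```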



