This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 46808dc  [HUDI-2497] Refactor clean and restore actions in hudi-client module (#3734)
46808dc is described below

commit 46808dcb1fe22491326a9e831dd4dde4c70796fb
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Thu Sep 30 15:20:25 2021 -0700

    [HUDI-2497] Refactor clean and restore actions in hudi-client module (#3734)
---
 ...ctionExecutor.java => CleanActionExecutor.java} |  94 ++++++++++++++-
 ...nExecutor.java => CleanPlanActionExecutor.java} |  16 +--
 .../restore/CopyOnWriteRestoreActionExecutor.java} |  48 ++++----
 .../restore/MergeOnReadRestoreActionExecutor.java} |  33 ++---
 .../client/common/HoodieFlinkEngineContext.java    |  15 +++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  10 +-
 .../action/clean/FlinkCleanActionExecutor.java     | 128 --------------------
 .../clean/FlinkScheduleCleanActionExecutor.java    |  52 --------
 .../client/common/HoodieJavaEngineContext.java     |  15 +++
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  14 +--
 .../action/clean/JavaCleanActionExecutor.java      | 130 --------------------
 .../clean/JavaScheduleCleanActionExecutor.java     |  52 --------
 .../client/common/HoodieSparkEngineContext.java    |  19 +++
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  18 +--
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |   8 +-
 .../action/clean/SparkCleanActionExecutor.java     | 134 ---------------------
 .../action/clean/SparkCleanPlanActionExecutor.java |  55 ---------
 .../SparkCopyOnWriteRestoreActionExecutor.java     |  70 -----------
 .../hudi/common/engine/HoodieEngineContext.java    |   7 ++
 .../common/engine/HoodieLocalEngineContext.java    |  16 +++
 .../hudi/common/function/FunctionWrapper.java      |  11 ++
 .../function/SerializablePairFlatMapFunction.java  |  33 +++++
 22 files changed, 273 insertions(+), 705 deletions(-)
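
At a high level, this change removes the per-engine Flink/Java/Spark clean and restore executors and keeps a single CleanPlanActionExecutor, CleanActionExecutor, CopyOnWriteRestoreActionExecutor, and MergeOnReadRestoreActionExecutor in hudi-client-common. A rough sketch of the call pattern the engine tables converge on, inferred from the table changes further down in this diff (variables such as context, config, instantTime, cleanInstantTime, extraMetadata, restoreInstantTime, and instantToRestore stand in for whatever each table already has in scope; this sketch is illustrative and not part of the patch):

    // Schedule a cleaner plan and execute cleaning via the shared executors.
    Option<HoodieCleanerPlan> plan =
        new CleanPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
    HoodieCleanMetadata cleanMetadata =
        new CleanActionExecutor<>(context, config, this, cleanInstantTime).execute();

    // Restore now also goes through an engine-agnostic executor.
    HoodieRestoreMetadata restoreMetadata =
        new CopyOnWriteRestoreActionExecutor(
            context, config, this, restoreInstantTime, instantToRestore).execute();

Each engine context only needs to supply the new HoodieEngineContext#mapPartitionsToPairAndReduceByKey primitive (added to the engine contexts touched by this commit) for the shared CleanActionExecutor to distribute deletions.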

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
similarity index 53%
rename from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
rename to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index acc3cdc..a5a72d4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -20,10 +20,13 @@ package org.apache.hudi.table.action.clean;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -31,29 +34,36 @@ import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, 
I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {
+public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> 
extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LogManager.getLogger(BaseCleanActionExecutor.class);
+  private static final Logger LOG = 
LogManager.getLogger(CleanActionExecutor.class);
 
-  public BaseCleanActionExecutor(HoodieEngineContext context, 
HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+  public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig 
config, HoodieTable<T, I, K, O> table, String instantTime) {
     super(context, config, table, instantTime);
   }
 
-  protected static Boolean deleteFileAndGetResult(FileSystem fs, String 
deletePathStr) throws IOException {
+  static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) 
throws IOException {
     Path deletePath = new Path(deletePathStr);
     LOG.debug("Working on delete path :" + deletePath);
     try {
@@ -68,13 +78,85 @@ public abstract class BaseCleanActionExecutor<T extends 
HoodieRecordPayload, I,
     }
   }
 
+  static Stream<Pair<String, PartitionCleanStat>> 
deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, 
HoodieTable table) {
+    Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
+    FileSystem fs = table.getMetaClient().getFs();
+
+    cleanFileInfo.forEachRemaining(partitionDelFileTuple -> {
+      String partitionPath = partitionDelFileTuple.getLeft();
+      Path deletePath = new 
Path(partitionDelFileTuple.getRight().getFilePath());
+      String deletePathStr = deletePath.toString();
+      Boolean deletedFileResult = null;
+      try {
+        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
+
+      } catch (IOException e) {
+        LOG.error("Delete file failed: " + deletePathStr);
+      }
+      final PartitionCleanStat partitionCleanStat =
+          partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new 
PartitionCleanStat(partitionPath));
+      boolean isBootstrapBasePathFile = 
partitionDelFileTuple.getRight().isBootstrapBaseFile();
+
+      if (isBootstrapBasePathFile) {
+        // For Bootstrap Base file deletions, store the full file path.
+        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
+        partitionCleanStat.addDeletedFileResult(deletePath.toString(), 
deletedFileResult, true);
+      } else {
+        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
+        partitionCleanStat.addDeletedFileResult(deletePath.getName(), 
deletedFileResult, false);
+      }
+    });
+    return partitionCleanStatMap.entrySet().stream().map(e -> 
Pair.of(e.getKey(), e.getValue()));
+  }
+
   /**
    * Performs cleaning of partition paths according to cleaning policy and 
returns the number of files cleaned. Handles
    * skews in partitions to clean by making files to clean as the unit of task 
distribution.
    *
    * @throws IllegalArgumentException if unknown cleaning policy is provided
    */
-  abstract List<HoodieCleanStat> clean(HoodieEngineContext context, 
HoodieCleanerPlan cleanerPlan);
+  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan 
cleanerPlan) {
+    int cleanerParallelism = Math.min(
+        (int) 
(cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+        config.getCleanerParallelism());
+    LOG.info("Using cleanerParallelism: " + cleanerParallelism);
+
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of 
partitions");
+
+    Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
+        cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+            .flatMap(x -> x.getValue().stream().map(y -> new 
ImmutablePair<>(x.getKey(),
+                new CleanFileInfo(y.getFilePath(), 
y.getIsBootstrapBaseFile()))));
+
+    Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
+        context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition,
+            iterator -> deleteFilesFunc(iterator, table), 
PartitionCleanStat::merge, cleanerParallelism);
+
+    Map<String, PartitionCleanStat> partitionCleanStatsMap = 
partitionCleanStats
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    // Return PartitionCleanStat for each partition passed.
+    return 
cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath
 -> {
+      PartitionCleanStat partitionCleanStat = 
partitionCleanStatsMap.containsKey(partitionPath)
+          ? partitionCleanStatsMap.get(partitionPath)
+          : new PartitionCleanStat(partitionPath);
+      HoodieActionInstant actionInstant = 
cleanerPlan.getEarliestInstantToRetain();
+      return 
HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
+          .withEarliestCommitRetained(Option.ofNullable(
+              actionInstant != null
+                  ? new 
HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
+                  actionInstant.getAction(), actionInstant.getTimestamp())
+                  : null))
+          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
+          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
+          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
+          
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
+          
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
+          
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
+          .build();
+    }).collect(Collectors.toList());
+  }
+
 
   /**
    * Executes the Cleaner plan stored in the instant metadata.
@@ -143,7 +225,7 @@ public abstract class BaseCleanActionExecutor<T extends 
HoodieRecordPayload, I,
     }
     // return the last clean metadata for now
     // TODO (NA) : Clean only the earliest pending clean just like how we do 
for other table services
-    // This requires the BaseCleanActionExecutor to be refactored as 
BaseCommitActionExecutor
+    // This requires the CleanActionExecutor to be refactored as 
BaseCommitActionExecutor
     return cleanMetadataList.size() > 0 ? 
cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
similarity index 90%
rename from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
rename to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index fc0c000..9b95bd7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -43,22 +43,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public abstract class BaseCleanPlanActionExecutor<T extends 
HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, 
Option<HoodieCleanerPlan>> {
+public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> 
extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
 
   private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
 
   private final Option<Map<String, String>> extraMetadata;
 
-  public BaseCleanPlanActionExecutor(HoodieEngineContext context,
-                                     HoodieWriteConfig config,
-                                     HoodieTable<T, I, K, O> table,
-                                     String instantTime,
-                                     Option<Map<String, String>> 
extraMetadata) {
+  public CleanPlanActionExecutor(HoodieEngineContext context,
+                                 HoodieWriteConfig config,
+                                 HoodieTable<T, I, K, O> table,
+                                 String instantTime,
+                                 Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime);
     this.extraMetadata = extraMetadata;
   }
 
-  protected abstract Option<HoodieCleanerPlan> createCleanerPlan();
+  protected Option<HoodieCleanerPlan> createCleanerPlan() {
+    return execute();
+  }
 
   /**
    * Generates List of files to be cleaned.
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
similarity index 59%
rename from 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
rename to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index f7677ae..2e3b148 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -7,22 +7,20 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.table.action.restore;
 
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieJavaEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -32,35 +30,35 @@ import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 
-import java.util.List;
-
-public class JavaCopyOnWriteRestoreActionExecutor<T extends 
HoodieRecordPayload> extends
-    BaseRestoreActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
-
-  public JavaCopyOnWriteRestoreActionExecutor(HoodieJavaEngineContext context,
-                                              HoodieWriteConfig config,
-                                              HoodieTable table,
-                                              String instantTime,
-                                              String restoreInstantTime) {
+public class CopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload, 
I, K, O>
+    extends BaseRestoreActionExecutor<T, I, K, O> {
+  public CopyOnWriteRestoreActionExecutor(HoodieEngineContext context,
+                                          HoodieWriteConfig config,
+                                          HoodieTable table,
+                                          String instantTime,
+                                          String restoreInstantTime) {
     super(context, config, table, instantTime, restoreInstantTime);
   }
 
   @Override
   protected HoodieRollbackMetadata rollbackInstant(HoodieInstant 
instantToRollback) {
+    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
+        && 
!instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      throw new HoodieRollbackException("Unsupported action in rollback 
instant:" + instantToRollback);
+    }
+    table.getMetaClient().reloadActiveTimeline();
+    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    table.scheduleRollback(context, newInstantTime, instantToRollback, false);
     table.getMetaClient().reloadActiveTimeline();
     CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(
         context,
         config,
         table,
-        HoodieActiveTimeline.createNewInstantTime(),
+        newInstantTime,
         instantToRollback,
         true,
         true,
         false);
-    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
-        && 
!instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      throw new HoodieRollbackException("Unsupported action in rollback 
instant:" + instantToRollback);
-    }
     return rollbackActionExecutor.execute();
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
similarity index 67%
rename from 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
rename to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index 14a0b24..58663b6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -7,22 +7,20 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.table.action.restore;
 
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -31,17 +29,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
-import org.apache.spark.api.java.JavaRDD;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkMergeOnReadRestoreActionExecutor<T extends 
HoodieRecordPayload> extends
-    BaseRestoreActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> {
-
-  public SparkMergeOnReadRestoreActionExecutor(HoodieSparkEngineContext 
context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable table,
-                                               String instantTime,
-                                               String restoreInstantTime) {
+public class MergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload, 
I, K, O>
+    extends BaseRestoreActionExecutor<T, I, K, O> {
+  public MergeOnReadRestoreActionExecutor(HoodieEngineContext context, 
HoodieWriteConfig config, HoodieTable<T, I, K, O> table,
+                                          String instantTime, String 
restoreInstantTime) {
     super(context, config, table, instantTime, restoreInstantTime);
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 174122c..687ecc1 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -26,11 +26,13 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -38,9 +40,11 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.util.FlinkClientUtil;
 
+import static 
org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
 import static 
org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static 
org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -87,6 +91,17 @@ public class HoodieFlinkEngineContext extends 
HoodieEngineContext {
   }
 
   @Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> 
mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> 
flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return 
throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+        .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+        .map(entry -> new ImmutablePair<>(entry.getKey(), 
entry.getValue().stream().map(
+            
Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
+        .filter(Objects::nonNull);
+  }
+
+  @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int 
parallelism) {
     return data.stream().parallel()
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 93785b9..2238ac3 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -44,8 +44,8 @@ import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor;
-import org.apache.hudi.table.action.clean.FlinkScheduleCleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
@@ -297,7 +297,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends 
HoodieRecordPayload> extends
    */
   @Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new FlinkScheduleCleanActionExecutor(context, config, this, 
instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor(context, config, this, instantTime, 
extraMetadata).execute();
   }
 
   @Override
@@ -308,7 +308,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends 
HoodieRecordPayload> extends
 
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime) {
-    return new FlinkCleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
   }
 
   @Override
@@ -329,7 +329,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends 
HoodieRecordPayload> extends
   // -------------------------------------------------------------------------
   //  Used for compaction
   // -------------------------------------------------------------------------
-
+  
   public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String 
partitionPath, String fileId,
                                                   Map<String, HoodieRecord<T>> 
keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
     // these are updates
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
deleted file mode 100644
index 9378cb2..0000000
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.CleanFileInfo;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import scala.Tuple2;
-
-public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
-
-  private static final Logger LOG = 
LogManager.getLogger(FlinkCleanActionExecutor.class);
-
-  public FlinkCleanActionExecutor(HoodieEngineContext context,
-                                  HoodieWriteConfig config,
-                                  HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table,
-                                  String instantTime) {
-    super(context, config, table, instantTime);
-  }
-
-  @Override
-  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan 
cleanerPlan) {
-    Stream<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = 
cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
-        .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), 
new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
-
-    Stream<Tuple2<String, PartitionCleanStat>> partitionCleanStats =
-        deleteFilesFunc(filesToBeDeletedPerPartition, table)
-            .collect(Collectors.groupingBy(Pair::getLeft))
-            .entrySet().stream()
-            .map(x -> new Tuple2(x.getKey(), x.getValue().stream().map(y -> 
y.getRight()).reduce(PartitionCleanStat::merge).get()));
-
-    Map<String, PartitionCleanStat> partitionCleanStatsMap = 
partitionCleanStats
-        .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
-
-    // Return PartitionCleanStat for each partition passed.
-    return 
cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath
 -> {
-      PartitionCleanStat partitionCleanStat = 
partitionCleanStatsMap.containsKey(partitionPath)
-          ? partitionCleanStatsMap.get(partitionPath)
-          : new PartitionCleanStat(partitionPath);
-      HoodieActionInstant actionInstant = 
cleanerPlan.getEarliestInstantToRetain();
-      return 
HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
-          .withEarliestCommitRetained(Option.ofNullable(
-              actionInstant != null
-                  ? new 
HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
-                  actionInstant.getAction(), actionInstant.getTimestamp())
-                  : null))
-          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
-          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
-          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
-          
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
-          
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
-          
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
-          .build();
-    }).collect(Collectors.toList());
-  }
-
-  private static Stream<Pair<String, PartitionCleanStat>> 
deleteFilesFunc(Stream<Tuple2<String, CleanFileInfo>> cleanFileInfo, 
HoodieTable table) {
-    Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
-    FileSystem fs = table.getMetaClient().getFs();
-
-    cleanFileInfo.parallel().forEach(partitionDelFileTuple -> {
-      String partitionPath = partitionDelFileTuple._1();
-      Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
-      String deletePathStr = deletePath.toString();
-      Boolean deletedFileResult = null;
-      try {
-        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
-      } catch (IOException e) {
-        LOG.error("Delete file failed");
-      }
-      final PartitionCleanStat partitionCleanStat;
-      synchronized (partitionCleanStatMap) {
-        partitionCleanStat = 
partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new 
PartitionCleanStat(partitionPath));
-      }
-      boolean isBootstrapBasePathFile = 
partitionDelFileTuple._2().isBootstrapBaseFile();
-      if (isBootstrapBasePathFile) {
-        // For Bootstrap Base file deletions, store the full file path.
-        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
-        partitionCleanStat.addDeletedFileResult(deletePath.toString(), 
deletedFileResult, true);
-      } else {
-        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
-        partitionCleanStat.addDeletedFileResult(deletePath.getName(), 
deletedFileResult, false);
-      }
-    });
-    return partitionCleanStatMap.entrySet().stream().map(e -> 
Pair.of(e.getKey(), e.getValue()));
-  }
-}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java
deleted file mode 100644
index 75da54e..0000000
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-public class FlinkScheduleCleanActionExecutor<T extends HoodieRecordPayload> 
extends
-    BaseCleanPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
-
-  private static final Logger LOG = 
LogManager.getLogger(FlinkScheduleCleanActionExecutor.class);
-
-  public FlinkScheduleCleanActionExecutor(HoodieEngineContext context,
-                                  HoodieWriteConfig config,
-                                  HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table,
-                                  String instantTime,
-                                  Option<Map<String, String>> extraMetadata) {
-    super(context, config, table, instantTime, extraMetadata);
-  }
-
-  @Override
-  protected Option<HoodieCleanerPlan> createCleanerPlan() {
-    return super.execute();
-  }
-}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 4cdbff2..bdc2a85 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.common;
 
 import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -26,11 +27,14 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -38,6 +42,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
 import static 
org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static 
org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -71,6 +76,16 @@ public class HoodieJavaEngineContext extends 
HoodieEngineContext {
   }
 
   @Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> 
mapPartitionsToPairAndReduceByKey(Stream<I> data, 
SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+                                                                               
  SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return 
throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+        .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+        .map(entry -> new ImmutablePair<>(entry.getKey(), 
entry.getValue().stream().map(
+            
Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
+        .filter(Objects::nonNull);
+  }
+
+  @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int 
parallelism) {
     return data.stream().parallel()
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 72d63d5..99cf413 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -39,8 +39,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
-import org.apache.hudi.table.action.clean.JavaScheduleCleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
@@ -50,7 +50,7 @@ import 
org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionE
 import 
org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
-import 
org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
@@ -187,13 +187,13 @@ public class HoodieJavaCopyOnWriteTable<T extends 
HoodieRecordPayload> extends H
 
   @Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new JavaScheduleCleanActionExecutor<>(context, config, this, 
instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor<>(context, config, this, instantTime, 
extraMetadata).execute();
   }
 
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context,
                                    String cleanInstantTime) {
-    return new JavaCleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
   }
 
   @Override
@@ -218,7 +218,7 @@ public class HoodieJavaCopyOnWriteTable<T extends 
HoodieRecordPayload> extends H
   public HoodieRestoreMetadata restore(HoodieEngineContext context,
                                        String restoreInstantTime,
                                        String instantToRestore) {
-    return new JavaCopyOnWriteRestoreActionExecutor((HoodieJavaEngineContext) 
context,
-        config, this, restoreInstantTime, instantToRestore).execute();
+    return new CopyOnWriteRestoreActionExecutor(
+        context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
deleted file mode 100644
index 0ca73d4..0000000
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.CleanFileInfo;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-public class JavaCleanActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
-
-  private static final Logger LOG = 
LogManager.getLogger(JavaCleanActionExecutor.class);
-
-  public JavaCleanActionExecutor(HoodieEngineContext context,
-                                  HoodieWriteConfig config,
-                                  HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table,
-                                  String instantTime) {
-    super(context, config, table, instantTime);
-  }
-
-  @Override
-  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan 
cleanerPlan) {
-
-    Iterator<ImmutablePair<String, CleanFileInfo>> 
filesToBeDeletedPerPartition = 
cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
-        .flatMap(x -> x.getValue().stream().map(y -> new 
ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), 
y.getIsBootstrapBaseFile())))).iterator();
-
-    Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
-        deleteFilesFunc(filesToBeDeletedPerPartition, table)
-            .collect(Collectors.groupingBy(Pair::getLeft))
-            .entrySet().stream()
-            .map(x -> new ImmutablePair(x.getKey(), 
x.getValue().stream().map(y -> 
y.getRight()).reduce(PartitionCleanStat::merge).get()));
-
-    Map<String, PartitionCleanStat> partitionCleanStatsMap = 
partitionCleanStats
-        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
-
-    // Return PartitionCleanStat for each partition passed.
-    return 
cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath
 -> {
-      PartitionCleanStat partitionCleanStat = 
partitionCleanStatsMap.containsKey(partitionPath)
-          ? partitionCleanStatsMap.get(partitionPath)
-          : new PartitionCleanStat(partitionPath);
-      HoodieActionInstant actionInstant = 
cleanerPlan.getEarliestInstantToRetain();
-      return 
HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
-          .withEarliestCommitRetained(Option.ofNullable(
-              actionInstant != null
-                  ? new 
HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
-                  actionInstant.getAction(), actionInstant.getTimestamp())
-                  : null))
-          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
-          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
-          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
-          
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
-          
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
-          
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
-          .build();
-    }).collect(Collectors.toList());
-  }
-
-  private static Stream<Pair<String, PartitionCleanStat>> 
deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, 
HoodieTable table) {
-    Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
-    FileSystem fs = table.getMetaClient().getFs();
-
-    while (iter.hasNext()) {
-      Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
-      String partitionPath = partitionDelFileTuple.getLeft();
-      Path deletePath = new 
Path(partitionDelFileTuple.getRight().getFilePath());
-      String deletePathStr = deletePath.toString();
-      Boolean deletedFileResult = null;
-      try {
-        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
-      } catch (IOException e) {
-        LOG.error("Delete file failed");
-      }
-      if (!partitionCleanStatMap.containsKey(partitionPath)) {
-        partitionCleanStatMap.put(partitionPath, new 
PartitionCleanStat(partitionPath));
-      }
-      boolean isBootstrapBasePathFile = 
partitionDelFileTuple.getRight().isBootstrapBaseFile();
-      PartitionCleanStat partitionCleanStat = 
partitionCleanStatMap.get(partitionPath);
-      if (isBootstrapBasePathFile) {
-        // For Bootstrap Base file deletions, store the full file path.
-        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
-        partitionCleanStat.addDeletedFileResult(deletePath.toString(), 
deletedFileResult, true);
-      } else {
-        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
-        partitionCleanStat.addDeletedFileResult(deletePath.getName(), 
deletedFileResult, false);
-      }
-    }
-    return partitionCleanStatMap.entrySet().stream().map(e -> 
Pair.of(e.getKey(), e.getValue()));
-  }
-}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java
deleted file mode 100644
index 05d19a6..0000000
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-public class JavaScheduleCleanActionExecutor<T extends HoodieRecordPayload> 
extends
-    BaseCleanPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
-
-  private static final Logger LOG = 
LogManager.getLogger(JavaScheduleCleanActionExecutor.class);
-
-  public JavaScheduleCleanActionExecutor(HoodieEngineContext context,
-                                  HoodieWriteConfig config,
-                                  HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table,
-                                  String instantTime,
-                                  Option<Map<String, String>> extraMetadata) {
-    super(context, config, table, instantTime, extraMetadata);
-  }
-
-  @Override
-  protected Option<HoodieCleanerPlan> createCleanerPlan() {
-    return super.execute();
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index de06ea4..416992e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -25,18 +25,23 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.sql.SQLContext;
 import scala.Tuple2;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -83,6 +88,20 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
   }
 
   @Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> 
mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> 
flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return javaSparkContext.parallelize(data.collect(Collectors.toList()), 
parallelism)
+        .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator 
->
+            
flatMapToPairFunc.call(iterator).collect(Collectors.toList()).stream()
+                .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator()
+        )
+        .reduceByKey(reduceFunc::apply)
+        .map(e -> new ImmutablePair<>(e._1, e._2))
+        .collect().stream();
+  }
+
+  @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int 
parallelism) {
     return javaSparkContext.parallelize(data, parallelism).mapToPair(pair -> 
new Tuple2<K, V>(pair.getLeft(), pair.getRight()))
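
The mapPartitionsToPairAndReduceByKey additions above are the primitive the shared CleanActionExecutor relies on to fan out file deletions per partition and merge the resulting PartitionCleanStats. A minimal, hypothetical usage sketch (the word-count data and the engineContext variable are made up for illustration; the signature and Pair handling follow the implementations shown above, and the usual java.util and org.apache.hudi.common.util.collection.Pair imports are assumed):

    Stream<String> words = Stream.of("hudi", "clean", "hudi", "restore", "hudi");
    Map<String, Integer> counts = engineContext
        .mapPartitionsToPairAndReduceByKey(
            words,
            iterator -> {
              // Build partial counts for the records seen by this partition.
              Map<String, Integer> partial = new HashMap<>();
              iterator.forEachRemaining(w -> partial.merge(w, 1, Integer::sum));
              return partial.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
            },
            Integer::sum,  // merge partial counts that share a key
            2)             // requested parallelism
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

As the diffs above show, the Spark context runs this through mapPartitionsToPair/reduceByKey on an RDD, while the Flink and Java contexts use a parallel Java stream with a groupingBy-based reduce, so callers get the same contract on every engine.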
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index c2770a7..a9b36a8 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -50,8 +50,8 @@ import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import 
org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
-import org.apache.hudi.table.action.clean.SparkCleanActionExecutor;
-import org.apache.hudi.table.action.clean.SparkCleanPlanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor;
 import 
org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
@@ -65,7 +65,7 @@ import 
org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecuto
 import org.apache.hudi.table.action.commit.SparkMergeHelper;
 import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
-import 
org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
@@ -181,12 +181,12 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
 
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
-    new SparkCopyOnWriteRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+    new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }
 
   @Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new SparkCleanPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
   }
 
   @Override
@@ -197,7 +197,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
-      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
+                                                  Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
     // these are updates
     HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
     return handleUpdateInternal(upsertHandle, instantTime, fileId);
@@ -242,7 +242,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
-      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+                                                  Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
     HoodieCreateHandle<?,?,?,?> createHandle =
         new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
     createHandle.write();
@@ -251,7 +251,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
-    return new SparkCleanActionExecutor((HoodieSparkEngineContext)context, config, this, cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
   }
 
   @Override
@@ -266,7 +266,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new SparkCopyOnWriteRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, restoreInstantTime, instantToRestore).execute();
+    return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 
 }
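
The net effect for callers is that the (HoodieSparkEngineContext) casts disappear: the table API is unchanged and the shared executors accept any HoodieEngineContext. A small illustrative fragment, assuming an already-built HoodieSparkCopyOnWriteTable named table and a JavaSparkContext named jsc (neither is set up by this commit):

    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
    String cleanInstant = HoodieActiveTimeline.createNewInstantTime();
    // Plans the clean via the shared CleanPlanActionExecutor ...
    table.scheduleCleaning(context, cleanInstant, Option.empty());
    // ... and executes it via the shared CleanActionExecutor.
    HoodieCleanMetadata cleanMetadata = table.clean(context, cleanInstant);
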
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index ee66d7b..b4b106c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -39,6 +39,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import 
org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.SparkRunCompactionActionExecutor;
 import 
org.apache.hudi.table.action.compact.SparkScheduleCompactionActionExecutor;
 import 
org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor;
@@ -48,8 +49,7 @@ import 
org.apache.hudi.table.action.deltacommit.SparkInsertDeltaCommitActionExec
 import 
org.apache.hudi.table.action.deltacommit.SparkInsertPreppedDeltaCommitActionExecutor;
 import 
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor;
 import 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
-import 
org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
-import 
org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
@@ -141,7 +141,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
 
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
-    new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+    new MergeOnReadRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }
 
   @Override
@@ -161,7 +161,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, restoreInstantTime, instantToRestore).execute();
+    return new MergeOnReadRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java
deleted file mode 100644
index ba2d42f..0000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.CleanFileInfo;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import scala.Tuple2;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkCleanActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCleanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> {
-
-  private static final Logger LOG = 
LogManager.getLogger(SparkCleanActionExecutor.class);
-
-  public SparkCleanActionExecutor(HoodieSparkEngineContext context,
-                                  HoodieWriteConfig config,
-                                  HoodieTable<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                  String instantTime) {
-    super(context, config, table, instantTime);
-  }
-
-  private static PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, 
String, PartitionCleanStat>
-      deleteFilesFunc(HoodieTable table) {
-    return (PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, 
String, PartitionCleanStat>) iter -> {
-      Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
-      FileSystem fs = table.getMetaClient().getFs();
-      while (iter.hasNext()) {
-        Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
-        String partitionPath = partitionDelFileTuple._1();
-        Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
-        String deletePathStr = deletePath.toString();
-        Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
-        if (!partitionCleanStatMap.containsKey(partitionPath)) {
-          partitionCleanStatMap.put(partitionPath, new 
PartitionCleanStat(partitionPath));
-        }
-        boolean isBootstrapBasePathFile = 
partitionDelFileTuple._2().isBootstrapBaseFile();
-        PartitionCleanStat partitionCleanStat = 
partitionCleanStatMap.get(partitionPath);
-        if (isBootstrapBasePathFile) {
-          // For Bootstrap Base file deletions, store the full file path.
-          partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), 
true);
-          partitionCleanStat.addDeletedFileResult(deletePath.toString(), 
deletedFileResult, true);
-        } else {
-          partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), 
false);
-          partitionCleanStat.addDeletedFileResult(deletePath.getName(), 
deletedFileResult, false);
-        }
-      }
-      return partitionCleanStatMap.entrySet().stream().map(e -> new 
Tuple2<>(e.getKey(), e.getValue()))
-          .collect(Collectors.toList()).iterator();
-    };
-  }
-
-  @Override
-  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan 
cleanerPlan) {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    int cleanerParallelism = Math.min(
-        (int) 
(cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
-        config.getCleanerParallelism());
-    LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of 
partitions");
-    List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
-        
.parallelize(cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
-            .flatMap(x -> x.getValue().stream().map(y -> new 
Tuple2<>(x.getKey(),
-                new CleanFileInfo(y.getFilePath(), 
y.getIsBootstrapBaseFile()))))
-            .collect(Collectors.toList()), cleanerParallelism)
-        .mapPartitionsToPair(deleteFilesFunc(table))
-        .reduceByKey(PartitionCleanStat::merge).collect();
-
-    Map<String, PartitionCleanStat> partitionCleanStatsMap = 
partitionCleanStats.stream()
-        .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
-
-    // Return PartitionCleanStat for each partition passed.
-    return 
cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath
 -> {
-      PartitionCleanStat partitionCleanStat = 
partitionCleanStatsMap.containsKey(partitionPath)
-          ? partitionCleanStatsMap.get(partitionPath)
-          : new PartitionCleanStat(partitionPath);
-      HoodieActionInstant actionInstant = 
cleanerPlan.getEarliestInstantToRetain();
-      return 
HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
-          .withEarliestCommitRetained(Option.ofNullable(
-              actionInstant != null
-                  ? new 
HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
-                  actionInstant.getAction(), actionInstant.getTimestamp())
-                  : null))
-          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
-          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
-          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
-          
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
-          
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
-          
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
-          .build();
-    }).collect(Collectors.toList());
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java
deleted file mode 100644
index f5529a8..0000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
-
-import java.util.Map;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkCleanPlanActionExecutor<T extends HoodieRecordPayload> 
extends
-    BaseCleanPlanActionExecutor<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-
-  private static final Logger LOG = 
LogManager.getLogger(SparkCleanPlanActionExecutor.class);
-
-  public SparkCleanPlanActionExecutor(HoodieEngineContext context,
-                                      HoodieWriteConfig config,
-                                      HoodieTable<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                      String instantTime,
-                                      Option<Map<String, String>> 
extraMetadata) {
-    super(context, config, table, instantTime, extraMetadata);
-  }
-
-  @Override
-  protected Option<HoodieCleanerPlan> createCleanerPlan() {
-    return super.execute();
-  }
-
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
deleted file mode 100644
index 7d60b28..0000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.restore;
-
-import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
-
-import org.apache.spark.api.java.JavaRDD;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkCopyOnWriteRestoreActionExecutor<T extends 
HoodieRecordPayload> extends
-    BaseRestoreActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> {
-
-  public SparkCopyOnWriteRestoreActionExecutor(HoodieSparkEngineContext 
context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable table,
-                                               String instantTime,
-                                               String restoreInstantTime) {
-    super(context, config, table, instantTime, restoreInstantTime);
-  }
-
-  @Override
-  protected HoodieRollbackMetadata rollbackInstant(HoodieInstant 
instantToRollback) {
-    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
-        && 
!instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      throw new HoodieRollbackException("Unsupported action in rollback 
instant:" + instantToRollback);
-    }
-    table.getMetaClient().reloadActiveTimeline();
-    String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    table.scheduleRollback(context, instantTime, instantToRollback, false);
-    table.getMetaClient().reloadActiveTimeline();
-    CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(
-        (HoodieSparkEngineContext) context,
-        config,
-        table,
-        instantTime,
-        instantToRollback,
-        true,
-        true,
-        false);
-    return rollbackActionExecutor.execute();
-  }
-}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 10c7ced..fde34b6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -22,10 +22,13 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -61,6 +64,10 @@ public abstract class HoodieEngineContext {
   public abstract <I, K, V> List<V> mapToPairAndReduceByKey(
      List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
 
+  public abstract <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
+
   public abstract <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
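
The SparkCleanActionExecutor removed earlier in this patch drove deletions through JavaSparkContext directly; with the abstract method above, the same per-partition delete-and-merge can be written once against the engine context. A hedged sketch of that shape follows (this is not the body of the shared CleanActionExecutor, which is outside this hunk; context, cleanerPlan, table and cleanerParallelism are assumed fields of the surrounding executor, and deleteFilesFunc is a hypothetical helper that deletes every file it is handed and emits one (partitionPath, PartitionCleanStat) pair per touched partition):

    // Expand the cleaner plan into (partitionPath, CleanFileInfo) pairs to delete.
    Stream<Pair<String, CleanFileInfo>> filesToDelete = cleanerPlan.getFilePathsToBeDeletedPerPartition()
        .entrySet().stream()
        .flatMap(e -> e.getValue().stream()
            .map(f -> Pair.of(e.getKey(), new CleanFileInfo(f.getFilePath(), f.getIsBootstrapBaseFile()))));

    // Hypothetical flat-map function: delete the files and report one stat per partition.
    SerializablePairFlatMapFunction<Iterator<Pair<String, CleanFileInfo>>, String, PartitionCleanStat> deleteAndReport =
        iterator -> deleteFilesFunc(iterator, table);

    List<ImmutablePair<String, PartitionCleanStat>> partitionStats = context
        .mapPartitionsToPairAndReduceByKey(filesToDelete, deleteAndReport,
            PartitionCleanStat::merge, cleanerParallelism)
        .collect(Collectors.toList());
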
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 1c935ff..ca032e7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -19,15 +19,19 @@
 package org.apache.hudi.common.engine;
 
 import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -35,6 +39,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -69,6 +74,17 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
   }
 
   @Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+        .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+        .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
+            Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
+        .filter(Objects::nonNull);
+  }
+
+  @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
     return data.stream().parallel()
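
Since HoodieLocalEngineContext implements the same contract with plain java.util.stream, the sketch shown after the HoodieSparkEngineContext hunk runs unchanged without Spark; only the context construction differs (hadoopConf below is an assumed org.apache.hadoop.conf.Configuration):

    HoodieEngineContext context = new HoodieLocalEngineContext(hadoopConf);
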
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
index b729e48..40e1a9d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
@@ -72,6 +72,17 @@ public class FunctionWrapper {
     };
   }
 
+  public static <I, K, V> Function<I, Stream<Pair<K, V>>> throwingFlatMapToPairWrapper(
+      SerializablePairFlatMapFunction<I, K, V> throwingPairFlatMapFunction) {
+    return v1 -> {
+      try {
+        return throwingPairFlatMapFunction.call(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing flatMapToPair", e);
+      }
+    };
+  }
+
   public static <V> BinaryOperator<V> throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) {
     return (v1, v2) -> {
       try {
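
Like the existing wrappers in FunctionWrapper, the new throwingFlatMapToPairWrapper turns a checked-exception lambda into a plain java.util.function.Function so it can be composed into stream pipelines, rethrowing failures as HoodieException. A brief fragment for illustration (the toLengths lambda is invented for this example and assumes imports of Function, Arrays, ArrayList, Iterator, List and Stream):

    SerializablePairFlatMapFunction<Iterator<String>, String, Integer> toLengths = iter -> {
      List<Pair<String, Integer>> out = new ArrayList<>();
      iter.forEachRemaining(s -> out.add(Pair.of(s, s.length())));
      return out.stream();
    };
    // Checked exceptions thrown inside toLengths surface as HoodieException at apply() time.
    Function<Iterator<String>, Stream<Pair<String, Integer>>> unchecked =
        FunctionWrapper.throwingFlatMapToPairWrapper(toLengths);
    unchecked.apply(Arrays.asList("a", "bb").iterator())
        .forEach(p -> System.out.println(p.getKey() + "=" + p.getValue()));
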
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java
new file mode 100644
index 0000000..4cc34ce
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.function;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.stream.Stream;
+
+/**
+ * A function that returns a stream of key-value pairs (Pair&lt;K, V&gt;).
+ */
+@FunctionalInterface
+public interface SerializablePairFlatMapFunction<I, K, V> extends Serializable {
+  Stream<Pair<K, V>> call(I t) throws Exception;
+}
