nsivabalan commented on code in PR #6580:
URL: https://github.com/apache/hudi/pull/6580#discussion_r972428841
##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext
context) {
context.setJobStatus(this.getClass().getSimpleName(), "Generating list
of file slices to be cleaned: " + config.getTableName());
Map>>
cleanOpsWithPartitionMeta = context
- .map(partitionsToClean, partitionPathToClean ->
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
cleanerParallelism)
+ .parallelize(partitionsToClean, cleanerParallelism)
+ .mapPartitions((Iterator it) -> {
+List list = new ArrayList<>();
Review Comment:
minor: `list` -> `partitionList`
##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext
context) {
context.setJobStatus(this.getClass().getSimpleName(), "Generating list
of file slices to be cleaned: " + config.getTableName());
Map>>
cleanOpsWithPartitionMeta = context
- .map(partitionsToClean, partitionPathToClean ->
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
cleanerParallelism)
+ .parallelize(partitionsToClean, cleanerParallelism)
+ .mapPartitions((Iterator it) -> {
+List list = new ArrayList<>();
+it.forEachRemaining(list::add);
+Map>> res =
planner.getDeletePaths(list);
Review Comment:
minor: `res` -> `cleanResult`
##
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##
@@ -735,6 +735,34 @@ public final Stream
getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg ->
!isFileGroupReplaced(fg));
}
+ @Override
+ public final Stream>>
getAllFileGroups(List partitionStr) {
+return getAllFileGroupsIncludingReplaced(partitionStr)
+.map(pair -> Pair.of(pair.getLeft(),
pair.getRight().stream().filter(fg ->
!isFileGroupReplaced(fg)).collect(Collectors.toList())));
+ }
+
+ private Stream>>
getAllFileGroupsIncludingReplaced(final List partitionStrList) {
+try {
Review Comment:
Shouldn't we call the existing method here,
```
getAllFileGroupsIncludingReplaced(final String partitionStr)
```
and then union the outputs for the multiple partition paths?
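A rough sketch of that shape, not Hudi's actual code: the class `FileGroupViewSketch`, the `String` file group ids, and the `Map.Entry` return type are all hypothetical stand-ins chosen so the example compiles on its own. The list overload only delegates to the single-partition method and pairs each partition path with its result.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the file system view; not the real Hudi class.
public class FileGroupViewSketch {

  // Stand-in for the existing single-partition method (returns fake file group ids).
  static Stream<String> getAllFileGroupsIncludingReplaced(String partitionStr) {
    return Stream.of(partitionStr + "/fg-1", partitionStr + "/fg-2");
  }

  // List overload: reuse the single-partition method per path and
  // combine the per-partition outputs into one stream of (path, groups) entries.
  static Stream<Map.Entry<String, List<String>>> getAllFileGroupsIncludingReplaced(
      List<String> partitionStrList) {
    return partitionStrList.stream().map(partition -> {
      List<String> groups =
          getAllFileGroupsIncludingReplaced(partition).collect(Collectors.toList());
      return new SimpleImmutableEntry<>(partition, groups);
    });
  }

  public static void main(String[] args) {
    getAllFileGroupsIncludingReplaced(Arrays.asList("2022/09/01", "2022/09/02"))
        .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```

Whether this fits depends on why the list variant was added; if it exists to batch lookups across partitions, plain delegation may not be enough.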
##
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##
@@ -735,6 +735,34 @@ public final Stream
getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg ->
!isFileGroupReplaced(fg));
}
+ @Override
+ public final Stream>>
getAllFileGroups(List partitionStr) {
+return getAllFileGroupsIncludingReplaced(partitionStr)
Review Comment:
Same here. Let's see if we can reuse the existing methods and avoid code duplication.
##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext
context) {
context.setJobStatus(this.getClass().getSimpleName(), "Generating list
of file slices to be cleaned: " + config.getTableName());
Map>>
cleanOpsWithPartitionMeta = context
- .map(partitionsToClean, partitionPathToClean ->
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
cleanerParallelism)
+ .parallelize(partitionsToClean, cleanerParallelism)
+ .mapPartitions((Iterator it) -> {
+List list = new ArrayList<>();
+it.forEachRemaining(list::add);
+Map>> res =
planner.getDeletePaths(list);
+return res.entrySet().iterator();
+ }, false).collectAsList()
.stream()
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ .collect(Collectors.toMap(it -> it.getKey(), it -> it.getValue()));
Review Comment:
Why this change? We can leave it as `Pair::getKey` and `Pair::getValue`.
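One caveat, assuming the collected elements are now `Map.Entry` values (as `res.entrySet().iterator()` in the hunk suggests): method references on `Pair` may no longer apply, but `Map.Entry::getKey` and `Map.Entry::getValue` would keep the method-reference style. A minimal standalone sketch, with the class name and sample data invented for illustration:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical example class; it only illustrates the collector call.
public class EntryCollectSketch {

  // Made-up stand-in for the entries returned by collectAsList().
  static List<Map.Entry<String, List<String>>> sampleEntries() {
    List<Map.Entry<String, List<String>>> entries = new ArrayList<>();
    entries.add(new SimpleImmutableEntry<>("2022/09/01", Arrays.asList("f1", "f2")));
    entries.add(new SimpleImmutableEntry<>("2022/09/02", Arrays.asList("f3")));
    return entries;
  }

  // Collect entries into a map using method references instead of lambdas.
  static Map<String, List<String>> toMap(List<Map.Entry<String, List<String>>> entries) {
    return entries.stream()
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    System.out.println(toMap(sampleEntries()));
  }
}
```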
##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##
@@ -233,43 +235,47 @@ private Pair>
getFilesToCleanKeepingLatestVersions(
// In this scenario, we will assume that once replaced a file group
automatically becomes eligible for cleaning completely
// In other words, the file versions only apply to the active file groups.
-deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles,
partitionPath, Option.empty()));
-boolean toDeletePartition = false;
-List fileGroups =
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toLi