This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 46808dc  [HUDI-2497] Refactor clean and restore actions in hudi-client module (#3734)
46808dc is described below

commit 46808dcb1fe22491326a9e831dd4dde4c70796fb
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Thu Sep 30 15:20:25 2021 -0700

    [HUDI-2497] Refactor clean and restore actions in hudi-client module (#3734)
---
 ...ctionExecutor.java => CleanActionExecutor.java} |  94 ++++++++++++++-
 ...nExecutor.java => CleanPlanActionExecutor.java} |  16 +--
 .../restore/CopyOnWriteRestoreActionExecutor.java} |  48 ++++----
 .../restore/MergeOnReadRestoreActionExecutor.java} |  33 ++---
 .../client/common/HoodieFlinkEngineContext.java    |  15 +++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  10 +-
 .../action/clean/FlinkCleanActionExecutor.java     | 128 --------------------
 .../clean/FlinkScheduleCleanActionExecutor.java    |  52 --------
 .../client/common/HoodieJavaEngineContext.java     |  15 +++
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  14 +--
 .../action/clean/JavaCleanActionExecutor.java      | 130 --------------------
 .../clean/JavaScheduleCleanActionExecutor.java     |  52 --------
 .../client/common/HoodieSparkEngineContext.java    |  19 +++
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  18 +--
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |   8 +-
 .../action/clean/SparkCleanActionExecutor.java     | 134 ---------------------
 .../action/clean/SparkCleanPlanActionExecutor.java |  55 ---------
 .../SparkCopyOnWriteRestoreActionExecutor.java     |  70 -----------
 .../hudi/common/engine/HoodieEngineContext.java    |   7 ++
 .../common/engine/HoodieLocalEngineContext.java    |  16 +++
 .../hudi/common/function/FunctionWrapper.java      |  11 ++
 .../function/SerializablePairFlatMapFunction.java  |  33 +++++
 22 files changed, 273 insertions(+), 705 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
similarity index 53%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index acc3cdc..a5a72d4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -20,10 +20,13 @@ package org.apache.hudi.table.action.clean;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -31,29 +34,36 @@ import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
import
org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; -public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> { +public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> { private static final long serialVersionUID = 1L; - private static final Logger LOG = LogManager.getLogger(BaseCleanActionExecutor.class); + private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class); - public BaseCleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) { + public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) { super(context, config, table, instantTime); } - protected static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { + static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { Path deletePath = new Path(deletePathStr); LOG.debug("Working on delete path :" + deletePath); try { @@ -68,13 +78,85 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I, } } + static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) { + Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>(); + FileSystem fs = table.getMetaClient().getFs(); + + cleanFileInfo.forEachRemaining(partitionDelFileTuple -> { + String partitionPath = partitionDelFileTuple.getLeft(); + Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath()); + String deletePathStr = deletePath.toString(); + Boolean deletedFileResult = null; + try { + deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); + + } catch (IOException e) { + LOG.error("Delete file failed: " + deletePathStr); + } + final PartitionCleanStat partitionCleanStat = + partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new PartitionCleanStat(partitionPath)); + boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile(); + + if (isBootstrapBasePathFile) { + // For Bootstrap Base file deletions, store the full file path. + partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true); + partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true); + } else { + partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false); + partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false); + } + }); + return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())); + } + /** * Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles * skews in partitions to clean by making files to clean as the unit of task distribution. 
* * @throws IllegalArgumentException if unknown cleaning policy is provided */ - abstract List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan); + List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) { + int cleanerParallelism = Math.min( + (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()), + config.getCleanerParallelism()); + LOG.info("Using cleanerParallelism: " + cleanerParallelism); + + context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions"); + + Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition = + cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream() + .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), + new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))); + + Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats = + context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition, + iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism); + + Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + // Return PartitionCleanStat for each partition passed. + return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { + PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) + ? partitionCleanStatsMap.get(partitionPath) + : new PartitionCleanStat(partitionPath); + HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); + return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) + .withEarliestCommitRetained(Option.ofNullable( + actionInstant != null + ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), + actionInstant.getAction(), actionInstant.getTimestamp()) + : null)) + .withDeletePathPattern(partitionCleanStat.deletePathPatterns()) + .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()) + .withFailedDeletes(partitionCleanStat.failedDeleteFiles()) + .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns()) + .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles()) + .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles()) + .build(); + }).collect(Collectors.toList()); + } + /** * Executes the Cleaner plan stored in the instant metadata. @@ -143,7 +225,7 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I, } // return the last clean metadata for now // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services - // This requires the BaseCleanActionExecutor to be refactored as BaseCommitActionExecutor + // This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor return cleanMetadataList.size() > 0 ? 
cleanMetadataList.get(cleanMetadataList.size() - 1) : null; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java similarity index 90% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index fc0c000..9b95bd7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -43,22 +43,24 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -public abstract class BaseCleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> { +public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> { private static final Logger LOG = LogManager.getLogger(CleanPlanner.class); private final Option<Map<String, String>> extraMetadata; - public BaseCleanPlanActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, I, K, O> table, - String instantTime, - Option<Map<String, String>> extraMetadata) { + public CleanPlanActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable<T, I, K, O> table, + String instantTime, + Option<Map<String, String>> extraMetadata) { super(context, config, table, instantTime); this.extraMetadata = extraMetadata; } - protected abstract Option<HoodieCleanerPlan> createCleanerPlan(); + protected Option<HoodieCleanerPlan> createCleanerPlan() { + return execute(); + } /** * Generates List of files to be cleaned. diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java similarity index 59% rename from hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java index f7677ae..2e3b148 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java @@ -7,22 +7,20 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.hudi.table.action.restore; import org.apache.hudi.avro.model.HoodieRollbackMetadata; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieJavaEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -32,35 +30,35 @@ import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; -import java.util.List; - -public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload> extends - BaseRestoreActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { - - public JavaCopyOnWriteRestoreActionExecutor(HoodieJavaEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - String restoreInstantTime) { +public class CopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload, I, K, O> + extends BaseRestoreActionExecutor<T, I, K, O> { + public CopyOnWriteRestoreActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + String restoreInstantTime) { super(context, config, table, instantTime, restoreInstantTime); } @Override protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) { + if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION) + && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback); + } + table.getMetaClient().reloadActiveTimeline(); + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + table.scheduleRollback(context, newInstantTime, instantToRollback, false); table.getMetaClient().reloadActiveTimeline(); CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor( context, config, table, - HoodieActiveTimeline.createNewInstantTime(), + newInstantTime, instantToRollback, true, true, false); - if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION) - && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { - throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback); - } return rollbackActionExecutor.execute(); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java similarity index 67% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java index 14a0b24..58663b6 
100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java @@ -7,22 +7,20 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.hudi.table.action.restore; import org.apache.hudi.avro.model.HoodieRollbackMetadata; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -31,17 +29,10 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; -import org.apache.spark.api.java.JavaRDD; - -@SuppressWarnings("checkstyle:LineLength") -public class SparkMergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload> extends - BaseRestoreActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { - - public SparkMergeOnReadRestoreActionExecutor(HoodieSparkEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - String restoreInstantTime) { +public class MergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload, I, K, O> + extends BaseRestoreActionExecutor<T, I, K, O> { + public MergeOnReadRestoreActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, + String instantTime, String restoreInstantTime) { super(context, config, table, instantTime, restoreInstantTime); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 174122c..687ecc1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -26,11 +26,13 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; +import 
org.apache.hudi.common.function.SerializablePairFlatMapFunction; import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; import org.apache.flink.api.common.functions.RuntimeContext; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -38,9 +40,11 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.util.FlinkClientUtil; +import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; @@ -87,6 +91,17 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext { } @Override + public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey( + Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc, + SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { + return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator()) + .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream() + .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map( + Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))) + .filter(Objects::nonNull); + } + + @Override public <I, K, V> List<V> reduceByKey( List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { return data.stream().parallel() diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 93785b9..2238ac3 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -44,8 +44,8 @@ import org.apache.hudi.io.HoodieSortedMergeHandle; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; -import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor; -import org.apache.hudi.table.action.clean.FlinkScheduleCleanActionExecutor; +import org.apache.hudi.table.action.clean.CleanActionExecutor; +import org.apache.hudi.table.action.clean.CleanPlanActionExecutor; import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor; @@ -297,7 +297,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends */ @Override public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { - return new FlinkScheduleCleanActionExecutor(context, config, this, instantTime, extraMetadata).execute(); + return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata).execute(); } @Override @@ -308,7 +308,7 @@ public class 
HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends @Override public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) { - return new FlinkCleanActionExecutor(context, config, this, cleanInstantTime).execute(); + return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); } @Override @@ -329,7 +329,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends // ------------------------------------------------------------------------- // Used for compaction // ------------------------------------------------------------------------- - + public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId, Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException { // these are updates diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java deleted file mode 100644 index 9378cb2..0000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.clean; - -import org.apache.hudi.avro.model.HoodieActionInstant; -import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieCleanStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.CleanFileInfo; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import scala.Tuple2; - -public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends - BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { - - private static final Logger LOG = LogManager.getLogger(FlinkCleanActionExecutor.class); - - public FlinkCleanActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, - String instantTime) { - super(context, config, table, instantTime); - } - - @Override - List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) { - Stream<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream() - .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))); - - Stream<Tuple2<String, PartitionCleanStat>> partitionCleanStats = - deleteFilesFunc(filesToBeDeletedPerPartition, table) - .collect(Collectors.groupingBy(Pair::getLeft)) - .entrySet().stream() - .map(x -> new Tuple2(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get())); - - Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats - .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); - - // Return PartitionCleanStat for each partition passed. - return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { - PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) - ? partitionCleanStatsMap.get(partitionPath) - : new PartitionCleanStat(partitionPath); - HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); - return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) - .withEarliestCommitRetained(Option.ofNullable( - actionInstant != null - ? 
new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), - actionInstant.getAction(), actionInstant.getTimestamp()) - : null)) - .withDeletePathPattern(partitionCleanStat.deletePathPatterns()) - .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()) - .withFailedDeletes(partitionCleanStat.failedDeleteFiles()) - .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns()) - .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles()) - .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles()) - .build(); - }).collect(Collectors.toList()); - } - - private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Stream<Tuple2<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) { - Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>(); - FileSystem fs = table.getMetaClient().getFs(); - - cleanFileInfo.parallel().forEach(partitionDelFileTuple -> { - String partitionPath = partitionDelFileTuple._1(); - Path deletePath = new Path(partitionDelFileTuple._2().getFilePath()); - String deletePathStr = deletePath.toString(); - Boolean deletedFileResult = null; - try { - deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); - } catch (IOException e) { - LOG.error("Delete file failed"); - } - final PartitionCleanStat partitionCleanStat; - synchronized (partitionCleanStatMap) { - partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new PartitionCleanStat(partitionPath)); - } - boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile(); - if (isBootstrapBasePathFile) { - // For Bootstrap Base file deletions, store the full file path. - partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true); - partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true); - } else { - partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false); - partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false); - } - }); - return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())); - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java deleted file mode 100644 index 75da54e..0000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.clean; - -import java.util.List; -import java.util.Map; -import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -public class FlinkScheduleCleanActionExecutor<T extends HoodieRecordPayload> extends - BaseCleanPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { - - private static final Logger LOG = LogManager.getLogger(FlinkScheduleCleanActionExecutor.class); - - public FlinkScheduleCleanActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, - String instantTime, - Option<Map<String, String>> extraMetadata) { - super(context, config, table, instantTime, extraMetadata); - } - - @Override - protected Option<HoodieCleanerPlan> createCleanerPlan() { - return super.execute(); - } -} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java index 4cdbff2..bdc2a85 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -19,6 +19,7 @@ package org.apache.hudi.client.common; import org.apache.hadoop.conf.Configuration; + import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -26,11 +27,14 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFlatMapFunction; import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -38,6 +42,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; @@ -71,6 +76,16 @@ public class HoodieJavaEngineContext extends HoodieEngineContext { } @Override + public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc, + SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { + 
return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator()) + .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream() + .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map( + Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))) + .filter(Objects::nonNull); + } + + @Override public <I, K, V> List<V> reduceByKey( List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { return data.stream().parallel() diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 72d63d5..99cf413 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -39,8 +39,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; -import org.apache.hudi.table.action.clean.JavaCleanActionExecutor; -import org.apache.hudi.table.action.clean.JavaScheduleCleanActionExecutor; +import org.apache.hudi.table.action.clean.CleanActionExecutor; +import org.apache.hudi.table.action.clean.CleanPlanActionExecutor; import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor; @@ -50,7 +50,7 @@ import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionE import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor; -import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor; +import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor; import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; @@ -187,13 +187,13 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H @Override public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { - return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute(); + return new CleanPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute(); } @Override public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) { - return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute(); + return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); } @Override @@ -218,7 +218,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { - return new JavaCopyOnWriteRestoreActionExecutor((HoodieJavaEngineContext) context, - config, this, 
restoreInstantTime, instantToRestore).execute(); + return new CopyOnWriteRestoreActionExecutor( + context, config, this, restoreInstantTime, instantToRestore).execute(); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java deleted file mode 100644 index 0ca73d4..0000000 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.clean; - -import org.apache.hudi.avro.model.HoodieActionInstant; -import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieCleanStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.CleanFileInfo; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ImmutablePair; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -public class JavaCleanActionExecutor<T extends HoodieRecordPayload> extends - BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { - - private static final Logger LOG = LogManager.getLogger(JavaCleanActionExecutor.class); - - public JavaCleanActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, - String instantTime) { - super(context, config, table, instantTime); - } - - @Override - List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) { - - Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream() - .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator(); - - Stream<Pair<String, 
PartitionCleanStat>> partitionCleanStats = - deleteFilesFunc(filesToBeDeletedPerPartition, table) - .collect(Collectors.groupingBy(Pair::getLeft)) - .entrySet().stream() - .map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get())); - - Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats - .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); - - // Return PartitionCleanStat for each partition passed. - return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { - PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) - ? partitionCleanStatsMap.get(partitionPath) - : new PartitionCleanStat(partitionPath); - HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); - return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) - .withEarliestCommitRetained(Option.ofNullable( - actionInstant != null - ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), - actionInstant.getAction(), actionInstant.getTimestamp()) - : null)) - .withDeletePathPattern(partitionCleanStat.deletePathPatterns()) - .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()) - .withFailedDeletes(partitionCleanStat.failedDeleteFiles()) - .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns()) - .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles()) - .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles()) - .build(); - }).collect(Collectors.toList()); - } - - private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable table) { - Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>(); - FileSystem fs = table.getMetaClient().getFs(); - - while (iter.hasNext()) { - Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next(); - String partitionPath = partitionDelFileTuple.getLeft(); - Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath()); - String deletePathStr = deletePath.toString(); - Boolean deletedFileResult = null; - try { - deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); - } catch (IOException e) { - LOG.error("Delete file failed"); - } - if (!partitionCleanStatMap.containsKey(partitionPath)) { - partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); - } - boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile(); - PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath); - if (isBootstrapBasePathFile) { - // For Bootstrap Base file deletions, store the full file path. 
- partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true); - partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true); - } else { - partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false); - partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false); - } - } - return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())); - } -} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java deleted file mode 100644 index 05d19a6..0000000 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.clean; - -import java.util.List; -import java.util.Map; -import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -public class JavaScheduleCleanActionExecutor<T extends HoodieRecordPayload> extends - BaseCleanPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { - - private static final Logger LOG = LogManager.getLogger(JavaScheduleCleanActionExecutor.class); - - public JavaScheduleCleanActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, - String instantTime, - Option<Map<String, String>> extraMetadata) { - super(context, config, table, instantTime, extraMetadata); - } - - @Override - protected Option<HoodieCleanerPlan> createCleanerPlan() { - return super.execute(); - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java index de06ea4..416992e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java @@ -25,18 +25,23 @@ import 
org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFlatMapFunction; import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.sql.SQLContext; import scala.Tuple2; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -83,6 +88,20 @@ public class HoodieSparkEngineContext extends HoodieEngineContext { } @Override + public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey( + Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc, + SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { + return javaSparkContext.parallelize(data.collect(Collectors.toList()), parallelism) + .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator -> + flatMapToPairFunc.call(iterator).collect(Collectors.toList()).stream() + .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator() + ) + .reduceByKey(reduceFunc::apply) + .map(e -> new ImmutablePair<>(e._1, e._2)) + .collect().stream(); + } + + @Override public <I, K, V> List<V> reduceByKey( List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { return javaSparkContext.parallelize(data, parallelism).mapToPair(pair -> new Tuple2<K, V>(pair.getLeft(), pair.getRight())) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index c2770a7..a9b36a8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -50,8 +50,8 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor; -import org.apache.hudi.table.action.clean.SparkCleanActionExecutor; -import org.apache.hudi.table.action.clean.SparkCleanPlanActionExecutor; +import org.apache.hudi.table.action.clean.CleanActionExecutor; +import org.apache.hudi.table.action.clean.CleanPlanActionExecutor; import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor; import org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor; @@ -65,7 +65,7 @@ import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecuto import org.apache.hudi.table.action.commit.SparkMergeHelper; import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor; -import 
org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor; +import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor; import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; @@ -181,12 +181,12 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends @Override public void rollbackBootstrap(HoodieEngineContext context, String instantTime) { - new SparkCopyOnWriteRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute(); + new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute(); } @Override public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { - return new SparkCleanPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute(); + return new CleanPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute(); } @Override @@ -197,7 +197,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends } public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId, - Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException { + Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException { // these are updates HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile); return handleUpdateInternal(upsertHandle, instantTime, fileId); @@ -242,7 +242,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends } public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId, - Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) { + Map<String, HoodieRecord<? 
extends HoodieRecordPayload>> recordMap) { HoodieCreateHandle<?,?,?,?> createHandle = new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier); createHandle.write(); @@ -251,7 +251,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends @Override public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) { - return new SparkCleanActionExecutor((HoodieSparkEngineContext)context, config, this, cleanInstantTime).execute(); + return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); } @Override @@ -266,7 +266,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends @Override public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { - return new SparkCopyOnWriteRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, restoreInstantTime, instantToRestore).execute(); + return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute(); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index ee66d7b..b4b106c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -39,6 +39,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; +import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; import org.apache.hudi.table.action.compact.SparkRunCompactionActionExecutor; import org.apache.hudi.table.action.compact.SparkScheduleCompactionActionExecutor; import org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor; @@ -48,8 +49,7 @@ import org.apache.hudi.table.action.deltacommit.SparkInsertDeltaCommitActionExec import org.apache.hudi.table.action.deltacommit.SparkInsertPreppedDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor; -import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; -import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor; +import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor; import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; @@ -141,7 +141,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends @Override public void rollbackBootstrap(HoodieEngineContext context, String instantTime) { - new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute(); + new MergeOnReadRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute(); } @Override @@ -161,7 +161,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends 
@Override public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { - return new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, restoreInstantTime, instantToRestore).execute(); + return new MergeOnReadRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java deleted file mode 100644 index ba2d42f..0000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.clean; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.avro.model.HoodieActionInstant; -import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.HoodieCleanStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.CleanFileInfo; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import scala.Tuple2; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -@SuppressWarnings("checkstyle:LineLength") -public class SparkCleanActionExecutor<T extends HoodieRecordPayload> extends - BaseCleanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { - - private static final Logger LOG = LogManager.getLogger(SparkCleanActionExecutor.class); - - public SparkCleanActionExecutor(HoodieSparkEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, - String instantTime) { - super(context, config, table, instantTime); - } - - private static PairFlatMapFunction<Iterator<Tuple2<String, 
CleanFileInfo>>, String, PartitionCleanStat> - deleteFilesFunc(HoodieTable table) { - return (PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>) iter -> { - Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>(); - FileSystem fs = table.getMetaClient().getFs(); - while (iter.hasNext()) { - Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next(); - String partitionPath = partitionDelFileTuple._1(); - Path deletePath = new Path(partitionDelFileTuple._2().getFilePath()); - String deletePathStr = deletePath.toString(); - Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); - if (!partitionCleanStatMap.containsKey(partitionPath)) { - partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); - } - boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile(); - PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath); - if (isBootstrapBasePathFile) { - // For Bootstrap Base file deletions, store the full file path. - partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true); - partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true); - } else { - partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false); - partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false); - } - } - return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue())) - .collect(Collectors.toList()).iterator(); - }; - } - - @Override - List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) { - JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); - int cleanerParallelism = Math.min( - (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()), - config.getCleanerParallelism()); - LOG.info("Using cleanerParallelism: " + cleanerParallelism); - - context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions"); - List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc - .parallelize(cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream() - .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), - new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))) - .collect(Collectors.toList()), cleanerParallelism) - .mapPartitionsToPair(deleteFilesFunc(table)) - .reduceByKey(PartitionCleanStat::merge).collect(); - - Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream() - .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); - - // Return PartitionCleanStat for each partition passed. - return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { - PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) - ? partitionCleanStatsMap.get(partitionPath) - : new PartitionCleanStat(partitionPath); - HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); - return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) - .withEarliestCommitRetained(Option.ofNullable( - actionInstant != null - ? 
new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), - actionInstant.getAction(), actionInstant.getTimestamp()) - : null)) - .withDeletePathPattern(partitionCleanStat.deletePathPatterns()) - .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()) - .withFailedDeletes(partitionCleanStat.failedDeleteFiles()) - .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns()) - .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles()) - .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles()) - .build(); - }).collect(Collectors.toList()); - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java deleted file mode 100644 index f5529a8..0000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.clean; - -import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaRDD; - -import java.util.Map; - -@SuppressWarnings("checkstyle:LineLength") -public class SparkCleanPlanActionExecutor<T extends HoodieRecordPayload> extends - BaseCleanPlanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { - - private static final Logger LOG = LogManager.getLogger(SparkCleanPlanActionExecutor.class); - - public SparkCleanPlanActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, - String instantTime, - Option<Map<String, String>> extraMetadata) { - super(context, config, table, instantTime, extraMetadata); - } - - @Override - protected Option<HoodieCleanerPlan> createCleanerPlan() { - return super.execute(); - } - -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java deleted file mode 100644 index 7d60b28..0000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.restore; - -import org.apache.hudi.avro.model.HoodieRollbackMetadata; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieRollbackException; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; - -import org.apache.spark.api.java.JavaRDD; - -@SuppressWarnings("checkstyle:LineLength") -public class SparkCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload> extends - BaseRestoreActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { - - public SparkCopyOnWriteRestoreActionExecutor(HoodieSparkEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - String restoreInstantTime) { - super(context, config, table, instantTime, restoreInstantTime); - } - - @Override - protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) { - if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION) - && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { - throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback); - } - table.getMetaClient().reloadActiveTimeline(); - String instantTime = HoodieActiveTimeline.createNewInstantTime(); - table.scheduleRollback(context, instantTime, instantToRollback, false); - table.getMetaClient().reloadActiveTimeline(); - CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor( - (HoodieSparkEngineContext) context, - config, - table, - instantTime, - instantToRollback, - true, - true, - false); - return rollbackActionExecutor.execute(); - } -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java index 10c7ced..fde34b6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java @@ -22,10 +22,13 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFlatMapFunction; import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Stream; @@ -61,6 +64,10 @@ public abstract class HoodieEngineContext { public abstract <I, K, V> List<V> mapToPairAndReduceByKey( List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism); + public abstract <I, K, V> 
Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey( + Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc, + SerializableBiFunction<V, V, V> reduceFunc, int parallelism); + public abstract <I, K, V> List<V> reduceByKey( List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java index 1c935ff..ca032e7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java @@ -19,15 +19,19 @@ package org.apache.hudi.common.engine; import org.apache.hadoop.conf.Configuration; + import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFlatMapFunction; import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -35,6 +39,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; @@ -69,6 +74,17 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext { } @Override + public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey( + Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc, + SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { + return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator()) + .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream() + .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map( + Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))) + .filter(Objects::nonNull); + } + + @Override public <I, K, V> List<V> reduceByKey( List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { return data.stream().parallel() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java index b729e48..40e1a9d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java @@ -72,6 +72,17 @@ public class FunctionWrapper { }; } + public static <I, K, V> Function<I, Stream<Pair<K, V>>> throwingFlatMapToPairWrapper( + SerializablePairFlatMapFunction<I, K, V> throwingPairFlatMapFunction) { + return v1 -> { + try { + return throwingPairFlatMapFunction.call(v1); + } catch (Exception e) { + throw new 
HoodieException("Error occurs when executing mapToPair", e); + } + }; + } + public static <V> BinaryOperator<V> throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) { return (v1, v2) -> { try { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java new file mode 100644 index 0000000..4cc34ce --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.function; + +import org.apache.hudi.common.util.collection.Pair; + +import java.io.Serializable; +import java.util.stream.Stream; + +/** + * A function that returns a stream of key-value pairs (Pair<K, V>). + */ +@FunctionalInterface +public interface SerializablePairFlatMapFunction<I, K, V> extends Serializable { + Stream<Pair<K, V>> call(I t) throws Exception; +}
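
For context, the engine-agnostic API introduced above (HoodieEngineContext#mapPartitionsToPairAndReduceByKey together with SerializablePairFlatMapFunction) can be exercised roughly as follows. This is an illustrative sketch only, not code from this commit: the helper deleteFilesFunc, the local variable names, and the surrounding cleanerPlan/cleanerParallelism context are hypothetical, standing in for the per-partition file-deletion flow that the deleted Spark-specific SparkCleanActionExecutor implemented with mapPartitionsToPair/reduceByKey and which the common CleanActionExecutor can now express through the engine context.

    // Sketch (hypothetical usage, not part of this commit).
    // Assumed imports:
    //   org.apache.hudi.common.engine.HoodieEngineContext
    //   org.apache.hudi.common.model.CleanFileInfo
    //   org.apache.hudi.common.util.collection.Pair
    //   org.apache.hudi.common.util.collection.ImmutablePair
    //   java.util.Iterator
    //   java.util.stream.Stream

    // Flatten the cleaner plan's (partitionPath -> files-to-delete) map into a
    // stream of (partitionPath, CleanFileInfo) pairs.
    Stream<Pair<String, CleanFileInfo>> filesToDelete =
        cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
            .flatMap(e -> e.getValue().stream()
                .map(f -> Pair.of(e.getKey(),
                    new CleanFileInfo(f.getFilePath(), f.getIsBootstrapBaseFile()))));

    // deleteFilesFunc is a hypothetical helper that deletes each file on the
    // table's file system and emits one (partitionPath, PartitionCleanStat)
    // pair per file, as the removed Spark deleteFilesFunc above did. Stats for
    // the same partition are then merged via PartitionCleanStat::merge.
    Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
        context.mapPartitionsToPairAndReduceByKey(
            filesToDelete,
            (Iterator<Pair<String, CleanFileInfo>> it) -> deleteFilesFunc(it, table),
            PartitionCleanStat::merge,
            cleanerParallelism);

With this shape, each engine context (Spark, Flink, Java, local) supplies its own parallel implementation of mapPartitionsToPairAndReduceByKey, which is what allows the previously engine-specific clean and restore executors in this commit to collapse into the single hudi-client-common versions.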