nsivabalan commented on a change in pull request #3741: URL: https://github.com/apache/hudi/pull/3741#discussion_r726211865
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java ########## @@ -19,65 +19,65 @@ package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.AbstractHoodieWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.client.utils.SparkMemoryUtils; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.table.HoodieCopyOnWriteTableOperation; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.spark.api.java.JavaRDD; - import java.io.IOException; import java.util.List; @SuppressWarnings("checkstyle:LineLength") -public class SparkRunCompactionActionExecutor<T extends HoodieRecordPayload> extends - BaseActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata<JavaRDD<WriteStatus>>> { +public class RunCompactionActionExecutor<T extends HoodieRecordPayload> extends + BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> { + + private final AbstractHoodieWriteClient writeClient; + private final HoodieCompactor compactor; + private final HoodieCopyOnWriteTableOperation copyOnWriteTableOperation; - public SparkRunCompactionActionExecutor(HoodieSparkEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, - String instantTime) { + public RunCompactionActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + AbstractHoodieWriteClient writeClient, + HoodieCompactor compactor, + HoodieCopyOnWriteTableOperation copyOnWriteTableOperation) { super(context, config, table, instantTime); + this.writeClient = writeClient; + this.compactor = compactor; + this.copyOnWriteTableOperation = copyOnWriteTableOperation; } @Override - public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() { - HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime); - HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - if (!pendingCompactionTimeline.containsInstant(instant)) { - throw new IllegalStateException( - "No Compaction request available at " + instantTime + " to run compaction"); - } + public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() { + compactor.checkCompactionTimeline(table, instantTime, writeClient); - HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>(); + HoodieWriteMetadata<HoodieData<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>(); try { - HoodieActiveTimeline timeline = table.getActiveTimeline(); + // generate compaction plan + // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime); - // Mark instant as compaction inflight - 
timeline.transitionCompactionRequestedToInflight(instant); - table.getMetaClient().reloadActiveTimeline(); - HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor(); - JavaRDD<WriteStatus> statuses = compactor.compact(context, compactionPlan, table, config, instantTime); + HoodieData<WriteStatus> statuses = compactor.compact( + context, compactionPlan, table, config, instantTime, copyOnWriteTableOperation); - statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps())); + statuses.persist(config.getProps()); Review comment: synced up w/ Ethan offline. I am good here. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTableOperation.java ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Interface for insert and update operations in compaction. Review comment: another option I can think of : HoodieCompactionHandler. will sit well with handleUpdate and handleInsert. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java ########## @@ -366,12 +367,13 @@ public HoodieActiveTimeline getActiveTimeline() { /** * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access. * - * @param context HoodieEngineContext + * @param context HoodieEngineContext * @param compactionInstantTime Instant Time + * @param writeClient Write client */ public abstract HoodieWriteMetadata<O> compact(HoodieEngineContext context, - String compactionInstantTime); - + String compactionInstantTime, + AbstractHoodieWriteClient writeClient); Review comment: yeah, I triaged the usage of writeclient within compact(). looks like we might need it to rollback any pending compaction in flink code base(and as of now, this method is in AbstractHoodieWriteClient and not at the table layer). may be we need to add a callback or something and work around it. not sure if there is any easy way around. 
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java ########## @@ -45,31 +46,67 @@ import java.util.Set; import java.util.stream.Collectors; -@SuppressWarnings("checkstyle:LineLength") -public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload> extends - BaseScheduleCompactionActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { +public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> { - private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class); + private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class); private final Option<Map<String, String>> extraMetadata; + private final HoodieCompactor compactor; - public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, - String instantTime, - Option<Map<String, String>> extraMetadata) { - super(context, config, table, instantTime, extraMetadata); + public ScheduleCompactionActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable<T, I, K, O> table, + String instantTime, + Option<Map<String, String>> extraMetadata, + HoodieCompactor compactor) { + super(context, config, table, instantTime); this.extraMetadata = extraMetadata; + this.compactor = compactor; } @Override - protected HoodieCompactionPlan scheduleCompaction() { + public Option<HoodieCompactionPlan> execute() { Review comment: yeah, lets discuss this outside of the scope of this PR. curious to know more about the pain point here, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org