nsivabalan commented on a change in pull request #3741:
URL: https://github.com/apache/hudi/pull/3741#discussion_r726211865



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
##########
@@ -19,65 +19,65 @@
 package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieCopyOnWriteTableOperation;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
-
 import java.io.IOException;
 import java.util.List;
 
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkRunCompactionActionExecutor<T extends HoodieRecordPayload> extends
-    BaseActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata<JavaRDD<WriteStatus>>> {
+public class RunCompactionActionExecutor<T extends HoodieRecordPayload> extends
+    BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
+
+  private final AbstractHoodieWriteClient writeClient;
+  private final HoodieCompactor compactor;
+  private final HoodieCopyOnWriteTableOperation copyOnWriteTableOperation;
 
-  public SparkRunCompactionActionExecutor(HoodieSparkEngineContext context,
-                                          HoodieWriteConfig config,
-                                          HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                          String instantTime) {
+  public RunCompactionActionExecutor(HoodieEngineContext context,
+                                     HoodieWriteConfig config,
+                                     HoodieTable table,
+                                     String instantTime,
+                                     AbstractHoodieWriteClient writeClient,
+                                     HoodieCompactor compactor,
+                                     HoodieCopyOnWriteTableOperation copyOnWriteTableOperation) {
     super(context, config, table, instantTime);
+    this.writeClient = writeClient;
+    this.compactor = compactor;
+    this.copyOnWriteTableOperation = copyOnWriteTableOperation;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime);
-    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-    if (!pendingCompactionTimeline.containsInstant(instant)) {
-      throw new IllegalStateException(
-          "No Compaction request available at " + instantTime + " to run compaction");
-    }
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    compactor.checkCompactionTimeline(table, instantTime, writeClient);

-    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
+    HoodieWriteMetadata<HoodieData<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
     try {
-      HoodieActiveTimeline timeline = table.getActiveTimeline();
+      // generate compaction plan
+      // should support configurable commit metadata
       HoodieCompactionPlan compactionPlan =
           CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);
-      // Mark instant as compaction inflight
-      timeline.transitionCompactionRequestedToInflight(instant);
-      table.getMetaClient().reloadActiveTimeline();

-      HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
-      JavaRDD<WriteStatus> statuses = compactor.compact(context, compactionPlan, table, config, instantTime);
+      HoodieData<WriteStatus> statuses = compactor.compact(
+          context, compactionPlan, table, config, instantTime, copyOnWriteTableOperation);

-      statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+      statuses.persist(config.getProps());

Review comment:
       Synced up with Ethan offline. I am good here.
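
       For context on the engine-agnostic persist call above, here is a minimal sketch of how a Spark-side HoodieData implementation could resolve the storage level internally from the write configs. The class name and the default value are illustrative assumptions, not this PR's actual code:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

import java.util.Properties;

// Illustrative sketch only: if the Spark implementation reads the storage
// level from the props itself, the executor can call
// statuses.persist(config.getProps()) with no Spark imports at all.
public class SparkWriteStatusData<T> {
  private final JavaRDD<T> rdd;

  public SparkWriteStatusData(JavaRDD<T> rdd) {
    this.rdd = rdd;
  }

  public void persist(Properties props) {
    // Resolve the configured storage level; the default here is an assumption.
    String level = props.getProperty(
        "hoodie.write.status.storage.level", "MEMORY_AND_DISK_SER");
    rdd.persist(StorageLevel.fromString(level));
  }
}
```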

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTableOperation.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for insert and update operations in compaction.

Review comment:
       Another option I can think of: HoodieCompactionHandler. It will sit well with handleUpdate and handleInsert.
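
       To make the suggestion concrete, a rough sketch of how the interface could read under that name. The method signatures are guesses inferred from the imports in this diff, not the final API of this PR:

```java
package org.apache.hudi.table;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch of the suggested rename; signatures are assumptions.
public interface HoodieCompactionHandler<T extends HoodieRecordPayload> {

  // Merge incoming log records against an existing base file during compaction.
  Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
                                           Map<String, HoodieRecord<T>> keyToNewRecords,
                                           HoodieBaseFile oldDataFile) throws IOException;

  // Write records that have no base file yet into a new one.
  Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
                                           Map<String, HoodieRecord<?>> recordMap);
}
```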

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -366,12 +367,13 @@ public HoodieActiveTimeline getActiveTimeline() {
   /**
   * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access.
    *
-   * @param context HoodieEngineContext
+   * @param context               HoodieEngineContext
    * @param compactionInstantTime Instant Time
+   * @param writeClient           Write client
    */
   public abstract HoodieWriteMetadata<O> compact(HoodieEngineContext context,
-                                              String compactionInstantTime);
-
+                                                 String compactionInstantTime,
+                                                 AbstractHoodieWriteClient writeClient);

Review comment:
       Yeah, I triaged the usage of writeClient within compact(). It looks like we might need it to roll back any pending compaction in the Flink code base (as of now, that method lives in AbstractHoodieWriteClient and not at the table layer). Maybe we need to add a callback or something to work around it; not sure there is an easy way around this.
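
       On the callback idea, a purely hypothetical sketch of what such a hook could look like. Every name below is invented to illustrate passing only the rollback capability instead of the whole write client; none of it is an actual Hudi API:

```java
import java.util.List;

// Hypothetical sketch only: a narrow hook the Flink write client could hand
// to compact(), so the table layer never needs AbstractHoodieWriteClient.
@FunctionalInterface
interface PendingCompactionRollback {
  // Roll back the given pending compaction instants before proceeding.
  void rollback(List<String> pendingCompactionInstants);
}

// Usage sketch (names illustrative):
//   table.compact(context, compactionInstantTime,
//       instants -> flinkWriteClient.rollbackPendingCompactions(instants));
```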

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
##########
@@ -45,31 +46,67 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-@SuppressWarnings("checkstyle:LineLength")
-public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload> extends
-    BaseScheduleCompactionActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {

-  private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class);
+  private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class);
 
   private final Option<Map<String, String>> extraMetadata;
+  private final HoodieCompactor compactor;
 
-  public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                               String instantTime,
-                                               Option<Map<String, String>> extraMetadata) {
-    super(context, config, table, instantTime, extraMetadata);
+  public ScheduleCompactionActionExecutor(HoodieEngineContext context,
+                                          HoodieWriteConfig config,
+                                          HoodieTable<T, I, K, O> table,
+                                          String instantTime,
+                                          Option<Map<String, String>> extraMetadata,
+                                          HoodieCompactor compactor) {
+    super(context, config, table, instantTime);
     this.extraMetadata = extraMetadata;
+    this.compactor = compactor;
   }
 
   @Override
-  protected HoodieCompactionPlan scheduleCompaction() {
+  public Option<HoodieCompactionPlan> execute() {

Review comment:
       Yeah, let's discuss this outside the scope of this PR. Curious to know more about the pain point here.



