mincwang commented on a change in pull request #4254:
URL: https://github.com/apache/hudi/pull/4254#discussion_r770271063



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
##########
@@ -18,141 +18,299 @@
 
 package org.apache.hudi.sink.compact;
 
+import org.apache.hudi.async.HoodieAsyncService;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.JCommander;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  * Flink hudi compaction program that can be executed manually.
  */
 public class HoodieFlinkCompactor {
 
   protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
 
+  /**
+   * Flink Compaction Config.
+   */
+  private final FlinkCompactionConfig cfg;
+
+  /**
+   * Compaction scheduling service.
+   */
+  private final AsyncCompactionService compactionScheduleService;
+
+  public HoodieFlinkCompactor(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
+    this.cfg = cfg;
+    this.compactionScheduleService = new AsyncCompactionService(cfg, conf, env);
+  }
+
   public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+    FlinkCompactionConfig cfg = getFlinkCompactionConfig(args);
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+
+    new HoodieFlinkCompactor(cfg, conf, env).start();
+  }
+
+  /**
+   * Main method to start compaction service.
+   */
+  public void start() throws Exception {
+    if (cfg.serviceMode) {
+      compactionScheduleService.start(null);
+      try {
+        compactionScheduleService.waitForShutdown();
+      } catch (Exception e) {
+        throw new HoodieException(e.getMessage(), e);
+      } finally {
+        LOG.info("Shut down hoodie flink compactor");
+      }
+    } else {
+      LOG.info("Hoodie Flink Compactor running only single round");
+      try {
+        compactionScheduleService.scheduleCompaction();
+      } catch (Exception e) {
+        LOG.error("Got error running delta sync once. Shutting down", e);
+        throw e;
+      } finally {
+        LOG.info("Shut down hoodie flink compactor");
+      }
+    }
+  }
+
+  public static FlinkCompactionConfig getFlinkCompactionConfig(String[] args) {
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     JCommander cmd = new JCommander(cfg, null, args);
     if (cfg.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
+    return cfg;
+  }
 
-    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * Async service that schedules compactions.
+   */
+  public static class AsyncCompactionService extends HoodieAsyncService {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Flink Compaction Config.
+     */
+    private final FlinkCompactionConfig cfg;
+
+    /**
+     * Flink Config.
+     */
+    private final Configuration conf;
+
+    /**
+     * Meta Client.
+     */
+    private final HoodieTableMetaClient metaClient;
+
+    /**
+     * Write Client.
+     */
+    private final HoodieFlinkWriteClient<?> writeClient;
+
+    /**
+     * The hoodie table.
+     */
+    private final HoodieFlinkTable<?> table;
+
+    /**
+     * Flink Execution Environment.
+     */
+    private final StreamExecutionEnvironment env;
+
+    /**
+     * Executor Service.
+     */
+    private final ExecutorService executor;
+
+    public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
+      this.cfg = cfg;
+      this.conf = conf;
+      this.env = env;
+      this.executor = Executors.newFixedThreadPool(1);
 
-    // create metaClient
-    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+      // create metaClient
+      this.metaClient = StreamerUtil.createMetaClient(conf);
 
-    // get the table name
-    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+      // get the table name
+      conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
 
-    // set table schema
-    CompactionUtil.setAvroSchema(conf, metaClient);
+      // set table schema
+      CompactionUtil.setAvroSchema(conf, metaClient);
 
-    // infer changelog mode
-    CompactionUtil.inferChangelogMode(conf, metaClient);
+      // infer changelog mode
+      CompactionUtil.inferChangelogMode(conf, metaClient);
 
-    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
-    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+      this.writeClient = StreamerUtil.createWriteClient(conf);
+      this.table = writeClient.getHoodieTable();
+    }
+
+    @Override
+    protected Pair<CompletableFuture, ExecutorService> startService() {
+      return Pair.of(CompletableFuture.supplyAsync(() -> {
+        boolean error = false;
+
+        try {
+          while (!isShutdownRequested()) {
+            try {
+              long start = System.currentTimeMillis();
+              scheduleCompaction();
+              long toSleepMs = cfg.minCompactionIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
+              if (toSleepMs > 0) {
+                LOG.info("Last compaction ran less than min sync interval: " + 
cfg.minCompactionIntervalSeconds + " s, sleep: "
+                    + toSleepMs + " ms.");
+                Thread.sleep(toSleepMs);
+              }
+            } catch (Exception e) {
+              LOG.error("Shutting down compaction service due to exception", 
e);
+              error = true;
+              throw new HoodieException(e.getMessage(), e);
+            }
+          }
+        } finally {
+          shutdownAsyncService(error);

Review comment:
       Won't the FS already be closed here when the next compaction plan runs?
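
       To make the question concrete, here is a minimal, self-contained sketch of the control flow I am asking about. Every name in it is hypothetical (`ShutdownScopeDemo`, `SharedResource`); `SharedResource` only stands in for whatever `shutdownAsyncService` ultimately closes, e.g. the write client and its FileSystem handle. If the real service behaves like this sketch, the close fires only after the loop exits (shutdown request or escaped exception), not between scheduling rounds:

       ```java
       import java.util.concurrent.CompletableFuture;
       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.Executors;
       import java.util.concurrent.TimeUnit;
       import java.util.concurrent.atomic.AtomicBoolean;

       public class ShutdownScopeDemo {

         /** Hypothetical stand-in for a shared handle such as a FileSystem or write client. */
         static class SharedResource implements AutoCloseable {
           private volatile boolean closed = false;

           void use() {
             if (closed) {
               throw new IllegalStateException("resource used after close");
             }
             System.out.println("compaction round ran");
           }

           @Override
           public void close() {
             closed = true;
             System.out.println("resource closed");
           }
         }

         private final SharedResource resource = new SharedResource();
         private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
         private final ExecutorService executor = Executors.newSingleThreadExecutor();

         CompletableFuture<Void> start() {
           return CompletableFuture.runAsync(() -> {
             try {
               while (!shutdownRequested.get()) {
                 resource.use(); // one scheduling round; throws if already closed
                 sleepQuietly(100);
               }
             } finally {
               // The finally wraps the whole loop, not the loop body, so close()
               // runs exactly once: after shutdown is requested or an exception
               // escapes, never between successful rounds.
               resource.close();
             }
           }, executor);
         }

         void requestShutdown() {
           shutdownRequested.set(true);
         }

         private static void sleepQuietly(long millis) {
           try {
             TimeUnit.MILLISECONDS.sleep(millis);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
         }

         public static void main(String[] args) throws Exception {
           ShutdownScopeDemo demo = new ShutdownScopeDemo();
           CompletableFuture<Void> rounds = demo.start();
           Thread.sleep(350);      // let a few rounds run against the open resource
           demo.requestShutdown(); // only now does the finally close the resource
           rounds.join();
           demo.executor.shutdown();
         }
       }
       ```

       So the question reduces to: once `shutdownAsyncService(error)` has run in the finally block, is the loop guaranteed never to be re-entered, or is there a path (e.g. a restart in service mode) that would schedule the next plan against an already-closed FS?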



