vinothchandar commented on a change in pull request #1577:
URL: https://github.com/apache/hudi/pull/1577#discussion_r446606888



##########
File path: hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
##########
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.deltastreamer;
+package org.apache.hudi.async;

Review comment:
       Would `org.apache.hudi.client.service` be a better location? Shouldn't we have this, the timeline server, etc. in a single place?

##########
File path: hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
##########
@@ -32,11 +32,11 @@
 import java.util.function.Function;
 
 /**
- * Base Class for running delta-sync/compaction in separate thread and controlling their life-cycle.
+ * Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
  */
-public abstract class AbstractDeltaStreamerService implements Serializable {
+public abstract class AbstractAsyncService implements Serializable {

Review comment:
       Ideally, shouldn't we be reusing the same class for #1752?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -254,6 +260,7 @@ public static SparkConf registerClasses(SparkConf conf) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT);
+    AutoCleanerService.spawnAutoCleanerIfEnabled(this, instantTime);

Review comment:
       How come the return value is never assigned to the instance variable?
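A minimal sketch of what the question implies, assuming the spawn helper returns a handle that the client must keep so that `close()` has something to wait on. `WriteClientSketch` and `spawnCleanIfEnabled` are hypothetical stand-ins for `HoodieWriteClient` and `AutoCleanerService.spawnAutoCleanerIfEnabled`, and a plain `CompletableFuture` takes the place of the service object; this is not Hudi's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for HoodieWriteClient; illustrative only, not Hudi's API.
class WriteClientSketch {
  final AtomicInteger cleansRun = new AtomicInteger();
  private final boolean asyncCleanEnabled;
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  // Handle to the background clean; stored so that close() can wait on it.
  private CompletableFuture<Void> pendingClean;

  WriteClientSketch(boolean asyncCleanEnabled) {
    this.asyncCleanEnabled = asyncCleanEnabled;
  }

  // Hypothetical analogue of spawnAutoCleanerIfEnabled: returns null when disabled.
  private CompletableFuture<Void> spawnCleanIfEnabled() {
    if (!asyncCleanEnabled) {
      return null;
    }
    return CompletableFuture.runAsync(cleansRun::incrementAndGet, executor);
  }

  // instantTime mirrors the real signature but is unused in this sketch.
  void bulkInsert(String instantTime) {
    // Keep the return value; discarding it leaves close() nothing to wait on.
    pendingClean = spawnCleanIfEnabled();
    // ... write records here ...
  }

  void close() {
    if (pendingClean != null) {
      pendingClean.join(); // let the in-flight clean finish
      pendingClean = null;
    }
    executor.shutdown();
  }

  boolean hasPendingClean() {
    return pendingClean != null;
  }

  public static void main(String[] args) {
    WriteClientSketch client = new WriteClientSketch(true);
    client.bulkInsert("001");
    client.close();
    System.out.println(client.cleansRun.get()); // 1
  }
}
```

In this sketch, dropping the returned future would leave `close()` looking at a null field, so it would shut the executor down without waiting for the clean.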

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AutoCleanerService.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Auto Clean service running concurrently with write operation.
+ */
+class AutoCleanerService extends AbstractAsyncService {
+
+  private static final Logger LOG = LogManager.getLogger(AutoCleanerService.class);
+
+  private final HoodieWriteClient writeClient;
+  private final String cleanInstant;
+  private final transient ExecutorService executor = Executors.newFixedThreadPool(1);
+
+  protected AutoCleanerService(HoodieWriteClient writeClient, String cleanInstant) {
+    this.writeClient = writeClient;
+    this.cleanInstant = cleanInstant;
+  }
+
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {

Review comment:
       This class feels more like an `AsyncTask` than a service, i.e. something that is long-running and accepts tasks. Can we file a follow-up to clean this code up?
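The distinction drawn above can be sketched as a service that owns one worker thread for its whole lifetime and accepts many tasks, in contrast to a class constructed for a single `cleanInstant` that runs once. `CleanTaskService` and `submit` are hypothetical names used only for illustration; they are not part of `AbstractAsyncService`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical long-running service: one worker thread accepts many clean tasks.
// Illustrative only; not Hudi's AbstractAsyncService.
class CleanTaskService implements AutoCloseable {
  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  // Queues a clean for the given instant; the future completes with that instant.
  CompletableFuture<String> submit(String instantTime, Runnable cleanAction) {
    return CompletableFuture.supplyAsync(() -> {
      cleanAction.run();
      return instantTime;
    }, worker);
  }

  // Stops accepting new tasks; tasks already queued still run to completion.
  @Override
  public void close() {
    worker.shutdown();
  }

  public static void main(String[] args) {
    try (CleanTaskService service = new CleanTaskService()) {
      CompletableFuture<String> first = service.submit("001", () -> { });
      CompletableFuture<String> second = service.submit("002", () -> { });
      System.out.println(first.join() + "," + second.join()); // 001,002
    }
  }
}
```

With this shape, a write client could hold one service and submit a clean per commit, instead of constructing a new executor for every write operation.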

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -81,6 +81,8 @@
   private final transient HoodieMetrics metrics;
   private transient Timer.Context compactionTimer;
 
+  private transient AutoCleanerService autoCleanerService;

Review comment:
       Auto or not is a configuration choice and should not leak into class names.

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -338,15 +346,27 @@ protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
       archiveLog.archiveIfRequired(hadoopConf);
-      if (config.isAutoClean()) {
-        // Call clean to cleanup if there is anything to cleanup after the commit,
+      autoCleanOnCommit(instantTime);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Handle auto clean during commit.
+   * @param instantTime
+   */
+  private void autoCleanOnCommit(String instantTime) {
+    if (config.isAutoClean()) {
+      // Call clean to cleanup if there is anything to cleanup after the commit,
+      if (config.isRunParallelAutoClean()) {

Review comment:
       There are a lot of overloaded terms here: parallel, async, auto. What you mean by parallel is async, correct?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -477,6 +497,8 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H
    */
   @Override
   public void close() {
+    AutoCleanerService.shutdownAutoCleaner(autoCleanerService);
+    autoCleanerService = null;

Review comment:
       This kind of resetting is probably needed after each write operation as well? Maybe it's fine to just reinitialize the service after waitForCompletion. Food for thought.
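One way to read the suggestion, sketched under the assumption that each write operation gets its own single-use cleaner: start it when the write begins, wait for it at commit, then clear the fields so the next write starts fresh. `PerWriteCleanerSketch`, `startWrite`, and `waitForCompletionAndReset` are hypothetical names, not Hudi methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical per-write cleaner lifecycle; names are illustrative, not Hudi's API.
class PerWriteCleanerSketch {
  private final List<String> cleanedInstants = new ArrayList<>();
  private ExecutorService executor;      // fresh for each write operation
  private CompletableFuture<Void> clean; // null between write operations

  // Starts a new single-use cleaner at the beginning of a write.
  void startWrite(String instantTime) {
    if (clean != null) {
      throw new IllegalStateException("previous cleaner was not reset");
    }
    executor = Executors.newSingleThreadExecutor();
    clean = CompletableFuture.runAsync(() -> record(instantTime), executor);
  }

  private synchronized void record(String instantTime) {
    cleanedInstants.add(instantTime);
  }

  // Called at commit: wait for the clean, then reset so the next write starts fresh.
  void waitForCompletionAndReset() {
    if (clean == null) {
      return;
    }
    try {
      clean.join();
    } finally {
      executor.shutdown();
      executor = null;
      clean = null;
    }
  }

  synchronized List<String> cleanedInstants() {
    return new ArrayList<>(cleanedInstants);
  }

  public static void main(String[] args) {
    PerWriteCleanerSketch client = new PerWriteCleanerSketch();
    client.startWrite("001");
    client.waitForCompletionAndReset();
    client.startWrite("002");
    client.waitForCompletionAndReset();
    System.out.println(client.cleanedInstants()); // [001, 002]
  }
}
```

Resetting in `waitForCompletionAndReset` means `close()` would only need to handle a write that never reached commit, rather than being the sole place the cleaner is cleared.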

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -81,6 +81,8 @@
   private final transient HoodieMetrics metrics;
   private transient Timer.Context compactionTimer;
 
+  private transient AutoCleanerService autoCleanerService;

Review comment:
       Just rename it to `AsyncCleanerService`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

