wangxianghu commented on a change in pull request #1827:
URL: https://github.com/apache/hudi/pull/1827#discussion_r485591618



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -18,120 +18,195 @@
 
 package org.apache.hudi.client;
 
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
-import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
-
-import com.codahale.metrics.Timer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Hoodie Write Client helps you build tables on HDFS [insert()] and then perform efficient mutations on an HDFS
- * table [upsert()]
- * <p>
- * Note that, at any given time, there can only be one Spark job performing these operations on a Hoodie table.
+ * Abstract Write Client providing functionality for performing commit, index updates and rollback.
+ * Reused for regular write operations like upsert/insert/bulk-insert, as well as bootstrap.
+ *
+ * @param <T> Sub type of HoodieRecordPayload
+ * @param <I> Type of inputs
+ * @param <K> Type of keys
+ * @param <O> Type of outputs
+ * @param <P> Type of record position [Key, Option[partitionPath, fileID]] in hoodie table
  */
-public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {
-
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I, K, O, P> extends AbstractHoodieClient {
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class);
-  private static final String LOOKUP_STR = "lookup";
-  private final boolean rollbackPending;
-  private final transient HoodieMetrics metrics;
-  private transient Timer.Context compactionTimer;
+  private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
+
+  protected final transient HoodieMetrics metrics;
+  private final transient HoodieIndex<T, I, K, O, P> index;
+
+  protected transient Timer.Context writeContext = null;
+  private transient WriteOperationType operationType;
+  private transient HoodieWriteCommitCallback commitCallback;
+
+  protected static final String LOOKUP_STR = "lookup";
+  protected final boolean rollbackPending;
+  protected transient Timer.Context compactionTimer;
   private transient AsyncCleanerService asyncCleanerService;
 
+  public void setOperationType(WriteOperationType operationType) {
+    this.operationType = operationType;
+  }
+
+  public WriteOperationType getOperationType() {
+    return this.operationType;
+  }
+
   /**
    * Create a write client, without cleaning up failed/inflight commits.
    *
-   * @param jsc Java Spark Context
+   * @param context      HoodieEngineContext
    * @param clientConfig instance of HoodieWriteConfig
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
-    this(jsc, clientConfig, false);
+  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    this(context, clientConfig, false);
   }
 
   /**
    * Create a write client, with new hudi index.
    *
-   * @param jsc Java Spark Context
-   * @param writeConfig instance of HoodieWriteConfig
+   * @param context         HoodieEngineContext
+   * @param writeConfig     instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending) {
-    this(jsc, writeConfig, rollbackPending, HoodieIndex.createIndex(writeConfig));
-  }
-
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index) {
-    this(jsc, writeConfig, rollbackPending, index, Option.empty());
+  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+    this(context, writeConfig, rollbackPending, Option.empty());
   }
 
   /**
-   *  Create a write client, allows to specify all parameters.
+   * Create a write client, allows to specify all parameters.
    *
-   * @param jsc Java Spark Context
-   * @param writeConfig instance of HoodieWriteConfig
+   * @param context         HoodieEngineContext
+   * @param writeConfig     instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    * @param timelineService Timeline Service that runs as part of write client.
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending,
-      HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, index, writeConfig, timelineService);
+  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
+                                   Option<BaseEmbeddedTimelineService> timelineService) {
+    super(context, writeConfig, timelineService);
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
+    this.index = createIndex(writeConfig);
   }
 
+  protected abstract HoodieIndex<T, I, K, O, P> createIndex(HoodieWriteConfig writeConfig);
+
   /**
-   * Register hudi classes for Kryo serialization.
-   *
-   * @param conf instance of SparkConf
-   * @return SparkConf
+   * Commit changes performed at the given instantTime marker.
    */
-  public static SparkConf registerClasses(SparkConf conf) {

Review comment:
       > guessing this is all moved to spark client now?
   
   Yes, it's in `SparkRDDWriteClient` now.
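
   For readers following along: below is a minimal sketch of what the Spark side could look like after the move. The generic bindings are inferred from the `@param` docs in this diff, and the `registerClasses` body mirrors the old `HoodieWriteClient` implementation; the constructor and the `SparkHoodieIndex` factory are assumptions, not copied from the PR:

```java
// A minimal sketch, not copied from the PR: generic bindings are inferred
// from the @param docs above (I = inputs, K = keys, O = outputs,
// P = record positions); SparkHoodieIndex.createIndex is an assumed factory.
package org.apache.hudi.client;

import org.apache.hudi.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
    AbstractHoodieWriteClient<T,
        JavaRDD<HoodieRecord<T>>,                               // I: input records
        JavaRDD<HoodieKey>,                                     // K: keys
        JavaRDD<WriteStatus>,                                   // O: write statuses
        JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> { // P: record positions

  public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
    super(context, writeConfig);
  }

  /** Register hudi classes for Kryo serialization, as the old HoodieWriteClient did. */
  public static SparkConf registerClasses(SparkConf conf) {
    conf.registerKryoClasses(
        new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
    return conf;
  }

  @Override
  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
      JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> createIndex(
      HoodieWriteConfig writeConfig) {
    return SparkHoodieIndex.createIndex(writeConfig); // assumed Spark-specific factory
  }

  // Remaining abstract methods of AbstractHoodieWriteClient elided for brevity.
}
```

   Spark callers would then keep the familiar entry point, e.g. calling `SparkRDDWriteClient.registerClasses(sparkConf)` before creating the context.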




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

