vinothchandar commented on a change in pull request #1827:
URL: https://github.com/apache/hudi/pull/1827#discussion_r483291007



##########
File path: 
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
##########
@@ -92,8 +92,9 @@ public void init() throws IOException {
     
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
 
     // archive
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
hadoopConf);
-    archiveLog.archiveIfRequired(jsc);
+

Review comment:
       nit: extra line.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/asyc/AbstractAsyncService.java
##########
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.async;
+package org.apache.hudi.asyc;

Review comment:
       typo: async

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -18,120 +18,195 @@
 
 package org.apache.hudi.client;
 
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
-import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
-
-import com.codahale.metrics.Timer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Hoodie Write Client helps you build tables on HDFS [insert()] and then 
perform efficient mutations on an HDFS
- * table [upsert()]
- * <p>
- * Note that, at any given time, there can only be one Spark job performing 
these operations on a Hoodie table.
+ * Abstract Write Client providing functionality for performing commit, index 
updates and rollback
+ * Reused for regular write operations like upsert/insert/bulk-insert.. as 
well as bootstrap
+ *
+ * @param <T> Sub type of HoodieRecordPayload
+ * @param <I> Type of inputs
+ * @param <K> Type of keys
+ * @param <O> Type of outputs
+ * @param <P> Type of record position [Key, Option[partitionPath, fileID]] in 
hoodie table
  */
-public class HoodieWriteClient<T extends HoodieRecordPayload> extends 
AbstractHoodieWriteClient<T> {
-
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, 
I, K, O, P> extends AbstractHoodieClient {
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LogManager.getLogger(HoodieWriteClient.class);
-  private static final String LOOKUP_STR = "lookup";
-  private final boolean rollbackPending;
-  private final transient HoodieMetrics metrics;
-  private transient Timer.Context compactionTimer;
+  private static final Logger LOG = 
LogManager.getLogger(AbstractHoodieWriteClient.class);
+
+  protected final transient HoodieMetrics metrics;
+  private final transient HoodieIndex<T, I, K, O, P> index;
+
+  protected transient Timer.Context writeContext = null;
+  private transient WriteOperationType operationType;
+  private transient HoodieWriteCommitCallback commitCallback;
+
+  protected static final String LOOKUP_STR = "lookup";
+  protected final boolean rollbackPending;
+  protected transient Timer.Context compactionTimer;
   private transient AsyncCleanerService asyncCleanerService;
 
+  public void setOperationType(WriteOperationType operationType) {
+    this.operationType = operationType;
+  }
+
+  public WriteOperationType getOperationType() {
+    return this.operationType;
+  }
+
   /**
    * Create a write client, without cleaning up failed/inflight commits.
    *
-   * @param jsc Java Spark Context
+   * @param context      Java Spark Context
    * @param clientConfig instance of HoodieWriteConfig
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig) {
-    this(jsc, clientConfig, false);
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig clientConfig) {
+    this(context, clientConfig, false);
   }
 
   /**
    * Create a write client, with new hudi index.
    *
-   * @param jsc Java Spark Context
-   * @param writeConfig instance of HoodieWriteConfig
+   * @param context         HoodieEngineContext
+   * @param writeConfig     instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending) {
-    this(jsc, writeConfig, rollbackPending, 
HoodieIndex.createIndex(writeConfig));
-  }
-
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending, HoodieIndex index) {
-    this(jsc, writeConfig, rollbackPending, index, Option.empty());
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeConfig, boolean rollbackPending) {
+    this(context, writeConfig, rollbackPending, Option.empty());
   }
 
   /**
-   *  Create a write client, allows to specify all parameters.
+   * Create a write client, allows to specify all parameters.
    *
-   * @param jsc Java Spark Context
-   * @param writeConfig instance of HoodieWriteConfig
+   * @param context         HoodieEngineContext
+   * @param writeConfig     instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    * @param timelineService Timeline Service that runs as part of write client.
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending,
-      HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, index, writeConfig, timelineService);
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeConfig, boolean rollbackPending,
+                                   Option<BaseEmbeddedTimelineService> 
timelineService) {
+    super(context, writeConfig, timelineService);
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
+    this.index = createIndex(writeConfig);
   }
 
+  protected abstract HoodieIndex<T, I, K, O, P> createIndex(HoodieWriteConfig 
writeConfig);

Review comment:
       same point, not sure if this is correct. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -242,150 +286,93 @@ protected void rollBackInflightBootstrap() {
    * de-duped if needed.
    *
    * @param preppedRecords HoodieRecords to insert
-   * @param instantTime Instant time of the commit
+   * @param instantTime    Instant time of the commit
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
-  public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> 
preppedRecords, final String instantTime) {
-    HoodieTable<T> table = 
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
-    table.validateInsertSchema();
-    setOperationType(WriteOperationType.INSERT_PREPPED);
-    this.asyncCleanerService = 
AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
-    HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, 
preppedRecords);
-    return postWrite(result, instantTime, table);
-  }
+  public abstract O insertPreppedRecords(I preppedRecords, final String 
instantTime);
 
   /**
    * Loads the given HoodieRecords, as inserts into the table. This is 
suitable for doing big bulk loads into a Hoodie
    * table for the very first time (e.g: converting an existing table to 
Hoodie).
    * <p>
    * This implementation uses sortBy (which does range partitioning based on 
reservoir sampling) and attempts to control
-   * the numbers of files with less memory compared to the {@link 
HoodieWriteClient#insert(JavaRDD, String)}
+   * the numbers of files with less memory compared to the {@link 
AbstractHoodieWriteClient#insert(I, String)}
    *
-   * @param records HoodieRecords to insert
+   * @param records     HoodieRecords to insert
    * @param instantTime Instant time of the commit
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
-  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, 
final String instantTime) {
-    return bulkInsert(records, instantTime, Option.empty());
-  }
+  public abstract O bulkInsert(I records, final String instantTime);
 
   /**
    * Loads the given HoodieRecords, as inserts into the table. This is 
suitable for doing big bulk loads into a Hoodie
    * table for the very first time (e.g: converting an existing table to 
Hoodie).
    * <p>
    * This implementation uses sortBy (which does range partitioning based on 
reservoir sampling) and attempts to control
-   * the numbers of files with less memory compared to the {@link 
HoodieWriteClient#insert(JavaRDD, String)}. Optionally
+   * the numbers of files with less memory compared to the {@link 
AbstractHoodieWriteClient#insert(I, String)}. Optionally
    * it allows users to specify their own partitioner. If specified then it 
will be used for repartitioning records. See
    * {@link BulkInsertPartitioner}.
    *
-   * @param records HoodieRecords to insert
-   * @param instantTime Instant time of the commit
+   * @param records                          HoodieRecords to insert
+   * @param instantTime                      Instant time of the commit
    * @param userDefinedBulkInsertPartitioner If specified then it will be used 
to partition input records before they are inserted
-   * into hoodie.
+   *                                         into hoodie.
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
-  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, 
final String instantTime,
-                                         Option<BulkInsertPartitioner> 
userDefinedBulkInsertPartitioner) {
-    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, 
instantTime);
-    table.validateInsertSchema();
-    setOperationType(WriteOperationType.BULK_INSERT);
-    this.asyncCleanerService = 
AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
-    HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, 
userDefinedBulkInsertPartitioner);
-    return postWrite(result, instantTime, table);
-  }
+  public abstract O bulkInsert(I records, final String instantTime,
+                      Option<BulkInsertPartitioner<I>> 
userDefinedBulkInsertPartitioner);
+
 
   /**
    * Loads the given HoodieRecords, as inserts into the table. This is 
suitable for doing big bulk loads into a Hoodie
    * table for the very first time (e.g: converting an existing table to 
Hoodie). The input records should contain no
    * duplicates if needed.
    * <p>
    * This implementation uses sortBy (which does range partitioning based on 
reservoir sampling) and attempts to control
-   * the numbers of files with less memory compared to the {@link 
HoodieWriteClient#insert(JavaRDD, String)}. Optionally
+   * the numbers of files with less memory compared to the {@link 
AbstractHoodieWriteClient#insert(I, String)}. Optionally
    * it allows users to specify their own partitioner. If specified then it 
will be used for repartitioning records. See
    * {@link BulkInsertPartitioner}.
    *
-   * @param preppedRecords HoodieRecords to insert
-   * @param instantTime Instant time of the commit
+   * @param preppedRecords        HoodieRecords to insert

Review comment:
       it would be great if you could avoid the whitespace changes :) We have to fish for what the real changes are.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
##########
@@ -21,94 +21,52 @@
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.index.bloom.HoodieBloomIndex;
-import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
-import org.apache.hudi.index.hbase.HBaseIndex;
-import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
-import org.apache.hudi.index.simple.HoodieSimpleIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
 import java.io.Serializable;
 
 /**
  * Base class for different types of indexes to determine the mapping from 
uuid.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public abstract class HoodieIndex<T extends HoodieRecordPayload> implements 
Serializable {
+public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O, P> 
implements Serializable {
 
   protected final HoodieWriteConfig config;
 
   protected HoodieIndex(HoodieWriteConfig config) {
     this.config = config;
   }
 
-  public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(
-      HoodieWriteConfig config) throws HoodieIndexException {
-    // first use index class config to create index.
-    if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
-      Object instance = ReflectionUtils.loadClass(config.getIndexClass(), 
config);
-      if (!(instance instanceof HoodieIndex)) {
-        throw new HoodieIndexException(config.getIndexClass() + " is not a 
subclass of HoodieIndex");
-      }
-      return (HoodieIndex) instance;
-    }
-    switch (config.getIndexType()) {
-      case HBASE:
-        return new HBaseIndex<>(config);
-      case INMEMORY:
-        return new InMemoryHashIndex<>(config);
-      case BLOOM:
-        return new HoodieBloomIndex<>(config);
-      case GLOBAL_BLOOM:
-        return new HoodieGlobalBloomIndex<>(config);
-      case SIMPLE:
-        return new HoodieSimpleIndex<>(config);
-      case GLOBAL_SIMPLE:
-        return new HoodieGlobalSimpleIndex<>(config);
-      default:
-        throw new HoodieIndexException("Index type unspecified, set " + 
config.getIndexType());
-    }
-  }
-
   /**
    * Checks if the given [Keys] exists in the hoodie table and returns [Key, 
Option[partitionPath, fileID]] If the
    * optional is empty, then the key is not found.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
-  public abstract JavaPairRDD<HoodieKey, Option<Pair<String, String>>> 
fetchRecordLocation(
-      JavaRDD<HoodieKey> hoodieKeys, final JavaSparkContext jsc, 
HoodieTable<T> hoodieTable);
+  public abstract P fetchRecordLocation(
+      K hoodieKeys, final HoodieEngineContext context, HoodieTable<T, I, K, O, 
P> hoodieTable);
 
   /**
    * Looks up the index and tags each incoming record with a location of a 
file that contains the row (if it is actually
    * present).
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)

Review comment:
       these annotations need to be moved over to a `SparkHoodieIndex` class? It will be hard for end developers to program against `HoodieIndex` directly anymore. This is a general point, actually: the current public APIs should all be annotated against the Spark child classes. wdyt?
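Roughly what I have in mind, as a sketch only (the `SparkHoodieIndex` name and the RDD generic bindings below are my assumptions, not code from this PR):

```java
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

/**
 * Spark-facing index base class that binds the new generic parameters back to RDD types,
 * so the stable public-API annotations can live here rather than on the engine-neutral class.
 */
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class SparkHoodieIndex<T extends HoodieRecordPayload>
    extends HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>,
        JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {

  protected SparkHoodieIndex(HoodieWriteConfig config) {
    super(config);
  }

  // End developers keep programming against an RDD-typed signature annotated as STABLE here.
  @Override
  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
  public abstract JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(
      JavaRDD<HoodieKey> hoodieKeys, HoodieEngineContext context,
      HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>,
          JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> hoodieTable);
}
```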

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGeneratorInterface.java
##########
@@ -34,8 +33,4 @@
 
   List<String> getRecordKeyFieldNames();
 
-  String getRecordKey(Row row);

Review comment:
       we should make sure there are no backwards-incompatible changes to the key generator interface.
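For example (a sketch; the `SparkKeyGeneratorInterface` name is just an assumption), keeping a Spark-specific sub-interface that retains the Row-based method would let existing Spark key generators keep compiling:

```java
import org.apache.spark.sql.Row;

public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {

  // preserved from the old interface so Spark-facing callers see no change
  String getRecordKey(Row row);
}
```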

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -18,120 +18,195 @@
 
 package org.apache.hudi.client;
 
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
-import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
-
-import com.codahale.metrics.Timer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Hoodie Write Client helps you build tables on HDFS [insert()] and then 
perform efficient mutations on an HDFS
- * table [upsert()]
- * <p>
- * Note that, at any given time, there can only be one Spark job performing 
these operations on a Hoodie table.
+ * Abstract Write Client providing functionality for performing commit, index 
updates and rollback
+ * Reused for regular write operations like upsert/insert/bulk-insert.. as 
well as bootstrap
+ *
+ * @param <T> Sub type of HoodieRecordPayload
+ * @param <I> Type of inputs
+ * @param <K> Type of keys
+ * @param <O> Type of outputs
+ * @param <P> Type of record position [Key, Option[partitionPath, fileID]] in 
hoodie table
  */
-public class HoodieWriteClient<T extends HoodieRecordPayload> extends 
AbstractHoodieWriteClient<T> {
-
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, 
I, K, O, P> extends AbstractHoodieClient {
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LogManager.getLogger(HoodieWriteClient.class);
-  private static final String LOOKUP_STR = "lookup";
-  private final boolean rollbackPending;
-  private final transient HoodieMetrics metrics;
-  private transient Timer.Context compactionTimer;
+  private static final Logger LOG = 
LogManager.getLogger(AbstractHoodieWriteClient.class);
+
+  protected final transient HoodieMetrics metrics;
+  private final transient HoodieIndex<T, I, K, O, P> index;
+
+  protected transient Timer.Context writeContext = null;
+  private transient WriteOperationType operationType;
+  private transient HoodieWriteCommitCallback commitCallback;
+
+  protected static final String LOOKUP_STR = "lookup";
+  protected final boolean rollbackPending;
+  protected transient Timer.Context compactionTimer;
   private transient AsyncCleanerService asyncCleanerService;
 
+  public void setOperationType(WriteOperationType operationType) {
+    this.operationType = operationType;
+  }
+
+  public WriteOperationType getOperationType() {
+    return this.operationType;
+  }
+
   /**
    * Create a write client, without cleaning up failed/inflight commits.
    *
-   * @param jsc Java Spark Context
+   * @param context      Java Spark Context
    * @param clientConfig instance of HoodieWriteConfig
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig) {
-    this(jsc, clientConfig, false);
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig clientConfig) {
+    this(context, clientConfig, false);
   }
 
   /**
    * Create a write client, with new hudi index.
    *
-   * @param jsc Java Spark Context
-   * @param writeConfig instance of HoodieWriteConfig
+   * @param context         HoodieEngineContext
+   * @param writeConfig     instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending) {
-    this(jsc, writeConfig, rollbackPending, 
HoodieIndex.createIndex(writeConfig));
-  }
-
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending, HoodieIndex index) {
-    this(jsc, writeConfig, rollbackPending, index, Option.empty());
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeConfig, boolean rollbackPending) {
+    this(context, writeConfig, rollbackPending, Option.empty());
   }
 
   /**
-   *  Create a write client, allows to specify all parameters.
+   * Create a write client, allows to specify all parameters.
    *
-   * @param jsc Java Spark Context
-   * @param writeConfig instance of HoodieWriteConfig
+   * @param context         HoodieEngineContext
+   * @param writeConfig     instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    * @param timelineService Timeline Service that runs as part of write client.
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending,
-      HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, index, writeConfig, timelineService);
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeConfig, boolean rollbackPending,
+                                   Option<BaseEmbeddedTimelineService> 
timelineService) {
+    super(context, writeConfig, timelineService);
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
+    this.index = createIndex(writeConfig);

Review comment:
       why did this constructor have to change?

##########
File path: hudi-client/hudi-client-common/pom.xml
##########
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <parent>
+    <artifactId>hudi-client</artifactId>
+    <groupId>org.apache.hudi</groupId>
+    <version>0.6.1-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hudi-client-common</artifactId>

Review comment:
       surprised that this has so few dependencies.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -18,120 +18,195 @@
 
 package org.apache.hudi.client;
 
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
-import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
-
-import com.codahale.metrics.Timer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Hoodie Write Client helps you build tables on HDFS [insert()] and then 
perform efficient mutations on an HDFS
- * table [upsert()]
- * <p>
- * Note that, at any given time, there can only be one Spark job performing 
these operations on a Hoodie table.
+ * Abstract Write Client providing functionality for performing commit, index 
updates and rollback
+ * Reused for regular write operations like upsert/insert/bulk-insert.. as 
well as bootstrap
+ *
+ * @param <T> Sub type of HoodieRecordPayload
+ * @param <I> Type of inputs
+ * @param <K> Type of keys
+ * @param <O> Type of outputs
+ * @param <P> Type of record position [Key, Option[partitionPath, fileID]] in 
hoodie table
  */
-public class HoodieWriteClient<T extends HoodieRecordPayload> extends 
AbstractHoodieWriteClient<T> {
-
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, 
I, K, O, P> extends AbstractHoodieClient {
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LogManager.getLogger(HoodieWriteClient.class);
-  private static final String LOOKUP_STR = "lookup";
-  private final boolean rollbackPending;
-  private final transient HoodieMetrics metrics;
-  private transient Timer.Context compactionTimer;
+  private static final Logger LOG = 
LogManager.getLogger(AbstractHoodieWriteClient.class);
+
+  protected final transient HoodieMetrics metrics;
+  private final transient HoodieIndex<T, I, K, O, P> index;

Review comment:
       not sure if this is right. The index should not be needed at the write client level.
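One alternative, as a rough sketch (assuming the engine-specific `HoodieTable` exposes its index; illustrative only): the client looks it up from the table instead of holding a field.

```java
// inside AbstractHoodieWriteClient (sketch): no index field; ask the table when needed
protected HoodieIndex<T, I, K, O, P> getIndex(HoodieTable<T, I, K, O, P> table) {
  return table.getIndex();
}
```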

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
##########
@@ -52,19 +52,6 @@ protected AsyncCleanerService(HoodieWriteClient<?> 
writeClient, String cleanInst
     }), executor);
   }
 
-  public static AsyncCleanerService 
startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,

Review comment:
       this method did not need to move?
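A sketch of keeping it here by just widening the parameter type (the body below is approximate, from memory of the removed code, and the constructor would need the same widening):

```java
public static AsyncCleanerService startAsyncCleaningIfEnabled(
    AbstractHoodieWriteClient<?, ?, ?, ?, ?> writeClient, String instantTime) {
  if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
    AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
    asyncCleanerService.start(null);
    return asyncCleanerService;
  }
  return null;
}
```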

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -18,120 +18,195 @@
 
 package org.apache.hudi.client;
 
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
-import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
-
-import com.codahale.metrics.Timer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Hoodie Write Client helps you build tables on HDFS [insert()] and then 
perform efficient mutations on an HDFS
- * table [upsert()]
- * <p>
- * Note that, at any given time, there can only be one Spark job performing 
these operations on a Hoodie table.
+ * Abstract Write Client providing functionality for performing commit, index 
updates and rollback
+ * Reused for regular write operations like upsert/insert/bulk-insert.. as 
well as bootstrap
+ *
+ * @param <T> Sub type of HoodieRecordPayload
+ * @param <I> Type of inputs
+ * @param <K> Type of keys
+ * @param <O> Type of outputs
+ * @param <P> Type of record position [Key, Option[partitionPath, fileID]] in 
hoodie table
  */
-public class HoodieWriteClient<T extends HoodieRecordPayload> extends 
AbstractHoodieWriteClient<T> {
-
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, 
I, K, O, P> extends AbstractHoodieClient {
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LogManager.getLogger(HoodieWriteClient.class);
-  private static final String LOOKUP_STR = "lookup";
-  private final boolean rollbackPending;
-  private final transient HoodieMetrics metrics;
-  private transient Timer.Context compactionTimer;
+  private static final Logger LOG = 
LogManager.getLogger(AbstractHoodieWriteClient.class);
+
+  protected final transient HoodieMetrics metrics;
+  private final transient HoodieIndex<T, I, K, O, P> index;
+
+  protected transient Timer.Context writeContext = null;
+  private transient WriteOperationType operationType;
+  private transient HoodieWriteCommitCallback commitCallback;
+
+  protected static final String LOOKUP_STR = "lookup";

Review comment:
       can we move all the static members to the top, like it was before?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
##########
@@ -18,17 +18,18 @@
 
 package org.apache.hudi.client.bootstrap.selector;
 
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.util.List;

Review comment:
       are these from reformatting via the IDE?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -18,120 +18,195 @@
 
 package org.apache.hudi.client;
 
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
-import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
-
-import com.codahale.metrics.Timer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Hoodie Write Client helps you build tables on HDFS [insert()] and then 
perform efficient mutations on an HDFS
- * table [upsert()]
- * <p>
- * Note that, at any given time, there can only be one Spark job performing 
these operations on a Hoodie table.
+ * Abstract Write Client providing functionality for performing commit, index 
updates and rollback
+ * Reused for regular write operations like upsert/insert/bulk-insert.. as 
well as bootstrap
+ *
+ * @param <T> Sub type of HoodieRecordPayload
+ * @param <I> Type of inputs
+ * @param <K> Type of keys
+ * @param <O> Type of outputs
+ * @param <P> Type of record position [Key, Option[partitionPath, fileID]] in 
hoodie table
  */
-public class HoodieWriteClient<T extends HoodieRecordPayload> extends 
AbstractHoodieWriteClient<T> {
-
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, 
I, K, O, P> extends AbstractHoodieClient {
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LogManager.getLogger(HoodieWriteClient.class);
-  private static final String LOOKUP_STR = "lookup";
-  private final boolean rollbackPending;
-  private final transient HoodieMetrics metrics;
-  private transient Timer.Context compactionTimer;
+  private static final Logger LOG = 
LogManager.getLogger(AbstractHoodieWriteClient.class);
+
+  protected final transient HoodieMetrics metrics;
+  private final transient HoodieIndex<T, I, K, O, P> index;
+
+  protected transient Timer.Context writeContext = null;
+  private transient WriteOperationType operationType;
+  private transient HoodieWriteCommitCallback commitCallback;
+
+  protected static final String LOOKUP_STR = "lookup";
+  protected final boolean rollbackPending;
+  protected transient Timer.Context compactionTimer;
   private transient AsyncCleanerService asyncCleanerService;
 
+  public void setOperationType(WriteOperationType operationType) {
+    this.operationType = operationType;
+  }
+
+  public WriteOperationType getOperationType() {
+    return this.operationType;
+  }
+
   /**
    * Create a write client, without cleaning up failed/inflight commits.
    *
-   * @param jsc Java Spark Context
+   * @param context      Java Spark Context
    * @param clientConfig instance of HoodieWriteConfig
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig) {
-    this(jsc, clientConfig, false);
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig clientConfig) {
+    this(context, clientConfig, false);
   }
 
   /**
    * Create a write client, with new hudi index.
    *
-   * @param jsc Java Spark Context
-   * @param writeConfig instance of HoodieWriteConfig
+   * @param context         HoodieEngineContext
+   * @param writeConfig     instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending) {
-    this(jsc, writeConfig, rollbackPending, 
HoodieIndex.createIndex(writeConfig));
-  }
-
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending, HoodieIndex index) {
-    this(jsc, writeConfig, rollbackPending, index, Option.empty());
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeConfig, boolean rollbackPending) {
+    this(context, writeConfig, rollbackPending, Option.empty());
   }
 
   /**
-   *  Create a write client, allows to specify all parameters.
+   * Create a write client, allows to specify all parameters.
    *
-   * @param jsc Java Spark Context
-   * @param writeConfig instance of HoodieWriteConfig
+   * @param context         HoodieEngineContext
+   * @param writeConfig     instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    * @param timelineService Timeline Service that runs as part of write client.
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
writeConfig, boolean rollbackPending,
-      HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, index, writeConfig, timelineService);
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeConfig, boolean rollbackPending,
+                                   Option<BaseEmbeddedTimelineService> 
timelineService) {
+    super(context, writeConfig, timelineService);
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
+    this.index = createIndex(writeConfig);
   }
 
+  protected abstract HoodieIndex<T, I, K, O, P> createIndex(HoodieWriteConfig 
writeConfig);
+
   /**
-   * Register hudi classes for Kryo serialization.
-   *
-   * @param conf instance of SparkConf
-   * @return SparkConf
+   * Commit changes performed at the given instantTime marker.
    */
-  public static SparkConf registerClasses(SparkConf conf) {

Review comment:
       guessing this is all moved to the Spark client now?
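If so, a small sketch of what the Spark-side entry point could look like (the exact set of registered classes is an assumption on my part, not taken from this PR):

```java
// in the Spark-specific write client
public static SparkConf registerClasses(SparkConf conf) {
  conf.registerKryoClasses(
      new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
  return conf;
}
```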

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/BaseLazyInsertIterable.java
##########
@@ -18,64 +18,47 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.client.TaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Function;
 
 /**
  * Lazy Iterable, that writes a stream of HoodieRecords sorted by the 
partitionPath, into new files.
  */
-public class LazyInsertIterable<T extends HoodieRecordPayload>
+public abstract class BaseLazyInsertIterable<T extends HoodieRecordPayload>

Review comment:
       In general, not sure if this class is applicable outside of Spark, but we do use it in all of the code paths. So I understand that we needed to do this.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
##########
@@ -21,94 +21,52 @@
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.index.bloom.HoodieBloomIndex;
-import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
-import org.apache.hudi.index.hbase.HBaseIndex;
-import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
-import org.apache.hudi.index.simple.HoodieSimpleIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
 import java.io.Serializable;
 
 /**
  * Base class for different types of indexes to determine the mapping from 
uuid.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public abstract class HoodieIndex<T extends HoodieRecordPayload> implements 
Serializable {
+public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O, P> 
implements Serializable {
 
   protected final HoodieWriteConfig config;
 
   protected HoodieIndex(HoodieWriteConfig config) {
     this.config = config;
   }
 
-  public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(

Review comment:
       some of these index types don't make sense without a Spark index now; actually, almost all of them except maybe HBaseIndex. So these should all be renamed with the `Spark` prefix.
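To make the renaming concrete, a sketch of this factory after it moves to the Spark side (the `Spark*` class names are assumptions, not code in this PR; the logic mirrors the removed method above):

```java
public static <T extends HoodieRecordPayload> SparkHoodieIndex<T> createIndex(HoodieWriteConfig config) {
  // first use index class config to create index, as before
  if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
    Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
    if (!(instance instanceof HoodieIndex)) {
      throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
    }
    return (SparkHoodieIndex<T>) instance;
  }
  switch (config.getIndexType()) {
    case HBASE:
      return new SparkHoodieHBaseIndex<>(config);
    case INMEMORY:
      return new SparkInMemoryHashIndex<>(config);
    case BLOOM:
      return new SparkHoodieBloomIndex<>(config);
    case GLOBAL_BLOOM:
      return new SparkHoodieGlobalBloomIndex<>(config);
    case SIMPLE:
      return new SparkHoodieSimpleIndex<>(config);
    case GLOBAL_SIMPLE:
      return new SparkHoodieGlobalSimpleIndex<>(config);
    default:
      throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());
  }
}
```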

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
##########
@@ -21,94 +21,52 @@
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.index.bloom.HoodieBloomIndex;
-import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
-import org.apache.hudi.index.hbase.HBaseIndex;
-import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
-import org.apache.hudi.index.simple.HoodieSimpleIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
 import java.io.Serializable;
 
 /**
  * Base class for different types of indexes to determine the mapping from 
uuid.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public abstract class HoodieIndex<T extends HoodieRecordPayload> implements 
Serializable {
+public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O, P> 
implements Serializable {
 
   protected final HoodieWriteConfig config;
 
   protected HoodieIndex(HoodieWriteConfig config) {
     this.config = config;
   }
 
-  public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(
-      HoodieWriteConfig config) throws HoodieIndexException {
-    // first use index class config to create index.
-    if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
-      Object instance = ReflectionUtils.loadClass(config.getIndexClass(), 
config);
-      if (!(instance instanceof HoodieIndex)) {
-        throw new HoodieIndexException(config.getIndexClass() + " is not a 
subclass of HoodieIndex");
-      }
-      return (HoodieIndex) instance;
-    }
-    switch (config.getIndexType()) {
-      case HBASE:
-        return new HBaseIndex<>(config);
-      case INMEMORY:
-        return new InMemoryHashIndex<>(config);
-      case BLOOM:
-        return new HoodieBloomIndex<>(config);
-      case GLOBAL_BLOOM:
-        return new HoodieGlobalBloomIndex<>(config);
-      case SIMPLE:
-        return new HoodieSimpleIndex<>(config);
-      case GLOBAL_SIMPLE:
-        return new HoodieGlobalSimpleIndex<>(config);
-      default:
-        throw new HoodieIndexException("Index type unspecified, set " + 
config.getIndexType());
-    }
-  }
-
   /**
    * Checks if the given [Keys] exists in the hoodie table and returns [Key, 
Option[partitionPath, fileID]] If the
    * optional is empty, then the key is not found.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
-  public abstract JavaPairRDD<HoodieKey, Option<Pair<String, String>>> 
fetchRecordLocation(
-      JavaRDD<HoodieKey> hoodieKeys, final JavaSparkContext jsc, 
HoodieTable<T> hoodieTable);
+  public abstract P fetchRecordLocation(

Review comment:
       now I understand `P` better. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/hbase/BaseHoodieHBaseIndex.java
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.hbase;
+
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieHBaseIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieDependentSystemUnavailableException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Hoodie Index implementation backed by HBase.
+ */
+public abstract class BaseHoodieHBaseIndex<T extends HoodieRecordPayload, I, 
K, O, P> extends HoodieIndex<T, I, K, O, P> {

Review comment:
       are there any code changes here, i.e. logic changes?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndex.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Indexing mechanism based on bloom filter. Each parquet file includes its 
row_key bloom filter in its metadata.
+ */
+public abstract class BaseHoodieBloomIndex<T extends HoodieRecordPayload, I, 
K, O, P> extends HoodieIndex<T, I, K, O, P> {

Review comment:
       I suggest introducing a `SparkHoodieIndex` base class.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/HoodieEngineContext.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common;
+
+import org.apache.hudi.client.TaskContextSupplier;
+import org.apache.hudi.common.config.SerializableConfiguration;
+
+/**
+ * Base class contains the context information needed by the engine at 
runtime. It will be extended by different
+ * engine implementation if needed.
+ */
+public class HoodieEngineContext {
+  /**
+   * A wrapped hadoop configuration which can be serialized.
+   */
+  private SerializableConfiguration hadoopConf;

Review comment:
       I am okay leaving it as `hadoopConf`, given that's what we wrap. I'll leave it to you both :)
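For anyone following along, a rough sketch of how an engine-specific context might extend this class (the class name, the constructor shape of the base class, and the use of `SparkTaskContextSupplier` are all assumptions here, not this PR's code):

```java
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.config.SerializableConfiguration;

import org.apache.spark.api.java.JavaSparkContext;

public class HoodieSparkEngineContext extends HoodieEngineContext {

  private final JavaSparkContext javaSparkContext;

  public HoodieSparkEngineContext(JavaSparkContext jsc) {
    // assumes the base class accepts the wrapped hadoop conf plus a task context supplier
    super(new SerializableConfiguration(jsc.hadoopConfiguration()), new SparkTaskContextSupplier());
    this.javaSparkContext = jsc;
  }

  public JavaSparkContext getJavaSparkContext() {
    return javaSparkContext;
  }
}
```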

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
##########
@@ -161,11 +108,11 @@ private static GenericRecord 
transformRecordBasedOnNewSchema(GenericDatumReader<
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
-  private static class UpdateHandler extends 
BoundedInMemoryQueueConsumer<GenericRecord, Void> {
+  static class UpdateHandler extends 
BoundedInMemoryQueueConsumer<GenericRecord, Void> {
 
-    private final HoodieMergeHandle upsertHandle;
+    private final HoodieWriteHandle upsertHandle;

Review comment:
       why is this no longer a `HoodieMergeHandle`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

