This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6dcd0a3524fe7be0bbbd3e673ed7e1d4b035e0cb
Author: Balaji Varadarajan <varad...@uber.com>
AuthorDate: Tue Jun 2 01:49:37 2020 -0700

    [HUDI-988] Fix Unit Test Flakiness: Ensure all instantiations of
    HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make Hudi
    CLI unit test checks skip rendering strings.
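
    A minimal sketch (not part of this patch) of the close-every-client pattern
    the message describes; it assumes HoodieWriteClient is AutoCloseable, i.e.
    exposes close(), and that getHoodieWriteClient/getConfig are the test
    helpers touched in the diff below:

        String newCommitTime = "001";
        try (HoodieWriteClient client = getHoodieWriteClient(getConfig())) {
          client.startCommitWithTime(newCommitTime);
          // ... write records with the client and assert on the results ...
        } // client.close() runs here even if an assertion fails, so the client does not leak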
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  16 +
 .../org/apache/hudi/cli/commands/StatsCommand.java |   4 +-
 .../cli/commands/AbstractShellIntegrationTest.java |   2 +-
 .../hudi/cli/commands/TestRepairsCommand.java      | 206 -----
 .../org/apache/hudi/client/HoodieWriteClient.java  |   2 +-
 .../apache/hudi/client/TestHoodieClientBase.java   | 938 ++++++++++-----------
 .../java/org/apache/hudi/client/TestMultiFS.java   |   4 -
 .../hudi/client/TestUpdateSchemaEvolution.java     |   4 +-
 .../hudi/common/HoodieClientTestHarness.java       | 426 +++++-----
 .../hudi/index/TestHBaseQPSResourceAllocator.java  |   2 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java |  17 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |   2 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   2 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  12 +-
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   5 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    |  38 +-
 .../hudi/table/compact/TestHoodieCompactor.java    |  12 +-
 pom.xml                                            |   1 +
 19 files changed, 745 insertions(+), 950 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 2e3bc01..708ae29 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -33,4 +33,20 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_HOODIE_PROPERTY = "Property";
   public static final String HEADER_OLD_VALUE = "Old Value";
   public static final String HEADER_NEW_VALUE = "New Value";
+
+  /**
+   * Fields of Stats.
+   */
+  public static final String HEADER_COMMIT_TIME = "CommitTime";
+  public static final String HEADER_TOTAL_UPSERTED = "Total Upserted";
+  public static final String HEADER_TOTAL_WRITTEN = "Total Written";
+  public static final String HEADER_WRITE_AMPLIFICATION_FACTOR = "Write Amplification Factor";
+  public static final String HEADER_HISTOGRAM_MIN = "Min";
+  public static final String HEADER_HISTOGRAM_10TH = "10th";
+  public static final String HEADER_HISTOGRAM_50TH = "50th";
+  public static final String HEADER_HISTOGRAM_AVG = "avg";
+  public static final String HEADER_HISTOGRAM_95TH = "95th";
+  public static final String HEADER_HISTOGRAM_MAX = "Max";
+  public static final String HEADER_HISTOGRAM_NUM_FILES = "NumFiles";
+  public static final String HEADER_HISTOGRAM_STD_DEV = "StdDev";
 }
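
A hypothetical use of the new Stats header constants (not part of this patch): CLI tests can build their expected header row from these shared names instead of repeating literal column strings.

    String[] statsHeader = new String[] {
        HoodieTableHeaderFields.HEADER_COMMIT_TIME,
        HoodieTableHeaderFields.HEADER_TOTAL_UPSERTED,
        HoodieTableHeaderFields.HEADER_TOTAL_WRITTEN,
        HoodieTableHeaderFields.HEADER_WRITE_AMPLIFICATION_FACTOR};
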
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index b05aee2..4874777 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
 @Component
 public class StatsCommand implements CommandMarker {
 
-  private static final int MAX_FILES = 1000000;
+  public static final int MAX_FILES = 1000000;
 
  @CliCommand(value = "stats wa", help = "Write Amplification. Ratio of how many records were upserted to how many "
       + "records were actually written")
@@ -97,7 +97,7 @@ public class StatsCommand implements CommandMarker {
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
 
-  private Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
+  public Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
     return new Comparable[] {commitTime, s.getMin(), s.getValue(0.1), s.getMedian(), s.getMean(), s.get95thPercentile(),
         s.getMax(), s.size(), s.getStdDev()};
   }
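
With MAX_FILES and printFileSizeHistogram made public above, a CLI test can assert on the raw row rather than the rendered table string, which is what "skip rendering strings" in the commit message refers to. A hypothetical check (not from this patch), assuming com.codahale.metrics.UniformSnapshot and StatsCommand's default constructor:

    Snapshot snapshot = new UniformSnapshot(new long[] {100L, 200L, 300L});
    Comparable[] row = new StatsCommand().printFileSizeHistogram("001", snapshot);
    assertEquals("001", row[0]);              // commit time column
    assertEquals(snapshot.getMin(), row[1]);  // min file size
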
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
index ad81af5..d9f1688 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
@@ -58,4 +58,4 @@ public abstract class AbstractShellIntegrationTest extends HoodieClientTestHarne
   protected static JLineShellComponent getShell() {
     return shell;
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
deleted file mode 100644
index 9e78ac7..0000000
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.commands;
-
-import org.apache.hudi.cli.HoodieCLI;
-import org.apache.hudi.cli.HoodiePrintHelper;
-import org.apache.hudi.cli.HoodieTableHeaderFields;
-import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.TimelineLayoutVersion;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.FSUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.shell.core.CommandResult;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Test class for {@link RepairsCommand}.
- */
-public class TestRepairsCommand extends AbstractShellIntegrationTest {
-
-  private String tablePath;
-
-  @Before
-  public void init() throws IOException {
-    String tableName = "test_table";
-    tablePath = basePath + File.separator + tableName;
-
-    // Create table and connect
-    new TableCommand().createTable(
-        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
-        "", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
-  }
-
-  /**
-   * Test case for dry run 'repair addpartitionmeta'.
-   */
-  @Test
-  public void testAddPartitionMetaWithDryRun() throws IOException {
-    // create commit instant
-    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
-
-    // create partition path
-    String partition1 = tablePath + File.separator + 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String partition2 = tablePath + File.separator + 
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
-    String partition3 = tablePath + File.separator + 
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
-    assertTrue(fs.mkdirs(new Path(partition1)));
-    assertTrue(fs.mkdirs(new Path(partition2)));
-    assertTrue(fs.mkdirs(new Path(partition3)));
-
-    // default is dry run.
-    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
-    assertTrue(cr.isSuccess());
-
-    // expected all 'No'.
-    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, 
tablePath)
-        .stream()
-        .map(partition -> new String[] {partition, "No", "None"})
-        .toArray(String[][]::new);
-    String expected = HoodiePrintHelper.print(new String[] 
{HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, 
HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-
-    assertEquals(expected, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for real run 'repair addpartitionmeta'.
-   */
-  @Test
-  public void testAddPartitionMetaWithRealRun() throws IOException {
-    // create commit instant
-    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
-
-    // create partition path
-    String partition1 = tablePath + File.separator + 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String partition2 = tablePath + File.separator + 
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
-    String partition3 = tablePath + File.separator + 
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
-    assertTrue(fs.mkdirs(new Path(partition1)));
-    assertTrue(fs.mkdirs(new Path(partition2)));
-    assertTrue(fs.mkdirs(new Path(partition3)));
-
-    CommandResult cr = getShell().executeCommand("repair addpartitionmeta 
--dryrun false");
-    assertTrue(cr.isSuccess());
-
-    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, 
tablePath);
-    // after dry run, the action will be 'Repaired'
-    String[][] rows = paths.stream()
-        .map(partition -> new String[] {partition, "No", "Repaired"})
-        .toArray(String[][]::new);
-    String expected = HoodiePrintHelper.print(new String[] 
{HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, 
HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-
-    assertEquals(expected, cr.getResult().toString());
-
-    cr = getShell().executeCommand("repair addpartitionmeta");
-
-    // after real run, Metadata is present now.
-    rows = paths.stream()
-        .map(partition -> new String[] {partition, "Yes", "None"})
-        .toArray(String[][]::new);
-    expected = HoodiePrintHelper.print(new String[] 
{HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, 
HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-    assertEquals(expected, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for 'repair overwrite-hoodie-props'.
-   */
-  @Test
-  public void testOverwriteHoodieProperties() throws IOException {
-    URL newProps = 
this.getClass().getClassLoader().getResource("table-config.properties");
-    assertNotNull("New property file must exist", newProps);
-
-    CommandResult cr = getShell().executeCommand("repair 
overwrite-hoodie-props --new-props-file " + newProps.getPath());
-    assertTrue(cr.isSuccess());
-
-    Map<String, String> oldProps = 
HoodieCLI.getTableMetaClient().getTableConfig().getProps();
-
-    // after overwrite, the stored value in .hoodie is equals to which read 
from properties.
-    Map<String, String> result = 
HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
-    Properties expectProps = new Properties();
-    expectProps.load(new FileInputStream(new File(newProps.getPath())));
-
-    Map<String, String> expected = expectProps.entrySet().stream()
-        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> 
String.valueOf(e.getValue())));
-    assertEquals(expected, result);
-
-    // check result
-    List<String> allPropsStr = Arrays.asList("hoodie.table.name", 
"hoodie.table.type",
-        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
-    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] 
{key,
-        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
-        .toArray(String[][]::new);
-    String expect = HoodiePrintHelper.print(new String[] 
{HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
-        HoodieTableHeaderFields.HEADER_OLD_VALUE, 
HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
-
-    assertEquals(expect, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for 'repair corrupted clean files'.
-   */
-  @Test
-  public void testRemoveCorruptedPendingCleanAction() throws IOException {
-    HoodieCLI.conf = jsc.hadoopConfiguration();
-
-    Configuration conf = HoodieCLI.conf;
-
-    metaClient = HoodieCLI.getTableMetaClient();
-
-    // Create four requested files
-    for (int i = 100; i < 104; i++) {
-      String timestamp = String.valueOf(i);
-      // Write corrupted requested Compaction
-      
HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, 
timestamp, conf);
-    }
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    // first, there are four instants
-    assertEquals(4, 
metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
-
-    CommandResult cr = getShell().executeCommand("repair corrupted clean 
files");
-    assertTrue(cr.isSuccess());
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    assertEquals(0, 
metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
-  }
-}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 37dfe3d..f5f6233 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -120,7 +120,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
   }
 
-  HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
+  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
     this(jsc, clientConfig, rollbackPending, index, Option.empty());
   }
 
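
Making the constructor above public lets tests inject a specific HoodieIndex, mirroring the call sites in the test changes below. A hypothetical usage (not from this patch; cfg and jsc stand for a HoodieWriteConfig and JavaSparkContext from the test harness, and close() is assumed per the commit message):

    HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, false, index);
    // ... exercise write paths against the injected index ...
    client.close(); // release the client's resources between tests
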
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 5f47bf5..6e6458b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
@@ -50,7 +50,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SQLContext;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -73,496 +72,477 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHoodieClientBase extends HoodieClientTestHarness {
 
-  private static final Logger LOG = 
LogManager.getLogger(TestHoodieClientBase.class);
-
-  @Before
-  public void setUp() throws Exception {
-    initResources();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    cleanupResources();
-  }
-
-  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
-    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, 
cfg.getTableName()));
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
-    return getHoodieWriteClient(cfg, false);
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, 
boolean rollbackInflightCommit) {
-    return getHoodieWriteClient(cfg, rollbackInflightCommit, 
HoodieIndex.createIndex(cfg, jsc));
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, 
boolean rollbackInflightCommit,
-                                                   HoodieIndex index) {
-    return new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-  }
-
-  protected HoodieReadClient getHoodieReadClient(String basePath) {
-    return new HoodieReadClient(jsc, basePath, 
SQLContext.getOrCreate(jsc.sc()));
-  }
-
-  /**
-   * Get Default HoodieWriteConfig for tests.
-   *
-   * @return Default Hoodie Write Config for tests
-   */
-  protected HoodieWriteConfig getConfig() {
-    return getConfigBuilder().build();
-  }
-
-  protected HoodieWriteConfig getConfig(IndexType indexType) {
-    return getConfigBuilder(indexType).build();
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  protected HoodieWriteConfig.Builder getConfigBuilder() {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 
indexType);
-  }
-
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-    return getConfigBuilder(schemaStr, IndexType.BLOOM);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType 
indexType) {
-    return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
-        .withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-        .withWriteStatusClass(MetadataMergeWriteStatus.class)
-        
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 
* 1024).build())
-        .forTable("test-trip-table")
-        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
-        
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-            
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-  }
-
-  protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, 
HoodieWriteConfig config) {
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    ((SyncableFileSystemView) (table.getSliceView())).reset();
-    return table;
-  }
-
-  /**
-   * Assert no failures in writing hoodie files.
-   *
-   * @param statuses List of Write Status
-   */
-  public static void assertNoWriteErrors(List<WriteStatus> statuses) {
-    // Verify there are no errors
-    for (WriteStatus status : statuses) {
-      assertFalse("Errors found in write of " + status.getFileId(), 
status.hasErrors());
+    private static final Logger LOG = 
LogManager.getLogger(TestHoodieClientBase.class);
+
+    @Before
+    public void setUp() throws Exception {
+        initResources();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cleanupResources();
+    }
+
+    protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
+        return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, 
cfg.getTableName()));
+    }
+
+    /**
+     * Get Default HoodieWriteConfig for tests.
+     *
+     * @return Default Hoodie Write Config for tests
+     */
+    protected HoodieWriteConfig getConfig() {
+        return getConfigBuilder().build();
     }
-  }
-
-  void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, 
FileSystem fs) throws IOException {
-    Set<String> partitionPathSet = inputRecords.stream()
-        .map(HoodieRecord::getPartitionPath)
-        .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), 
fs);
-  }
-
-  void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem 
fs) throws IOException {
-    Set<String> partitionPathSet = inputKeys.stream()
-        .map(HoodieKey::getPartitionPath)
-        .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), 
fs);
-  }
-
-  /**
-   * Ensure presence of partition meta-data at known depth.
-   *
-   * @param partitionPaths Partition paths to check
-   * @param fs             File System
-   * @throws IOException in case of error
-   */
-  void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws 
IOException {
-    for (String partitionPath : partitionPaths) {
-      assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new 
Path(basePath, partitionPath)));
-      HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new 
Path(basePath, partitionPath));
-      pmeta.readFromFS();
-      Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, 
pmeta.getPartitionDepth());
+
+    protected HoodieWriteConfig getConfig(IndexType indexType) {
+        return getConfigBuilder(indexType).build();
+    }
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    protected HoodieWriteConfig.Builder getConfigBuilder() {
+        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     }
-  }
-
-  /**
-   * Ensure records have location field set.
-   *
-   * @param taggedRecords Tagged Records
-   * @param commitTime    Commit Timestamp
-   */
-  protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String 
commitTime) {
-    for (HoodieRecord rec : taggedRecords) {
-      assertTrue("Record " + rec + " found with no location.", 
rec.isCurrentLocationKnown());
-      assertEquals("All records should have commit time " + commitTime + ", 
since updates were made",
-          rec.getCurrentLocation().getInstantTime(), commitTime);
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
+        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 
indexType);
     }
-  }
-
-  /**
-   * Assert that there is no duplicate key at the partition level.
-   *
-   * @param records List of Hoodie records
-   */
-  void assertNodupesWithinPartition(List<HoodieRecord> records) {
-    Map<String, Set<String>> partitionToKeys = new HashMap<>();
-    for (HoodieRecord r : records) {
-      String key = r.getRecordKey();
-      String partitionPath = r.getPartitionPath();
-      if (!partitionToKeys.containsKey(partitionPath)) {
-        partitionToKeys.put(partitionPath, new HashSet<>());
-      }
-      assertFalse("key " + key + " is duplicate within partition " + 
partitionPath, partitionToKeys.get(partitionPath).contains(key));
-      partitionToKeys.get(partitionPath).add(key);
+
+    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+        return getConfigBuilder(schemaStr, IndexType.BLOOM);
     }
-  }
-
-  /**
-   * Helper to generate records generation function for testing Prepped 
version of API. Prepped APIs expect the records
-   * to be already de-duped and have location set. This wrapper takes care of 
record-location setting. Uniqueness is
-   * guaranteed by record-generation function itself.
-   *
-   * @param writeConfig       Hoodie Write Config
-   * @param recordGenFunction Records Generation function
-   * @return Wrapped function
-   */
-  private Function2<List<HoodieRecord>, String, Integer> 
wrapRecordsGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, 
String, Integer> recordGenFunction) {
-    return (commit, numRecords) -> {
-      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-      final HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, 
jsc);
-      JavaRDD<HoodieRecord> taggedRecords = 
index.tagLocation(jsc.parallelize(records, 1), jsc, table);
-      return taggedRecords.collect();
-    };
-  }
-
-  /**
-   * Helper to generate delete keys generation function for testing Prepped 
version of API. Prepped APIs expect the keys
-   * to be already de-duped and have location set. This wrapper takes care of 
record-location setting. Uniqueness is
-   * guaranteed by key-generation function itself.
-   *
-   * @param writeConfig    Hoodie Write Config
-   * @param keyGenFunction Keys Generation function
-   * @return Wrapped function
-   */
-  private Function<Integer, List<HoodieKey>> 
wrapDeleteKeysGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function<Integer, 
List<HoodieKey>> keyGenFunction) {
-    return (numRecords) -> {
-      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-      List<HoodieKey> records = keyGenFunction.apply(numRecords);
-      final HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, 
jsc);
-      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
-          .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
-      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, 
jsc, table);
-      return taggedRecords.map(record -> record.getKey()).collect();
-    };
-  }
-
-  /**
-   * Generate wrapper for record generation function for testing Prepped APIs.
-   *
-   * @param isPreppedAPI Flag to indicate if this is for testing 
prepped-version of APIs
-   * @param writeConfig  Hoodie Write Config
-   * @param wrapped      Actual Records Generation function
-   * @return Wrapped Function
-   */
-  protected Function2<List<HoodieRecord>, String, Integer> 
generateWrapRecordsFn(boolean isPreppedAPI,
-                                                                               
  HoodieWriteConfig writeConfig,
-                                                                               
  Function2<List<HoodieRecord>, String, Integer> wrapped) {
-    if (isPreppedAPI) {
-      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
-    } else {
-      return wrapped;
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType 
indexType) {
+        return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+            .withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+            .withWriteStatusClass(MetadataMergeWriteStatus.class)
+            
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+            
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
 * 1024).build())
+            
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 
1024).build())
+            .forTable("test-trip-table")
+            
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+            
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+                
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
     }
-  }
-
-  /**
-   * Generate wrapper for delete key generation function for testing Prepped 
APIs.
-   *
-   * @param isPreppedAPI Flag to indicate if this is for testing 
prepped-version of APIs
-   * @param writeConfig  Hoodie Write Config
-   * @param wrapped      Actual Records Generation function
-   * @return Wrapped Function
-   */
-  Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean 
isPreppedAPI,
-                                                                       
HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
-    if (isPreppedAPI) {
-      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
-    } else {
-      return wrapped;
+
+    protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, 
HoodieWriteConfig config) {
+        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, 
jsc);
+        ((SyncableFileSystemView) (table.getSliceView())).reset();
+        return table;
     }
-  }
-
-  /**
-   * Helper to insert first batch of records and do regular assertions on the 
state after successful completion.
-   *
-   * @param writeConfig            Hoodie Write Config
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit Number of records to be added in the new 
commit
-   * @param writeFn                Write Function to be used for insertion
-   * @param isPreppedAPI           Boolean flag to indicate writeFn expects 
prepped records
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, 
HoodieWriteClient client, String newCommitTime,
-                                        String initCommitTime, int 
numRecordsInThisCommit,
-                                        Function3<JavaRDD<WriteStatus>, 
HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                        boolean assertForCommit, int 
expRecordsInThisCommit) throws Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, 
dataGen::generateInserts);
-
-    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), 
initCommitTime, numRecordsInThisCommit,
-        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, 
expRecordsInThisCommit, 1);
-  }
-
-  /**
-   * Helper to upsert batch of records and do regular assertions on the state 
after successful completion.
-   *
-   * @param writeConfig                  Hoodie Write Config
-   * @param client                       Hoodie Write Client
-   * @param newCommitTime                New Commit Timestamp to be used
-   * @param prevCommitTime               Commit Timestamp used in previous 
commit
-   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between 
prevCommitTime and newCommitTime
-   * @param initCommitTime               Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit       Number of records to be added in the 
new commit
-   * @param writeFn                      Write Function to be used for upsert
-   * @param isPreppedAPI                 Boolean flag to indicate writeFn 
expects prepped records
-   * @param assertForCommit              Enable Assertion of Writes
-   * @param expRecordsInThisCommit       Expected number of records in this 
commit
-   * @param expTotalRecords              Expected number of records when 
scanned
-   * @param expTotalCommits              Expected number of commits (including 
this commit)
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, 
HoodieWriteClient client, String newCommitTime,
-                                   String prevCommitTime, Option<List<String>> 
commitTimesBetweenPrevAndNew, String initCommitTime,
-                                   int numRecordsInThisCommit,
-                                   Function3<JavaRDD<WriteStatus>, 
HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                   boolean assertForCommit, int 
expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws 
Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, 
dataGen::generateUniqueUpdates);
-
-    return writeBatch(client, newCommitTime, prevCommitTime, 
commitTimesBetweenPrevAndNew, initCommitTime,
-        numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, 
expRecordsInThisCommit, expTotalRecords,
-        expTotalCommits);
-  }
-
-  /**
-   * Helper to delete batch of keys and do regular assertions on the state 
after successful completion.
-   *
-   * @param writeConfig            Hoodie Write Config
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param prevCommitTime         Commit Timestamp used in previous commit
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit Number of records to be added in the new 
commit
-   * @param deleteFn               Delete Function to be used for deletes
-   * @param isPreppedAPI           Boolean flag to indicate writeFn expects 
prepped records
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @param expTotalRecords        Expected number of records when scanned
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, 
HoodieWriteClient client, String newCommitTime,
-                                   String prevCommitTime, String 
initCommitTime,
-                                   int numRecordsInThisCommit,
-                                   Function3<JavaRDD<WriteStatus>, 
HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
-                                   boolean assertForCommit, int 
expRecordsInThisCommit, int expTotalRecords) throws Exception {
-    final Function<Integer, List<HoodieKey>> keyGenFunction =
-        generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, 
dataGen::generateUniqueDeletes);
-
-    return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, 
numRecordsInThisCommit,
-        keyGenFunction,
-        deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
-  }
-
-  /**
-   * Helper to insert/upsert batch of records and do regular assertions on the 
state after successful completion.
-   *
-   * @param client                       Hoodie Write Client
-   * @param newCommitTime                New Commit Timestamp to be used
-   * @param prevCommitTime               Commit Timestamp used in previous 
commit
-   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between 
prevCommitTime and newCommitTime
-   * @param initCommitTime               Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit       Number of records to be added in the 
new commit
-   * @param recordGenFunction            Records Generation Function
-   * @param writeFn                      Write Function to be used for upsert
-   * @param assertForCommit              Enable Assertion of Writes
-   * @param expRecordsInThisCommit       Expected number of records in this 
commit
-   * @param expTotalRecords              Expected number of records when 
scanned
-   * @param expTotalCommits              Expected number of commits (including 
this commit)
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String 
newCommitTime, String prevCommitTime,
-                                  Option<List<String>> 
commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
-                                  Function2<List<HoodieRecord>, String, 
Integer> recordGenFunction,
-                                  Function3<JavaRDD<WriteStatus>, 
HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-                                  boolean assertForCommit, int 
expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws 
Exception {
-
-    // Write 1 (only inserts)
-    client.startCommitWithTime(newCommitTime);
-
-    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, 
numRecordsInThisCommit);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, 
newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(records, fs);
-
-    // verify that there is a commit
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-    if (assertForCommit) {
-      assertEquals("Expecting " + expTotalCommits + " commits.", 
expTotalCommits,
-          timeline.findInstantsAfter(initCommitTime, 
Integer.MAX_VALUE).countInstants());
-      Assert.assertEquals("Latest commit should be " + newCommitTime, 
newCommitTime,
-          timeline.lastInstant().get().getTimestamp());
-      assertEquals("Must contain " + expRecordsInThisCommit + " records", 
expRecordsInThisCommit,
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, 
newCommitTime).count());
-
-      // Check the entire dataset has all records still
-      String[] fullPartitionPaths = new 
String[dataGen.getPartitionPaths().length];
-      for (int i = 0; i < fullPartitionPaths.length; i++) {
-        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, 
dataGen.getPartitionPaths()[i]);
-      }
-      assertEquals("Must contain " + expTotalRecords + " records", 
expTotalRecords,
-          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, 
fullPartitionPaths).count());
-
-      // Check that the incremental consumption from prevCommitTime
-      assertEquals("Incremental consumption from " + prevCommitTime + " should 
give all records in latest commit",
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, 
newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, 
prevCommitTime).count());
-      if (commitTimesBetweenPrevAndNew.isPresent()) {
-        commitTimesBetweenPrevAndNew.get().forEach(ct -> {
-          assertEquals("Incremental consumption from " + ct + " should give 
all records in latest commit",
-              HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, 
newCommitTime).count(),
-              HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, 
ct).count());
-        });
-      }
+
+    /**
+     * Assert no failures in writing hoodie files.
+     *
+     * @param statuses List of Write Status
+     */
+    public static void assertNoWriteErrors(List<WriteStatus> statuses) {
+        // Verify there are no errors
+        for (WriteStatus status : statuses) {
+            assertFalse("Errors found in write of " + status.getFileId(), 
status.hasErrors());
+        }
     }
-    return result;
-  }
-
-  /**
-   * Helper to delete batch of hoodie keys and do regular assertions on the 
state after successful completion.
-   *
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param prevCommitTime         Commit Timestamp used in previous commit
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param keyGenFunction         Key Generation function
-   * @param deleteFn               Write Function to be used for delete
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @param expTotalRecords        Expected number of records when scanned
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String 
newCommitTime, String prevCommitTime,
-                                   String initCommitTime, int 
numRecordsInThisCommit,
-                                   Function<Integer, List<HoodieKey>> 
keyGenFunction,
-                                   Function3<JavaRDD<WriteStatus>, 
HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
-                                   boolean assertForCommit, int 
expRecordsInThisCommit, int expTotalRecords) throws Exception {
-
-    // Delete 1 (only deletes)
-    client.startCommitWithTime(newCommitTime);
-
-    List<HoodieKey> keysToDelete = 
keyGenFunction.apply(numRecordsInThisCommit);
-    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
-
-    JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, 
newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForKeys(keysToDelete, fs);
-
-    // verify that there is a commit
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-    if (assertForCommit) {
-      assertEquals("Expecting 3 commits.", 3,
-          timeline.findInstantsAfter(initCommitTime, 
Integer.MAX_VALUE).countInstants());
-      Assert.assertEquals("Latest commit should be " + newCommitTime, 
newCommitTime,
-          timeline.lastInstant().get().getTimestamp());
-      assertEquals("Must contain " + expRecordsInThisCommit + " records", 
expRecordsInThisCommit,
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, 
newCommitTime).count());
-
-      // Check the entire dataset has all records still
-      String[] fullPartitionPaths = new 
String[dataGen.getPartitionPaths().length];
-      for (int i = 0; i < fullPartitionPaths.length; i++) {
-        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, 
dataGen.getPartitionPaths()[i]);
-      }
-      assertEquals("Must contain " + expTotalRecords + " records", 
expTotalRecords,
-          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, 
fullPartitionPaths).count());
-
-      // Check that the incremental consumption from prevCommitTime
-      assertEquals("Incremental consumption from " + prevCommitTime + " should 
give no records in latest commit,"
-              + " since it is a delete operation",
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, 
newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, 
prevCommitTime).count());
+
+    void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, 
FileSystem fs) throws IOException {
+        Set<String> partitionPathSet = inputRecords.stream()
+            .map(HoodieRecord::getPartitionPath)
+            .collect(Collectors.toSet());
+        
assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
     }
-    return result;
-  }
 
-  /**
-   * Get Cleaner state corresponding to a partition path.
-   *
-   * @param hoodieCleanStatsTwo List of Clean Stats
-   * @param partitionPath       Partition path for filtering
-   * @return Cleaner state corresponding to partition path
-   */
-  protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> 
hoodieCleanStatsTwo, String partitionPath) {
-    return hoodieCleanStatsTwo.stream().filter(e -> 
e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
-  }
+    void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem 
fs) throws IOException {
+        Set<String> partitionPathSet = inputKeys.stream()
+            .map(HoodieKey::getPartitionPath)
+            .collect(Collectors.toSet());
+        
assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+    }
 
-  // Functional Interfaces for passing lambda and Hoodie Write API contexts
+    /**
+     * Ensure presence of partition meta-data at known depth.
+     *
+     * @param partitionPaths Partition paths to check
+     * @param fs File System
+     * @throws IOException in case of error
+     */
+    void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) 
throws IOException {
+        for (String partitionPath : partitionPaths) {
+            assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new 
Path(basePath, partitionPath)));
+            HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, 
new Path(basePath, partitionPath));
+            pmeta.readFromFS();
+            
Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, 
pmeta.getPartitionDepth());
+        }
+    }
 
-  @FunctionalInterface
-  public interface Function2<R, T1, T2> {
+    /**
+     * Ensure records have location field set.
+     *
+     * @param taggedRecords Tagged Records
+     * @param commitTime Commit Timestamp
+     */
+    protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String 
commitTime) {
+        for (HoodieRecord rec : taggedRecords) {
+            assertTrue("Record " + rec + " found with no location.", 
rec.isCurrentLocationKnown());
+            assertEquals("All records should have commit time " + commitTime + 
", since updates were made",
+                rec.getCurrentLocation().getInstantTime(), commitTime);
+        }
+    }
 
-    R apply(T1 v1, T2 v2) throws IOException;
-  }
+    /**
+     * Assert that there is no duplicate key at the partition level.
+     *
+     * @param records List of Hoodie records
+     */
+    void assertNodupesWithinPartition(List<HoodieRecord> records) {
+        Map<String, Set<String>> partitionToKeys = new HashMap<>();
+        for (HoodieRecord r : records) {
+            String key = r.getRecordKey();
+            String partitionPath = r.getPartitionPath();
+            if (!partitionToKeys.containsKey(partitionPath)) {
+                partitionToKeys.put(partitionPath, new HashSet<>());
+            }
+            assertFalse("key " + key + " is duplicate within partition " + 
partitionPath, partitionToKeys.get(partitionPath).contains(key));
+            partitionToKeys.get(partitionPath).add(key);
+        }
+    }
 
-  @FunctionalInterface
-  public interface Function3<R, T1, T2, T3> {
+    /**
+     * Helper to generate records generation function for testing Prepped 
version of API. Prepped APIs expect the records to be already de-duped and have 
location set. This wrapper takes care of
+     * record-location setting. Uniqueness is guaranteed by record-generation 
function itself.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param recordGenFunction Records Generation function
+     * @return Wrapped function
+     */
+    private Function2<List<HoodieRecord>, String, Integer> 
wrapRecordsGenFunctionForPreppedCalls(
+        final HoodieWriteConfig writeConfig, final 
Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+        return (commit, numRecords) -> {
+            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, 
jsc);
+            List<HoodieRecord> records = recordGenFunction.apply(commit, 
numRecords);
+            final HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+            HoodieTable table = HoodieTable.getHoodieTable(metaClient, 
writeConfig, jsc);
+            JavaRDD<HoodieRecord> taggedRecords = 
index.tagLocation(jsc.parallelize(records, 1), jsc, table);
+            return taggedRecords.collect();
+        };
+    }
 
-    R apply(T1 v1, T2 v2, T3 v3) throws IOException;
-  }
+    /**
+     * Helper to generate delete keys generation function for testing Prepped 
version of API. Prepped APIs expect the keys to be already de-duped and have 
location set. This wrapper takes care of
+     * record-location setting. Uniqueness is guaranteed by key-generation 
function itself.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param keyGenFunction Keys Generation function
+     * @return Wrapped function
+     */
+    private Function<Integer, List<HoodieKey>> 
wrapDeleteKeysGenFunctionForPreppedCalls(
+        final HoodieWriteConfig writeConfig, final Function<Integer, 
List<HoodieKey>> keyGenFunction) {
+        return (numRecords) -> {
+            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, 
jsc);
+            List<HoodieKey> records = keyGenFunction.apply(numRecords);
+            final HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+            HoodieTable table = HoodieTable.getHoodieTable(metaClient, 
writeConfig, jsc);
+            JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+                .map(key -> new HoodieRecord(key, new 
EmptyHoodieRecordPayload()));
+            JavaRDD<HoodieRecord> taggedRecords = 
index.tagLocation(recordsToDelete, jsc, table);
+            return taggedRecords.map(record -> record.getKey()).collect();
+        };
+    }
+
+    /**
+     * Generate wrapper for record generation function for testing Prepped 
APIs.
+     *
+     * @param isPreppedAPI Flag to indicate if this is for testing 
prepped-version of APIs
+     * @param writeConfig Hoodie Write Config
+     * @param wrapped Actual Records Generation function
+     * @return Wrapped Function
+     */
+    protected Function2<List<HoodieRecord>, String, Integer> 
generateWrapRecordsFn(boolean isPreppedAPI,
+        HoodieWriteConfig writeConfig,
+        Function2<List<HoodieRecord>, String, Integer> wrapped) {
+        if (isPreppedAPI) {
+            return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+        } else {
+            return wrapped;
+        }
+    }
+
+    /**
+     * Generate wrapper for delete key generation function for testing Prepped 
APIs.
+     *
+     * @param isPreppedAPI Flag to indicate if this is for testing 
prepped-version of APIs
+     * @param writeConfig Hoodie Write Config
+     * @param wrapped Actual Records Generation function
+     * @return Wrapped Function
+     */
+    Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean 
isPreppedAPI,
+        HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> 
wrapped) {
+        if (isPreppedAPI) {
+            return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, 
wrapped);
+        } else {
+            return wrapped;
+        }
+    }
+
+    /**
+     * Helper to insert first batch of records and do regular assertions on 
the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new 
commit
+     * @param writeFn Write Function to be used for insertion
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped 
records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, 
HoodieWriteClient client, String newCommitTime,
+        String initCommitTime, int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
+        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction 
=
+            generateWrapRecordsFn(isPreppedAPI, writeConfig, 
dataGen::generateInserts);
+
+        return writeBatch(client, newCommitTime, initCommitTime, 
Option.empty(), initCommitTime, numRecordsInThisCommit,
+            recordGenFunction, writeFn, assertForCommit, 
expRecordsInThisCommit, expRecordsInThisCommit, 1);
+    }
+
+    /**
+     * Helper to upsert batch of records and do regular assertions on the 
state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between 
prevCommitTime and newCommitTime
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new 
commit
+     * @param writeFn Write Function to be used for upsert
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped 
records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @param expTotalCommits Expected number of commits (including this 
commit)
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, 
HoodieWriteClient client, String newCommitTime,
+        String prevCommitTime, Option<List<String>> 
commitTimesBetweenPrevAndNew, String initCommitTime,
+        int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit, int 
expTotalRecords, int expTotalCommits) throws Exception {
+        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction 
=
+            generateWrapRecordsFn(isPreppedAPI, writeConfig, 
dataGen::generateUniqueUpdates);
+
+        return writeBatch(client, newCommitTime, prevCommitTime, 
commitTimesBetweenPrevAndNew, initCommitTime,
+            numRecordsInThisCommit, recordGenFunction, writeFn, 
assertForCommit, expRecordsInThisCommit, expTotalRecords,
+            expTotalCommits);
+    }
+
+    /**
+     * Helper to delete batch of keys and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be deleted in the new commit
+     * @param deleteFn Delete Function to be used for deletes
+     * @param isPreppedAPI Boolean flag to indicate deleteFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String prevCommitTime, String initCommitTime,
+        int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+        final Function<Integer, List<HoodieKey>> keyGenFunction =
+            generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+        return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+            keyGenFunction,
+            deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+    }
+
+    /**
+     * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param recordGenFunction Records Generation Function
+     * @param writeFn Write Function to be used for upsert
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @param expTotalCommits Expected number of commits (including this commit)
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+        Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+        Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+
+        // Write 1 (only inserts)
+        client.startCommitWithTime(newCommitTime);
+
+        List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+        JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
+        List<WriteStatus> statuses = result.collect();
+        assertNoWriteErrors(statuses);
+
+        // check the partition metadata is written out
+        assertPartitionMetadataForRecords(records, fs);
+
+        // verify that there is a commit
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+        if (assertForCommit) {
+            assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
+                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+                timeline.lastInstant().get().getTimestamp());
+            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+            // Check the entire dataset has all records still
+            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+            for (int i = 0; i < fullPartitionPaths.length; i++) {
+                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+            }
+            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+            // Check the incremental consumption from prevCommitTime
+            assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+            if (commitTimesBetweenPrevAndNew.isPresent()) {
+                commitTimesBetweenPrevAndNew.get().forEach(ct -> {
+                    assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
+                        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
+                });
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
+     *
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param keyGenFunction Key Generation function
+     * @param deleteFn Write Function to be used for delete
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+        String initCommitTime, int numRecordsInThisCommit,
+        Function<Integer, List<HoodieKey>> keyGenFunction,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+        // Delete 1 (only deletes)
+        client.startCommitWithTime(newCommitTime);
+
+        List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
+        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+        JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+        List<WriteStatus> statuses = result.collect();
+        assertNoWriteErrors(statuses);
+
+        // check the partition metadata is written out
+        assertPartitionMetadataForKeys(keysToDelete, fs);
+
+        // verify that there is a commit
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+        if (assertForCommit) {
+            assertEquals("Expecting 3 commits.", 3,
+                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+                timeline.lastInstant().get().getTimestamp());
+            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+            // Check the entire dataset has all records still
+            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+            for (int i = 0; i < fullPartitionPaths.length; i++) {
+                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+            }
+            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+            // Check the incremental consumption from prevCommitTime
+            assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+                    + " since it is a delete operation",
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+        }
+        return result;
+    }
+
+    /**
+     * Get Cleaner state corresponding to a partition path.
+     *
+     * @param hoodieCleanStatsTwo List of Clean Stats
+     * @param partitionPath Partition path for filtering
+     * @return Cleaner state corresponding to partition path
+     */
+    protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
+        return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
+    }
+
+    // Functional Interfaces for passing lambda and Hoodie Write API contexts
+
+    @FunctionalInterface
+    public interface Function2<R, T1, T2> {
+
+        R apply(T1 v1, T2 v2) throws IOException;
+    }
+
+    @FunctionalInterface
+    public interface Function3<R, T1, T2, T3> {
+
+        R apply(T1 v1, T2 v2, T3 v3) throws IOException;
+    }
 
 }
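
The helpers above are the building blocks the client tests drive. As a minimal sketch (not part of this patch), and assuming a getConfig() builder on the test class plus the JUnit 4 wiring used elsewhere in this module, a subclass of TestHoodieClientBase could chain an insert commit and an update commit like this; commit times and record counts are illustrative:

    package org.apache.hudi.client;

    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;

    import org.junit.Test;

    public class TestInsertUpdateFlowExample extends TestHoodieClientBase {

      @Test
      public void testInsertThenUpdate() throws Exception {
        HoodieWriteConfig cfg = getConfig();                  // assumed config helper on the base class
        HoodieWriteClient client = getHoodieWriteClient(cfg); // harness-managed, closed in cleanupClients()
        final String initCommitTime = "000";

        // Commit "001": 200 fresh inserts, with timeline and record-count assertions enabled.
        writeBatch(client, "001", initCommitTime, Option.empty(), initCommitTime, 200,
            dataGen::generateInserts, HoodieWriteClient::insert, true, 200, 200, 1);

        // Commit "002": 100 unique updates; the table should still hold 200 records across 2 commits.
        updateBatch(cfg, client, "002", "001", Option.empty(), initCommitTime, 100,
            HoodieWriteClient::upsert, false, true, 100, 200, 2);
      }
    }
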
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java 
b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 9b70c10..8d3fa13 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -68,10 +68,6 @@ public class TestMultiFS extends HoodieClientTestHarness {
     cleanupTestDataGenerator();
   }
 
-  private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
-  }
-
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
 
b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index aad8edf..ab6e940 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.TestRawTripPayload;
+import java.io.IOException;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -58,8 +59,9 @@ public class TestUpdateSchemaEvolution extends 
HoodieClientTestHarness {
   }
 
   @After
-  public void tearDown() {
+  public void tearDown() throws IOException {
     cleanupSparkContexts();
+    cleanupFileSystem();
   }
 
   //@Test
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java 
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4e5721f..e4202f0 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -17,11 +17,15 @@
 
 package org.apache.hudi.common;
 
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.TestHoodieClientBase;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,225 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness 
implements Serializable {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
-  protected transient JavaSparkContext jsc = null;
-  protected transient SQLContext sqlContext;
-  protected transient FileSystem fs;
-  protected transient HoodieTestDataGenerator dataGen = null;
-  protected transient ExecutorService executorService;
-  protected transient HoodieTableMetaClient metaClient;
-  private static AtomicInteger instantGen = new AtomicInteger(1);
-
-  public String getNextInstant() {
-    return String.format("%09d", instantGen.getAndIncrement());
-  }
-
-  // dfs
-  protected String dfsBasePath;
-  protected transient HdfsTestService hdfsTestService;
-  protected transient MiniDFSCluster dfsCluster;
-  protected transient DistributedFileSystem dfs;
-
-  /**
-   * Initializes resource group for the subclasses of {@link 
TestHoodieClientBase}.
-   *
-   * @throws IOException
-   */
-  public void initResources() throws IOException {
-    initPath();
-    initSparkContexts();
-    initTestDataGenerator();
-    initFileSystem();
-    initMetaClient();
-  }
-
-  /**
-   * Cleanups resource group for the subclasses of {@link 
TestHoodieClientBase}.
-   * 
-   * @throws IOException
-   */
-  public void cleanupResources() throws IOException {
-    cleanupMetaClient();
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupFileSystem();
-  }
-
-  /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link 
SQLContext}) with the given application name.
-   *
-   * @param appName The specified application name.
-   */
-  protected void initSparkContexts(String appName) {
-    // Initialize a local spark env
-    jsc = new 
JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
-    jsc.setLogLevel("ERROR");
-
-    // SQLContext stuff
-    sqlContext = new SQLContext(jsc);
-  }
-
-  /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link 
SQLContext}) with a default name
-   * <b>TestHoodieClient</b>.
-   */
-  protected void initSparkContexts() {
-    initSparkContexts("TestHoodieClient");
-  }
-
-  /**
-   * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
-   */
-  protected void cleanupSparkContexts() {
-    if (sqlContext != null) {
-      LOG.info("Clearing sql context cache of spark-session used in previous 
test-case");
-      sqlContext.clearCache();
-      sqlContext = null;
+    private static final Logger LOG = 
LoggerFactory.getLogger(HoodieClientTestHarness.class);
+
+    protected transient JavaSparkContext jsc = null;
+    protected transient SQLContext sqlContext;
+    protected transient FileSystem fs;
+    protected transient HoodieTestDataGenerator dataGen = null;
+    protected transient ExecutorService executorService;
+    protected transient HoodieTableMetaClient metaClient;
+    private static AtomicInteger instantGen = new AtomicInteger(1);
+    protected transient HoodieWriteClient client;
+
+    public String getNextInstant() {
+        return String.format("%09d", instantGen.getAndIncrement());
+    }
+
+    // dfs
+    protected String dfsBasePath;
+    protected transient HdfsTestService hdfsTestService;
+    protected transient MiniDFSCluster dfsCluster;
+    protected transient DistributedFileSystem dfs;
+
+    /**
+     * Initializes resource group for the subclasses of {@link 
TestHoodieClientBase}.
+     */
+    public void initResources() throws IOException {
+        initPath();
+        initSparkContexts();
+        initTestDataGenerator();
+        initFileSystem();
+        initMetaClient();
+    }
+
+    /**
+     * Cleanups resource group for the subclasses of {@link 
TestHoodieClientBase}.
+     */
+    public void cleanupResources() throws IOException {
+        cleanupClients();
+        cleanupSparkContexts();
+        cleanupTestDataGenerator();
+        cleanupFileSystem();
+    }
+
+    /**
+     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link 
SQLContext}) with the given application name.
+     *
+     * @param appName The specified application name.
+     */
+    protected void initSparkContexts(String appName) {
+        // Initialize a local spark env
+        jsc = new 
JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+        jsc.setLogLevel("ERROR");
+
+        // SQLContext stuff
+        sqlContext = new SQLContext(jsc);
     }
 
-    if (jsc != null) {
-      LOG.info("Closing spark context used in previous test-case");
-      jsc.close();
-      jsc.stop();
-      jsc = null;
+    /**
+     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link 
SQLContext}) with a default name
+     * <b>TestHoodieClient</b>.
+     */
+    protected void initSparkContexts() {
+        initSparkContexts("TestHoodieClient");
     }
-  }
-
-  /**
-   * Initializes a file system with the hadoop configuration of Spark context.
-   */
-  protected void initFileSystem() {
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been 
initialized.");
+
+    /**
+     * Cleanups Spark contexts ({@link JavaSparkContext} and {@link 
SQLContext}).
+     */
+    protected void cleanupSparkContexts() {
+        if (sqlContext != null) {
+            LOG.info("Clearing sql context cache of spark-session used in 
previous test-case");
+            sqlContext.clearCache();
+            sqlContext = null;
+        }
+
+        if (jsc != null) {
+            LOG.info("Closing spark context used in previous test-case");
+            jsc.close();
+            jsc.stop();
+            jsc = null;
+        }
+    }
+
+    /**
+     * Initializes a file system with the hadoop configuration of Spark 
context.
+     */
+    protected void initFileSystem() {
+        if (jsc == null) {
+            throw new IllegalStateException("The Spark context has not been 
initialized.");
+        }
+
+        initFileSystemWithConfiguration(jsc.hadoopConfiguration());
     }
 
-    initFileSystemWithConfiguration(jsc.hadoopConfiguration());
-  }
-
-  /**
-   * Initializes file system with a default empty configuration.
-   */
-  protected void initFileSystemWithDefaultConfiguration() {
-    initFileSystemWithConfiguration(new Configuration());
-  }
-
-  /**
-   * Cleanups file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupFileSystem() throws IOException {
-    if (fs != null) {
-      LOG.warn("Closing file-system instance used in previous test-run");
-      fs.close();
+    /**
+     * Initializes file system with a default empty configuration.
+     */
+    protected void initFileSystemWithDefaultConfiguration() {
+        initFileSystemWithConfiguration(new Configuration());
     }
-  }
-
-  /**
-   * Initializes an instance of {@link HoodieTableMetaClient} with a special 
table type specified by
-   * {@code getTableType()}.
-   *
-   * @throws IOException
-   */
-  protected void initMetaClient() throws IOException {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been 
initialized.");
+
+    /**
+     * Cleanups file system.
+     */
+    protected void cleanupFileSystem() throws IOException {
+        if (fs != null) {
+            LOG.warn("Closing file-system instance used in previous test-run");
+            fs.close();
+        }
     }
 
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been 
initialized.");
+    /**
+     * Initializes an instance of {@link HoodieTableMetaClient} with a special 
table type specified by {@code getTableType()}.
+     */
+    protected void initMetaClient() throws IOException {
+        if (basePath == null) {
+            throw new IllegalStateException("The base path has not been 
initialized.");
+        }
+
+        if (jsc == null) {
+            throw new IllegalStateException("The Spark context has not been 
initialized.");
+        }
+
+        metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, 
getTableType());
     }
 
-    metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, 
getTableType());
-  }
-
-  /**
-   * Cleanups table type.
-   */
-  protected void cleanupMetaClient() {
-    metaClient = null;
-  }
-
-  /**
-   * Initializes a test data generator which used to generate test datas.
-   *
-   */
-  protected void initTestDataGenerator() {
-    dataGen = new HoodieTestDataGenerator();
-  }
-
-  /**
-   * Cleanups test data generator.
-   *
-   */
-  protected void cleanupTestDataGenerator() {
-    dataGen = null;
-  }
-
-  /**
-   * Initializes a distributed file system and base directory.
-   *
-   * @throws IOException
-   */
-  protected void initDFS() throws IOException {
-    FileSystem.closeAll();
-    hdfsTestService = new HdfsTestService();
-    dfsCluster = hdfsTestService.start(true);
-
-    // Create a temp folder as the base path
-    dfs = dfsCluster.getFileSystem();
-    dfsBasePath = dfs.getWorkingDirectory().toString();
-    dfs.mkdirs(new Path(dfsBasePath));
-  }
-
-  /**
-   * Cleanups the distributed file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupDFS() throws IOException {
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      dfsCluster.shutdown();
+    /**
+     * Cleanups the meta client and closes any write client issued via {@code getHoodieWriteClient}.
+     */
+    protected void cleanupClients() {
+        metaClient = null;
+        if (null != client) {
+            client.close();
+            client = null;
+        }
     }
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and 
LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-  }
-
-  /**
-   * Initializes executor service with a fixed thread pool.
-   *
-   * @param threadNum specify the capacity of the fixed thread pool
-   */
-  protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
-    executorService = Executors.newFixedThreadPool(threadNum);
-  }
-
-  /**
-   * Cleanups the executor service.
-   */
-  protected void cleanupExecutorService() {
-    if (this.executorService != null) {
-      this.executorService.shutdownNow();
-      this.executorService = null;
+
+    /**
+     * Initializes a test data generator which is used to generate test data.
+     */
+    protected void initTestDataGenerator() {
+        dataGen = new HoodieTestDataGenerator();
     }
-  }
 
-  private void initFileSystemWithConfiguration(Configuration configuration) {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been 
initialized.");
+    /**
+     * Cleanups test data generator.
+     */
+    protected void cleanupTestDataGenerator() {
+        dataGen = null;
     }
 
-    fs = FSUtils.getFs(basePath, configuration);
-    if (fs instanceof LocalFileSystem) {
-      LocalFileSystem lfs = (LocalFileSystem) fs;
-      // With LocalFileSystem, with checksum disabled, fs.open() returns an 
inputStream which is FSInputStream
-      // This causes ClassCastExceptions in LogRecordScanner (and potentially 
other places) calling fs.open
-      // So, for the tests, we enforce checksum verification to circumvent the 
problem
-      lfs.setVerifyChecksum(true);
+    /**
+     * Initializes a distributed file system and base directory.
+     */
+    protected void initDFS() throws IOException {
+        FileSystem.closeAll();
+        hdfsTestService = new HdfsTestService();
+        dfsCluster = hdfsTestService.start(true);
+
+        // Create a temp folder as the base path
+        dfs = dfsCluster.getFileSystem();
+        dfsBasePath = dfs.getWorkingDirectory().toString();
+        dfs.mkdirs(new Path(dfsBasePath));
     }
-  }
 
+    /**
+     * Cleanups the distributed file system.
+     */
+    protected void cleanupDFS() throws IOException {
+        if (hdfsTestService != null) {
+            hdfsTestService.stop();
+            dfsCluster.shutdown();
+            hdfsTestService = null;
+            dfsCluster = null;
+            dfs = null;
+        }
+        // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+        // same JVM
+        FileSystem.closeAll();
+    }
+
+    /**
+     * Initializes executor service with a fixed thread pool.
+     *
+     * @param threadNum specify the capacity of the fixed thread pool
+     */
+    protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
+        executorService = Executors.newFixedThreadPool(threadNum);
+    }
+
+    /**
+     * Cleanups the executor service.
+     */
+    protected void cleanupExecutorService() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+            this.executorService = null;
+        }
+    }
+
+    private void initFileSystemWithConfiguration(Configuration configuration) {
+        if (basePath == null) {
+            throw new IllegalStateException("The base path has not been initialized.");
+        }
+
+        fs = FSUtils.getFs(basePath, configuration);
+        if (fs instanceof LocalFileSystem) {
+            LocalFileSystem lfs = (LocalFileSystem) fs;
+            // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+            // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+            // So, for the tests, we enforce checksum verification to circumvent the problem
+            lfs.setVerifyChecksum(true);
+        }
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+        return getHoodieWriteClient(cfg, false);
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
+        return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
+    }
+
+    public HoodieReadClient getHoodieReadClient(String basePath) {
+        return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
+        HoodieIndex index) {
+        if (null != client) {
+            client.close();
+            client = null;
+        }
+        client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+        return client;
+    }
 }
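
The net effect of the harness change is that write clients are handed out and retired in one place. A minimal, hypothetical sketch of a test built on the reworked harness (class name, config and the double client request are purely illustrative; the setup/teardown wiring mirrors what the existing tests do):

    package org.apache.hudi.common;

    import java.io.IOException;

    import org.apache.hudi.client.HoodieWriteClient;
    import org.apache.hudi.config.HoodieWriteConfig;

    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    public class TestHarnessUsageExample extends HoodieClientTestHarness {

      @Before
      public void setUp() throws IOException {
        initResources(); // path, Spark contexts, data generator, file system, meta client
      }

      @After
      public void tearDown() throws IOException {
        cleanupResources(); // now also closes the last write client via cleanupClients()
      }

      @Test
      public void testClientLifecycle() {
        // Any valid write config works for the sketch; only the base path is set here.
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).build();

        // Ask the harness for clients instead of constructing them directly: issuing a
        // second client closes the first, and tearDown closes the last one, so no
        // embedded timeline server or file handles leak between test cases.
        HoodieWriteClient first = getHoodieWriteClient(cfg);
        HoodieWriteClient second = getHoodieWriteClient(cfg); // 'first' is closed here
      }
    }
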
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
 
b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
index 05638e2..6ddb578 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
@@ -60,7 +60,7 @@ public class TestHBaseQPSResourceAllocator extends 
HoodieClientTestHarness {
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
     if (utility != null) {
       utility.shutdownMiniCluster();
     }
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java 
b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 2893947..43f2fd1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -86,6 +86,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   @AfterClass
   public static void clean() throws Exception {
     if (utility != null) {
+      utility.deleteTable(tableName);
       utility.shutdownMiniCluster();
     }
   }
@@ -115,11 +116,7 @@ public class TestHbaseIndex extends 
HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupTestDataGenerator();
-    cleanupMetaClient();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws 
Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   @Test
@@ -132,7 +129,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness 
{
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, 
jsc);
 
@@ -172,7 +169,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness 
{
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(newCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, 
jsc);
@@ -206,7 +203,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness 
{
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -256,7 +253,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness 
{
     // only for test, set the hbaseConnection to mocked object
     index.setHbaseConnection(hbaseConnection);
 
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
@@ -281,7 +278,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness 
{
   public void testTotalPutsBatching() throws Exception {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java 
b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 91435f8..b97fefc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -45,7 +45,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   @After
   public void tearDown() {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   @Test
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index d29cfa4..105b0e8 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -107,7 +107,7 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupFileSystem();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   private HoodieWriteConfig makeConfig() {
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index ddf2775..55d4526 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -80,7 +80,7 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
   @After
   public void tearDown() {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   @Test
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java 
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 664f4b5..7fd02bc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -68,11 +68,8 @@ public class TestHoodieMergeHandle extends 
HoodieClientTestHarness {
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupSparkContexts();
-    cleanupMetaClient();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws 
Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
+    cleanupFileSystem();
   }
 
   @Test
@@ -83,9 +80,8 @@ public class TestHoodieMergeHandle extends 
HoodieClientTestHarness {
 
     // Build a write config with bulkinsertparallelism set
     HoodieWriteConfig cfg = getConfigBuilder().build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
-
       /**
        * Write 1 (only inserts) This will do a bulk insert of 44 records of 
which there are 2 records repeated 21 times
        * each. id1 (21 records), id2 (21 records), id3, id4
@@ -224,7 +220,7 @@ public class TestHoodieMergeHandle extends 
HoodieClientTestHarness {
   public void testHoodieMergeHandleWriteStatMetrics() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder().build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java 
b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index ec64080..6887531 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestUtils;
@@ -85,7 +86,7 @@ public class TestCopyOnWriteTable extends 
HoodieClientTestHarness {
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
     cleanupFileSystem();
     cleanupTestDataGenerator();
   }
@@ -129,6 +130,8 @@ public class TestCopyOnWriteTable extends 
HoodieClientTestHarness {
     // Prepare the AvroParquetIO
     HoodieWriteConfig config = makeHoodieClientConfig();
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String partitionPath = "/2016/01/31";
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java 
b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 740caf2..fdc968d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -96,16 +96,13 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
     cleanupDFS();
     cleanupSparkContexts();
     cleanupTestDataGenerator();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws 
Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   @Test
   public void testSimpleInsertAndUpdate() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts)
@@ -190,7 +187,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
   @Test
   public void testMetadataAggregateFromWriteStatus() throws Exception {
     HoodieWriteConfig cfg = 
getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       String newCommitTime = "001";
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -213,7 +210,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
   @Test
   public void testSimpleInsertUpdateAndDelete() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts, written as parquet file)
@@ -298,7 +295,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
     HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, 
HoodieTableType.COPY_ON_WRITE);
 
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts)
@@ -351,7 +348,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
   public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfig(false);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       // Test delta commit rollback
       /**
@@ -394,7 +391,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
        */
       final String commitTime1 = "002";
       // WriteClient with custom config (disable small file handling)
-      try (HoodieWriteClient secondClient = 
getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
+      try (HoodieWriteClient secondClient = 
getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
         secondClient.startCommitWithTime(commitTime1);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -424,7 +421,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
        * Write 3 (inserts + updates - testing successful delta commit)
        */
       final String commitTime2 = "002";
-      try (HoodieWriteClient thirdClient = getWriteClient(cfg);) {
+      try (HoodieWriteClient thirdClient = getHoodieWriteClient(cfg);) {
         thirdClient.startCommitWithTime(commitTime2);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -500,7 +497,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
   public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception 
{
 
     HoodieWriteConfig cfg = getConfig(false);
-    try (final HoodieWriteClient client = getWriteClient(cfg);) {
+    try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       /**
        * Write 1 (only inserts)
        */
@@ -541,7 +538,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
        */
       newCommitTime = "002";
       // WriteClient with custom config (disable small file handling)
-      HoodieWriteClient nClient = 
getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
+      HoodieWriteClient nClient = 
getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
       nClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -664,7 +661,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
   @Test
   public void testUpsertPartitioner() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts, written as parquet file)
@@ -743,7 +740,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
   public void testLogFileCountsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig(true);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -816,7 +813,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, 
IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -853,7 +850,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, 
IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -927,7 +924,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, 
IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -979,10 +976,9 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
   public void testRollingStatsInMetadata() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, 
IndexType.INMEMORY).withAutoCommit(false).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
-
       // Create a commit without rolling stats in metadata to test backwards 
compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
       String commitActionType = table.getMetaClient().getCommitActionType();
@@ -1080,7 +1076,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness {
   public void testRollingStatsWithSmallFileHandling() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, 
IndexType.INMEMORY).withAutoCommit(false).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
       Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
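
The change repeated throughout these test classes is mechanical: the locally constructed HoodieWriteClient is swapped for the harness-managed one, while try-with-resources is kept since the client is AutoCloseable. A hypothetical test method showing the resulting shape, assuming it lives in a subclass of the client test base above (so getConfig(), dataGen and assertNoWriteErrors are in scope); the 100-record batch is illustrative:

    @Test
    public void testUpsertWithManagedClient() throws Exception {
      HoodieWriteConfig config = getConfig(); // assumed config helper on the test class
      try (HoodieWriteClient writeClient = getHoodieWriteClient(config)) {
        String newCommitTime = writeClient.startCommit();
        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
        JavaRDD<WriteStatus> statuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
        assertNoWriteErrors(statuses.collect());
      }
      // Even if the try block throws, cleanupClients() in tearDown also closes the
      // client the harness last handed out, which is what de-flakes these tests.
    }
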
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
 
b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 8fa55ec..09d62a7 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -78,10 +78,7 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupSparkContexts();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws 
Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   private HoodieWriteConfig getConfig() {
@@ -114,8 +111,7 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
-
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -132,8 +128,8 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
-      String newCommitTime = "100";
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+     String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
diff --git a/pom.xml b/pom.xml
index f1990e1..6370c37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -242,6 +242,7 @@
         <version>${maven-surefire-plugin.version}</version>
         <configuration>
           <skip>${skipUTs}</skip>
+          <argLine>-Xmx4g</argLine>
           <systemPropertyVariables>
             <log4j.configuration>
               ${surefire-log4j.file}
