This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 83e39e2  [HUDI-781] Add HoodieWriteableTestTable (#2040)
83e39e2 is described below

commit 83e39e2b179fb1db13f1c4865a1849855fc32b65
Author: Raymond Xu <[email protected]>
AuthorDate: Mon Sep 7 02:54:36 2020 -0700

    [HUDI-781] Add HoodieWriteableTestTable (#2040)
    
    - Introduce HoodieWriteableTestTable for writing records into files
    - Migrate writeParquetFiles() in HoodieClientTestUtils to 
HoodieWriteableTestTable
    - Adopt HoodieWrittableTestTable for test cases in
      - ITTestRepairsCommand.java
      - TestHoodieIndex.java
      - TestHoodieKeyLocationFetchHandle.java
      - TestHoodieGlobalBloomIndex.java
      - TestHoodieBloomIndex.java
    - Renamed HoodieTestTable and FileCreateUtils APIs
      - dataFile changed to baseFile
---
 .../hudi/cli/integ/ITTestRepairsCommand.java       |  51 +++-----
 .../org/apache/hudi/index/TestHoodieIndex.java     |  86 +++++---------
 .../hudi/index/bloom/TestHoodieBloomIndex.java     | 115 +++++++-----------
 .../index/bloom/TestHoodieGlobalBloomIndex.java    | 117 ++++++------------
 .../hudi/io/TestHoodieKeyLocationFetchHandle.java  |  64 +++-------
 .../java/org/apache/hudi/table/TestCleaner.java    |  96 +++++++--------
 .../apache/hudi/table/TestConsistencyGuard.java    |  26 ++--
 .../table/action/commit/TestUpsertPartitioner.java |   2 +-
 .../rollback/TestMarkerBasedRollbackStrategy.java  |  10 +-
 .../hudi/testutils/HoodieClientTestUtils.java      |  64 ----------
 .../hudi/testutils/HoodieWriteableTestTable.java   | 131 +++++++++++++++++++++
 .../hudi/common/testutils/FileCreateUtils.java     |  74 ++++++++----
 .../hudi/common/testutils/HoodieTestTable.java     |  49 ++++----
 13 files changed, 422 insertions(+), 463 deletions(-)

diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
index 7167965..57238f7 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -23,9 +23,7 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.commands.RepairsCommand;
 import org.apache.hudi.cli.commands.TableCommand;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -33,7 +31,7 @@ import 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,17 +41,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
-import static org.apache.spark.sql.functions.lit;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -69,10 +62,10 @@ public class ITTestRepairsCommand extends 
AbstractShellIntegrationTest {
   private String repairedOutputPath;
 
   @BeforeEach
-  public void init() throws IOException, URISyntaxException {
-    String tablePath = basePath + File.separator + "test_table";
-    duplicatedPartitionPath = tablePath + File.separator + 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    repairedOutputPath = basePath + File.separator + "tmp";
+  public void init() throws Exception {
+    final String tablePath = Paths.get(basePath, "test_table").toString();
+    duplicatedPartitionPath = Paths.get(tablePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString();
+    repairedOutputPath = Paths.get(basePath, "tmp").toString();
 
     HoodieCLI.conf = jsc.hadoopConfiguration();
 
@@ -83,33 +76,19 @@ public class ITTestRepairsCommand extends 
AbstractShellIntegrationTest {
 
     // generate 200 records
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
 
-    String fileName1 = "1_0_20160401010101.parquet";
-    String fileName2 = "2_0_20160401010101.parquet";
-
-    List<HoodieRecord> hoodieRecords1 = 
SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
-    HoodieClientTestUtils.writeParquetFile(tablePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        fileName1, hoodieRecords1, schema, null, false);
-    List<HoodieRecord> hoodieRecords2 = 
SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
-    HoodieClientTestUtils.writeParquetFile(tablePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        fileName2, hoodieRecords2, schema, null, false);
-
-    // generate commit file
-    String fileId1 = UUID.randomUUID().toString();
-    String testWriteToken = "1-0-1";
-    String commitTime = FSUtils.getCommitTime(fileName1);
-    Files.createFile(Paths.get(duplicatedPartitionPath + "/"
-        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, 
commitTime, 1, testWriteToken)));
-    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + 
".commit"));
+    HoodieRecord[] hoodieRecords1 = 
SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new 
HoodieRecord[100]);
+    HoodieRecord[] hoodieRecords2 = 
SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new 
HoodieRecord[100]);
+    testTable.addCommit("20160401010101")
+        .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
"1", hoodieRecords1)
+        .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
"2", hoodieRecords2)
+        .withLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
 
     // read records and get 10 to generate duplicates
-    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
-
-    String fileName3 = "3_0_20160401010202.parquet";
-    commitTime = FSUtils.getCommitTime(fileName3);
-    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
-        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
-    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + 
".commit"));
+    HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
+    testTable.addCommit("20160401010202")
+        .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
"3", dupRecords);
 
     metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
   }
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java 
b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 39ee532..aefad87 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -20,10 +20,8 @@ package org.apache.hudi.index;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -39,7 +37,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.avro.Schema;
@@ -50,7 +48,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -67,6 +64,7 @@ import static 
org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestHoodieIndex extends HoodieClientTestHarness {
 
@@ -247,6 +245,8 @@ public class TestHoodieIndex extends 
HoodieClientTestHarness {
   @EnumSource(value = IndexType.class, names = {"BLOOM", "SIMPLE",})
   public void testTagLocationAndFetchRecordLocations(IndexType indexType) 
throws Exception {
     setUp(indexType);
+    String p1 = "2016/01/31";
+    String p2 = "2015/01/31";
     String rowKey1 = UUID.randomUUID().toString();
     String rowKey2 = UUID.randomUUID().toString();
     String rowKey3 = UUID.randomUUID().toString();
@@ -279,12 +279,10 @@ public class TestHoodieIndex extends 
HoodieClientTestHarness {
     }
 
     // We create three parquet file, each having one record. (two different 
partitions)
-    String filename1 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(record1), SCHEMA, null, true);
-    String filename2 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(record2), SCHEMA, null, true);
-    String filename3 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", 
Collections.singletonList(record4), SCHEMA, null, true);
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
+    String fileId1 = testTable.addCommit("001").withInserts(p1, record1);
+    String fileId2 = testTable.addCommit("002").withInserts(p1, record2);
+    String fileId3 = testTable.addCommit("003").withInserts(p2, record4);
 
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -295,13 +293,13 @@ public class TestHoodieIndex extends 
HoodieClientTestHarness {
     // Check results
     for (HoodieRecord record : taggedRecordRDD.collect()) {
       if (record.getRecordKey().equals(rowKey1)) {
-        if (record.getPartitionPath().equals("2015/01/31")) {
-          assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename3));
+        if (record.getPartitionPath().equals(p2)) {
+          assertEquals(record.getCurrentLocation().getFileId(), fileId3);
         } else {
-          assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename1));
+          assertEquals(record.getCurrentLocation().getFileId(), fileId1);
         }
       } else if (record.getRecordKey().equals(rowKey2)) {
-        assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename2));
+        assertEquals(record.getCurrentLocation().getFileId(), fileId2);
       } else if (record.getRecordKey().equals(rowKey3)) {
         assertFalse(record.isCurrentLocationKnown());
       }
@@ -312,15 +310,15 @@ public class TestHoodieIndex extends 
HoodieClientTestHarness {
     for (Tuple2<HoodieKey, Option<Pair<String, String>>> entry : 
recordLocations.collect()) {
       if (entry._1.getRecordKey().equals(rowKey1)) {
         assertTrue(entry._2.isPresent(), "Row1 should have been present ");
-        if (entry._1.getPartitionPath().equals("2015/01/31")) {
+        if (entry._1.getPartitionPath().equals(p2)) {
           assertTrue(entry._2.isPresent(), "Row1 should have been present ");
-          assertEquals(entry._2.get().getRight(), 
FSUtils.getFileId(filename3));
+          assertEquals(entry._2.get().getRight(), fileId3);
         } else {
-          assertEquals(entry._2.get().getRight(), 
FSUtils.getFileId(filename1));
+          assertEquals(entry._2.get().getRight(), fileId1);
         }
       } else if (entry._1.getRecordKey().equals(rowKey2)) {
         assertTrue(entry._2.isPresent(), "Row2 should have been present ");
-        assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename2));
+        assertEquals(entry._2.get().getRight(), fileId2);
       } else if (entry._1.getRecordKey().equals(rowKey3)) {
         assertFalse(entry._2.isPresent(), "Row3 should have been absent ");
       }
@@ -338,12 +336,13 @@ public class TestHoodieIndex extends 
HoodieClientTestHarness {
             .build()).build();
     writeClient = getHoodieWriteClient(config);
     index = writeClient.getIndex();
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, 
jsc.hadoopConfiguration());
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
+    final String p1 = "2016/01/31";
+    final String p2 = "2016/02/28";
 
     // Create the original partition, and put a record, along with the meta 
file
     // "2016/01/31": 1 file (1_0_20160131101010.parquet)
-    new File(basePath + "/2016/01/31").mkdirs();
-    new File(basePath + "/2016/01/31/" + 
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
-
     // this record will be saved in table and will be tagged to an empty record
     RawTripTestPayload originalPayload =
         new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -359,7 +358,7 @@ public class TestHoodieIndex extends 
HoodieClientTestHarness {
     - tag the new partition of the incomingRecord
     */
     RawTripTestPayload incomingPayload =
-        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
+        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}");
     HoodieRecord incomingRecord =
         new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), 
incomingPayload.getPartitionPath()),
             incomingPayload);
@@ -376,67 +375,42 @@ public class TestHoodieIndex extends 
HoodieClientTestHarness {
             incomingPayloadSamePartition);
 
     // We have some records to be tagged (two different partitions)
-    HoodieClientTestUtils
-        .writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(originalRecord), SCHEMA, null, false);
-
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, 
jsc.hadoopConfiguration());
-
-    // Add some commits
-    new File(basePath + "/.hoodie").mkdirs();
+    testTable.addCommit("1000").withInserts(p1, originalRecord);
 
     // test against incoming record with a different partition
     JavaRDD<HoodieRecord> recordRDD = 
jsc.parallelize(Collections.singletonList(incomingRecord));
-    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, 
table);
+    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, 
hoodieTable);
 
     assertEquals(2, taggedRecordRDD.count());
     for (HoodieRecord record : taggedRecordRDD.collect()) {
       switch (record.getPartitionPath()) {
-        case "2016/01/31":
+        case p1:
           assertEquals("000", record.getRecordKey());
           assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
           break;
-        case "2016/02/31":
+        case p2:
           assertEquals("000", record.getRecordKey());
           assertEquals(incomingPayload.getJsonData(), ((RawTripTestPayload) 
record.getData()).getJsonData());
           break;
         default:
-          assertFalse(true, String.format("Should not get partition path: %s", 
record.getPartitionPath()));
+          fail(String.format("Should not get partition path: %s", 
record.getPartitionPath()));
       }
     }
 
     // test against incoming record with the same partition
     JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
         .parallelize(Collections.singletonList(incomingRecordSamePartition));
-    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = 
index.tagLocation(recordRDDSamePartition, jsc, table);
+    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = 
index.tagLocation(recordRDDSamePartition, jsc, hoodieTable);
 
     assertEquals(1, taggedRecordRDDSamePartition.count());
     HoodieRecord record = taggedRecordRDDSamePartition.first();
     assertEquals("000", record.getRecordKey());
-    assertEquals("2016/01/31", record.getPartitionPath());
+    assertEquals(p1, record.getPartitionPath());
     assertEquals(incomingPayloadSamePartition.getJsonData(), 
((RawTripTestPayload) record.getData()).getJsonData());
   }
 
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  public HoodieWriteConfig.Builder getConfigBuilder() {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-  }
-
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-    return getConfigBuilder(schemaStr, indexType);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, 
IndexType indexType) {
-    return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+  private HoodieWriteConfig.Builder getConfigBuilder() {
+    return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
         
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index f6fd5f4..4584324 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -21,7 +21,6 @@ package org.apache.hudi.index.bloom;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -33,7 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.HoodieKeyLookupHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
@@ -46,12 +45,8 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.io.IOException;
-import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -105,18 +100,17 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
 
   @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
   @MethodSource("configParams")
-  public void testLoadInvolvedFiles(boolean rangePruning, boolean 
treeFiltering, boolean bucketizedChecking) throws IOException {
+  public void testLoadInvolvedFiles(boolean rangePruning, boolean 
treeFiltering, boolean bucketizedChecking) throws Exception {
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, 
bucketizedChecking);
     HoodieBloomIndex index = new HoodieBloomIndex(config);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, 
hadoopConf);
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
 
     // Create some partitions, and put some files
     // "2016/01/21": 0 file
     // "2016/04/01": 1 file (2_0_20160401010101.parquet)
-    // "2015/03/12": 3 files (1_0_20150312101010.parquet, 
3_0_20150312101010.parquet,
-    // 4_0_20150312101010.parquet)
-    Files.createDirectories(Paths.get(basePath, "2016", "01", "21"));
-    Files.createDirectories(Paths.get(basePath, "2016", "04", "01"));
-    Files.createDirectories(Paths.get(basePath, "2015", "03", "12"));
+    // "2015/03/12": 3 files (1_0_20150312101010.parquet, 
3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
+    testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
 
     RawTripTestPayload rowChange1 =
         new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -135,29 +129,17 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
     HoodieRecord record4 =
         new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), 
rowChange4.getPartitionPath()), rowChange4);
 
-    HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", 
"2_0_20160401010101.parquet", new ArrayList<>(),
-        SCHEMA, null, false);
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", 
"1_0_20150312101010.parquet", new ArrayList<>(),
-        SCHEMA, null, false);
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", 
"3_0_20150312101010.parquet", Collections.singletonList(record1),
-        SCHEMA, null, false);
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", 
"4_0_20150312101010.parquet",
-        Arrays.asList(record2, record3, record4), SCHEMA, null, false);
-
     List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", 
"2015/03/12");
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
-    List<Tuple2<String, BloomIndexFileInfo>> filesList = 
index.loadInvolvedFiles(partitions, jsc, table);
+    List<Tuple2<String, BloomIndexFileInfo>> filesList = 
index.loadInvolvedFiles(partitions, jsc, hoodieTable);
     // Still 0, as no valid commit
     assertEquals(0, filesList.size());
 
-    // Add some commits
-    java.nio.file.Path hoodieDir = Files.createDirectories(Paths.get(basePath, 
".hoodie"));
-    Files.createFile(hoodieDir.resolve("20160401010101.commit"));
-    Files.createFile(hoodieDir.resolve("20150312101010.commit"));
+    testTable.addCommit("20160401010101").withInserts("2016/04/01", "2");
+    testTable.addCommit("20150312101010").withInserts("2015/03/12", "1")
+        .withInserts("2015/03/12", "3", record1)
+        .withInserts("2015/03/12", "4", record2, record3, record4);
 
-    table = HoodieTable.create(metaClient, config, hadoopConf);
-    filesList = index.loadInvolvedFiles(partitions, jsc, table);
+    filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable);
     assertEquals(4, filesList.size());
 
     if (rangePruning) {
@@ -211,8 +193,8 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
   }
 
   @Test
-  public void testCheckUUIDsAgainstOneFile() throws IOException, 
InterruptedException {
-
+  public void testCheckUUIDsAgainstOneFile() throws Exception {
+    final String partition = "2016/01/31";
     // Create some records to use
     String recordStr1 = 
"{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
         + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -239,8 +221,9 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
     // record2, record3).
     BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 
0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
     filter.add(record3.getRecordKey());
-    String filename = HoodieClientTestUtils.writeParquetFile(basePath, 
"2016/01/31", Arrays.asList(record1, record2),
-        SCHEMA, filter, true);
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
+    String fileId = testTable.addCommit("000").withInserts(partition, record1, 
record2);
+    String filename = testTable.getBaseFileNameById(fileId);
 
     // The bloom filter contains 3 records
     assertTrue(filter.mightContain(record1.getRecordKey()));
@@ -254,10 +237,9 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
 
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
     HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
-    HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, 
table,
-        Pair.of("2016/01/31/", FSUtils.getFileId(filename)));
+    HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, 
table, Pair.of(partition, fileId));
     List<String> results = keyHandle.checkCandidatesAgainstFile(hadoopConf, 
uuids,
-        new Path(basePath + "/2016/01/31/" + filename));
+        new Path(Paths.get(basePath, partition, filename).toString()));
     assertEquals(results.size(), 2);
     assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
         || results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
@@ -314,12 +296,12 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
 
     // Also create the metadata and config
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, 
bucketizedChecking);
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, 
hadoopConf);
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
-    JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, 
jsc, table);
+    JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, 
jsc, hoodieTable);
 
     // Should not find any files
     for (HoodieRecord record : taggedRecordRDD.collect()) {
@@ -327,29 +309,23 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
     }
 
     // We create three parquet file, each having one record. (two different 
partitions)
-    String filename1 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(record1), SCHEMA, null, true);
-    String filename2 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(record2), SCHEMA, null, true);
-    String filename3 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", 
Collections.singletonList(record4), SCHEMA, null, true);
+    String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", 
record1);
+    String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", 
record2);
+    String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", 
record4);
 
     // We do the tag again
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    table = HoodieTable.create(metaClient, config, hadoopConf);
-
-    taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
+    taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, 
HoodieTable.create(metaClient, config, hadoopConf));
 
     // Check results
     for (HoodieRecord record : taggedRecordRDD.collect()) {
       if (record.getRecordKey().equals(rowKey1)) {
         if (record.getPartitionPath().equals("2015/01/31")) {
-          assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename3));
+          assertEquals(record.getCurrentLocation().getFileId(), fileId3);
         } else {
-          assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename1));
+          assertEquals(record.getCurrentLocation().getFileId(), fileId1);
         }
       } else if (record.getRecordKey().equals(rowKey2)) {
-        assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename2));
+        assertEquals(record.getCurrentLocation().getFileId(), fileId2);
       } else if (record.getRecordKey().equals(rowKey3)) {
         assertFalse(record.isCurrentLocationKnown());
       }
@@ -385,13 +361,13 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
 
     // Also create the metadata and config
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, 
bucketizedChecking);
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, 
hadoopConf);
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
     JavaPairRDD<HoodieKey, Option<Pair<String, String>>> taggedRecordRDD =
-        bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
+        bloomIndex.fetchRecordLocation(keysRDD, jsc, hoodieTable);
 
     // Should not find any files
     for (Tuple2<HoodieKey, Option<Pair<String, String>>> record : 
taggedRecordRDD.collect()) {
@@ -399,29 +375,26 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
     }
 
     // We create three parquet file, each having one record. (two different 
partitions)
-    String filename1 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(record1), SCHEMA, null, true);
-    String filename2 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(record2), SCHEMA, null, true);
-    String filename3 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", 
Collections.singletonList(record4), SCHEMA, null, true);
+    String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", 
record1);
+    String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", 
record2);
+    String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", 
record4);
 
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    table = HoodieTable.create(metaClient, config, hadoopConf);
-    taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
+    hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
+    taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, 
hoodieTable);
 
     // Check results
     for (Tuple2<HoodieKey, Option<Pair<String, String>>> record : 
taggedRecordRDD.collect()) {
       if 
(record._1.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
         assertTrue(record._2.isPresent());
-        assertEquals(FSUtils.getFileId(filename1), record._2.get().getRight());
+        assertEquals(fileId1, record._2.get().getRight());
       } else if 
(record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
         assertTrue(record._2.isPresent());
         if (record._1.getPartitionPath().equals("2015/01/31")) {
-          assertEquals(FSUtils.getFileId(filename3), 
record._2.get().getRight());
+          assertEquals(fileId3, record._2.get().getRight());
         } else {
-          assertEquals(FSUtils.getFileId(filename2), 
record._2.get().getRight());
+          assertEquals(fileId2, record._2.get().getRight());
         }
       } else if 
(record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) {
         assertFalse(record._2.isPresent());
@@ -431,7 +404,7 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
 
   @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
   @MethodSource("configParams")
-  public void testBloomFilterFalseError(boolean rangePruning, boolean 
treeFiltering, boolean bucketizedChecking) throws IOException, 
InterruptedException {
+  public void testBloomFilterFalseError(boolean rangePruning, boolean 
treeFiltering, boolean bucketizedChecking) throws Exception {
     // We have two hoodie records
     String recordStr1 = 
"{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
         + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -449,8 +422,8 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 
0.0000001, -1,
         BloomFilterTypeCode.SIMPLE.name());
     filter.add(record2.getRecordKey());
-    String filename =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(record1), SCHEMA, filter, true);
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
+    String fileId = testTable.addCommit("000").withInserts("2016/01/31", 
record1);
     assertTrue(filter.mightContain(record1.getRecordKey()));
     assertTrue(filter.mightContain(record2.getRecordKey()));
 
@@ -466,7 +439,7 @@ public class TestHoodieBloomIndex extends 
HoodieClientTestHarness {
     // Check results
     for (HoodieRecord record : taggedRecordRDD.collect()) {
       if (record.getKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
-        assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename));
+        assertEquals(record.getCurrentLocation().getFileId(), fileId);
       } else if 
(record.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
         assertFalse(record.isCurrentLocationKnown());
       }
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 3c4600d..197a3bb 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -18,18 +18,15 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -39,10 +36,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -78,21 +71,17 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
   }
 
   @Test
-  public void testLoadInvolvedFiles() throws IOException {
+  public void testLoadInvolvedFiles() throws Exception {
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
     HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, 
hadoopConf);
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
 
     // Create some partitions, and put some files, along with the meta file
     // "2016/01/21": 0 file
     // "2016/04/01": 1 file (2_0_20160401010101.parquet)
-    // "2015/03/12": 3 files (1_0_20150312101010.parquet, 
3_0_20150312101010.parquet,
-    // 4_0_20150312101010.parquet)
-    Path dir1 = Files.createDirectories(Paths.get(basePath, "2016", "01", 
"21"));
-    
Files.createFile(dir1.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-    Path dir2 = Files.createDirectories(Paths.get(basePath, "2016", "04", 
"01"));
-    
Files.createFile(dir2.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-    Path dir3 = Files.createDirectories(Paths.get(basePath, "2015", "03", 
"12"));
-    
Files.createFile(dir3.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
+    // "2015/03/12": 3 files (1_0_20150312101010.parquet, 
3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
+    testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
 
     RawTripTestPayload rowChange1 =
         new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -111,31 +100,19 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
     HoodieRecord record4 =
         new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), 
rowChange4.getPartitionPath()), rowChange4);
 
-    HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", 
"2_0_20160401010101.parquet", new ArrayList<>(),
-        SCHEMA, null, false);
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", 
"1_0_20150312101010.parquet", new ArrayList<>(),
-        SCHEMA, null, false);
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", 
"3_0_20150312101010.parquet", Collections.singletonList(record1),
-        SCHEMA, null, false);
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", 
"4_0_20150312101010.parquet",
-        Arrays.asList(record2, record3, record4), SCHEMA, null, false);
-
     // intentionally missed the partition "2015/03/12" to see if the 
GlobalBloomIndex can pick it up
     List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     // partitions will NOT be respected by this loadInvolvedFiles(...) call
-    List<Tuple2<String, BloomIndexFileInfo>> filesList = 
index.loadInvolvedFiles(partitions, jsc, table);
+    List<Tuple2<String, BloomIndexFileInfo>> filesList = 
index.loadInvolvedFiles(partitions, jsc, hoodieTable);
     // Still 0, as no valid commit
     assertEquals(0, filesList.size());
 
-    // Add some commits
-    Path hoodieDir = Files.createDirectories(Paths.get(basePath, ".hoodie"));
-    Files.createFile(hoodieDir.resolve("20160401010101.commit"));
-    Files.createFile(hoodieDir.resolve("20150312101010.commit"));
+    testTable.addCommit("20160401010101").withInserts("2016/04/01", "2");
+    testTable.addCommit("20150312101010").withInserts("2015/03/12", "1")
+        .withInserts("2015/03/12", "3", record1)
+        .withInserts("2015/03/12", "4", record2, record3, record4);
 
-    table = HoodieTable.create(metaClient, config, hadoopConf);
-    filesList = index.loadInvolvedFiles(partitions, jsc, table);
+    filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable);
     assertEquals(4, filesList.size());
 
     Map<String, BloomIndexFileInfo> filesMap = toFileMap(filesList);
@@ -201,18 +178,14 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
   public void testTagLocation() throws Exception {
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
     HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, 
hadoopConf);
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
 
     // Create some partitions, and put some files, along with the meta file
     // "2016/01/21": 0 file
     // "2016/04/01": 1 file (2_0_20160401010101.parquet)
-    // "2015/03/12": 3 files (1_0_20150312101010.parquet, 
3_0_20150312101010.parquet,
-    // 4_0_20150312101010.parquet)
-    Path dir1 = Files.createDirectories(Paths.get(basePath, "2016", "01", 
"21"));
-    
Files.createFile(dir1.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-    Path dir2 = Files.createDirectories(Paths.get(basePath, "2016", "04", 
"01"));
-    
Files.createFile(dir2.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-    Path dir3 = Files.createDirectories(Paths.get(basePath, "2015", "03", 
"12"));
-    
Files.createFile(dir3.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
+    // "2015/03/12": 3 files (1_0_20150312101010.parquet, 
3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
+    testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
 
     RawTripTestPayload rowChange1 =
         new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -242,33 +215,23 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
 
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, 
record2, record3, record5));
 
-    String filename0 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", 
Collections.singletonList(record1), SCHEMA, null, false);
-    String filename1 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", new 
ArrayList<>(), SCHEMA, null, false);
-    String filename2 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", 
Collections.singletonList(record2), SCHEMA, null, false);
-    String filename3 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", 
Collections.singletonList(record4), SCHEMA, null, false);
-
     // intentionally missed the partition "2015/03/12" to see if the 
GlobalBloomIndex can pick it up
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
-
-    // Add some commits
-    Files.createDirectories(Paths.get(basePath, ".hoodie"));
+    String fileId1 = testTable.addCommit("1000").withInserts("2016/04/01", 
record1);
+    String fileId2 = testTable.addCommit("2000").withInserts("2015/03/12");
+    String fileId3 = testTable.addCommit("3000").withInserts("2015/03/12", 
record2);
+    String fileId4 = testTable.addCommit("4000").withInserts("2015/03/12", 
record4);
 
     // partitions will NOT be respected by this loadInvolvedFiles(...) call
-    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, 
table);
+    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, 
hoodieTable);
 
     for (HoodieRecord record : taggedRecordRDD.collect()) {
       switch (record.getRecordKey()) {
         case "000":
-          assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename0));
+          assertEquals(record.getCurrentLocation().getFileId(), fileId1);
           assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), 
rowChange1.getJsonData());
           break;
         case "001":
-          assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename2));
+          assertEquals(record.getCurrentLocation().getFileId(), fileId3);
           assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), 
rowChange2.getJsonData());
           break;
         case "002":
@@ -276,11 +239,11 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
           assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), 
rowChange3.getJsonData());
           break;
         case "003":
-          assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename3));
+          assertEquals(record.getCurrentLocation().getFileId(), fileId4);
           assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), 
rowChange5.getJsonData());
           break;
         case "004":
-          assertEquals(record.getCurrentLocation().getFileId(), 
FSUtils.getFileId(filename3));
+          assertEquals(record.getCurrentLocation().getFileId(), fileId4);
           assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), 
rowChange4.getJsonData());
           break;
         default:
@@ -296,12 +259,13 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build())
         .build();
     HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, 
jsc.hadoopConfiguration());
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
+    final String p1 = "2016/01/31";
+    final String p2 = "2016/02/28";
 
     // Create the original partition, and put a record, along with the meta 
file
     // "2016/01/31": 1 file (1_0_20160131101010.parquet)
-    Path dir = Files.createDirectories(Paths.get(basePath, "2016", "01", 
"31"));
-    
Files.createFile(dir.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-
     // this record will be saved in table and will be tagged to an empty record
     RawTripTestPayload originalPayload =
         new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -317,7 +281,7 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
      - tag the new partition of the incomingRecord
     */
     RawTripTestPayload incomingPayload =
-        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
+        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}");
     HoodieRecord incomingRecord =
         new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), 
incomingPayload.getPartitionPath()),
             incomingPayload);
@@ -334,27 +298,20 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
             new HoodieKey(incomingPayloadSamePartition.getRowKey(), 
incomingPayloadSamePartition.getPartitionPath()),
             incomingPayloadSamePartition);
 
-    HoodieClientTestUtils
-        .writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(originalRecord), SCHEMA, null, false);
-
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
-
-    // Add some commits
-    Files.createDirectories(Paths.get(basePath, ".hoodie"));
+    testTable.addCommit("1000").withInserts(p1, originalRecord);
 
     // test against incoming record with a different partition
     JavaRDD<HoodieRecord> recordRDD = 
jsc.parallelize(Collections.singletonList(incomingRecord));
-    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, 
table);
+    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, 
hoodieTable);
 
     assertEquals(2, taggedRecordRDD.count());
     for (HoodieRecord record : taggedRecordRDD.collect()) {
       switch (record.getPartitionPath()) {
-        case "2016/01/31":
+        case p1:
           assertEquals("000", record.getRecordKey());
           assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
           break;
-        case "2016/02/31":
+        case p2:
           assertEquals("000", record.getRecordKey());
           assertEquals(incomingPayload.getJsonData(), ((RawTripTestPayload) 
record.getData()).getJsonData());
           break;
@@ -366,17 +323,17 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
     // test against incoming record with the same partition
     JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
         .parallelize(Collections.singletonList(incomingRecordSamePartition));
-    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = 
index.tagLocation(recordRDDSamePartition, jsc, table);
+    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = 
index.tagLocation(recordRDDSamePartition, jsc, hoodieTable);
 
     assertEquals(1, taggedRecordRDDSamePartition.count());
     HoodieRecord record = taggedRecordRDDSamePartition.first();
     assertEquals("000", record.getRecordKey());
-    assertEquals("2016/01/31", record.getPartitionPath());
+    assertEquals(p1, record.getPartitionPath());
     assertEquals(incomingPayloadSamePartition.getJsonData(), 
((RawTripTestPayload) record.getData()).getJsonData());
   }
 
   // convert list to map to avoid sorting order dependencies
-  private Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String, 
BloomIndexFileInfo>> filesList) {
+  private static Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String, 
BloomIndexFileInfo>> filesList) {
     Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();
     for (Tuple2<String, BloomIndexFileInfo> t : filesList) {
       filesMap.put(t._1() + "/" + t._2().getFileId(), t._2());
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
 
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index ba5d6dd..ba20f5d 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -19,16 +19,12 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -37,7 +33,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -46,19 +42,18 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import scala.Tuple2;
 
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.makeNewCommitTime;
 import static 
org.apache.hudi.common.testutils.Transformations.recordsToPartitionRecordsMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -88,17 +83,12 @@ public class TestHoodieKeyLocationFetchHandle extends 
HoodieClientTestHarness {
 
   @Test
   public void testFetchHandle() throws Exception {
-
-    String commitTime = "000";
-    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 100);
+    List<HoodieRecord> records = dataGen.generateInserts(makeNewCommitTime(), 
100);
     Map<String, List<HoodieRecord>> partitionRecordsMap = 
recordsToPartitionRecordsMap(records);
-
-    Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> 
expectedList = writeToParquetAndGetExpectedRecordLocations(partitionRecordsMap);
-
-    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, 
jsc.hadoopConfiguration());
-
-    Files.createDirectories(Paths.get(basePath, ".hoodie"));
+    HoodieWriteableTestTable testTable = 
HoodieWriteableTestTable.of(hoodieTable, AVRO_SCHEMA_WITH_METADATA_FIELDS);
+    Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> 
expectedList =
+        writeToParquetAndGetExpectedRecordLocations(partitionRecordsMap, 
testTable);
 
     List<Tuple2<String, HoodieBaseFile>> partitionPathFileIdPairs = 
loadAllFilesForPartitions(new ArrayList<>(partitionRecordsMap.keySet()), jsc, 
hoodieTable);
 
@@ -112,7 +102,7 @@ public class TestHoodieKeyLocationFetchHandle extends 
HoodieClientTestHarness {
   }
 
   private Map<Tuple2<String, String>, List<Tuple2<HoodieKey, 
HoodieRecordLocation>>> writeToParquetAndGetExpectedRecordLocations(
-      Map<String, List<HoodieRecord>> partitionRecordsMap) throws Exception {
+      Map<String, List<HoodieRecord>> partitionRecordsMap, 
HoodieWriteableTestTable testTable) throws Exception {
     Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> 
expectedList = new HashMap<>();
     for (Map.Entry<String, List<HoodieRecord>> entry : 
partitionRecordsMap.entrySet()) {
       int totalRecordsPerPartition = entry.getValue().size();
@@ -138,7 +128,9 @@ public class TestHoodieKeyLocationFetchHandle extends 
HoodieClientTestHarness {
       }
 
       for (List<HoodieRecord> recordsPerSlice : recordsForFileSlices) {
-        Tuple2<String, String> fileIdInstantTimePair = 
writeToParquet(entry.getKey(), recordsPerSlice);
+        String instantTime = makeNewCommitTime();
+        String fileId = 
testTable.addCommit(instantTime).withInserts(entry.getKey(), 
recordsPerSlice.toArray(new HoodieRecord[0]));
+        Tuple2<String, String> fileIdInstantTimePair = new Tuple2<>(fileId, 
instantTime);
         List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new 
ArrayList<>();
         for (HoodieRecord record : recordsPerSlice) {
           expectedEntries.add(new Tuple2<>(record.getKey(), new 
HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1)));
@@ -149,31 +141,16 @@ public class TestHoodieKeyLocationFetchHandle extends 
HoodieClientTestHarness {
     return expectedList;
   }
 
-  protected List<Tuple2<String, HoodieBaseFile>> 
loadAllFilesForPartitions(List<String> partitions, final JavaSparkContext jsc,
-                                                                           
final HoodieTable hoodieTable) {
-
+  private static List<Tuple2<String, HoodieBaseFile>> 
loadAllFilesForPartitions(List<String> partitions, JavaSparkContext jsc,
+      HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, HoodieBaseFile>> partitionPathFileIDList = 
HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, jsc, 
hoodieTable);
     return partitionPathFileIDList.stream()
         .map(pf -> new Tuple2<>(pf.getKey(), pf.getValue())).collect(toList());
   }
 
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  public HoodieWriteConfig.Builder getConfigBuilder() {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-    return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+  private HoodieWriteConfig.Builder getConfigBuilder() {
+    return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
         
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
@@ -184,15 +161,4 @@ public class TestHoodieKeyLocationFetchHandle extends 
HoodieClientTestHarness {
         
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
   }
-
-  private Tuple2<String, String> writeToParquet(String partitionPath, 
List<HoodieRecord> records) throws Exception {
-    Thread.sleep(100);
-    String instantTime = HoodieTestUtils.makeNewCommitTime();
-    String fileId = UUID.randomUUID().toString();
-    String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId);
-    HoodieTestUtils.createCommitFiles(basePath, instantTime);
-    HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, 
records, AVRO_SCHEMA_WITH_METADATA_FIELDS, null,
-        true);
-    return new Tuple2<>(fileId, instantTime);
-  }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java 
b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index f7ce95c..8016acc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -527,18 +527,18 @@ public class TestCleaner extends HoodieClientTestBase {
         : UUID.randomUUID().toString();
     String file1P1C0 = enableBootstrapSourceClean ? 
bootstrapMapping.get(p1).get(0).getFileId()
         : UUID.randomUUID().toString();
-    testTable.addCommit("00000000000001").withUpdates(p0, 
file1P0C0).withUpdates(p1, file1P1C0);
+    testTable.addCommit("00000000000001").withBaseFilesInPartition(p0, 
file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
-    assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
-    assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
     Map<String, String> partitionAndFileId002 = 
testTable.addCommit("00000000000002")
-        .withUpdates(p0, file1P0C0)
-        .withUpdates(p1, file1P1C0)
-        .withInserts(p0, p1);
+        .withBaseFilesInPartition(p0, file1P0C0)
+        .withBaseFilesInPartition(p1, file1P1C0)
+        .withBaseFilesInPartitions(p0, p1);
 
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
     // enableBootstrapSourceClean would delete the bootstrap base file as the 
same time
@@ -559,10 +559,10 @@ public class TestCleaner extends HoodieClientTestBase {
     cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
     String file2P0C1 = partitionAndFileId002.get(p0);
     String file2P1C1 = partitionAndFileId002.get(p1);
-    assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
-    assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1));
-    assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0));
-    assertFalse(testTable.fileExists(p1, "00000000000001", file1P1C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
     assertEquals(enableBootstrapSourceClean ? 2 : 1, 
cleanStat.getSuccessDeleteFiles().size()
         + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
         : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean 
at least 1 file");
@@ -579,21 +579,21 @@ public class TestCleaner extends HoodieClientTestBase {
 
     // make next commit, with 2 updates to existing files, and 1 insert
     String file3P0C2 = testTable.addCommit("00000000000003")
-        .withUpdates(p0, file1P0C0, file2P0C1)
-        .withInserts(p0).get(p0);
+        .withBaseFilesInPartition(p0, file1P0C0, file2P0C1)
+        .withBaseFilesInPartitions(p0).get(p0);
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
     assertEquals(2,
         getCleanStat(hoodieCleanStatsThree, p0)
             .getSuccessDeleteFiles().size(), "Must clean two files");
-    assertFalse(testTable.fileExists(p0, "00000000000002", file1P0C0));
-    assertFalse(testTable.fileExists(p0, "00000000000002", file2P0C1));
-    assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
+    assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
+    assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
 
     // No cleaning on partially written file, with no commit.
-    testTable.forCommit("00000000000004").withUpdates(p0, file3P0C2);
+    testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, 
file3P0C2);
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
     assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
-    assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
   }
 
   /**
@@ -613,23 +613,23 @@ public class TestCleaner extends HoodieClientTestBase {
     String p0 = "2020/01/01";
 
     // Make 3 files, one base file and 2 log files associated with base file
-    String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0);
+    String file1P0 = 
testTable.addDeltaCommit("000").withBaseFilesInPartitions(p0).get(p0);
     testTable.forDeltaCommit("000")
         .withLogFile(p0, file1P0, 1)
         .withLogFile(p0, file1P0, 2);
 
     // Make 2 files, one base file and 1 log files associated with base file
     testTable.addDeltaCommit("001")
-        .withUpdates(p0, file1P0)
+        .withBaseFilesInPartition(p0, file1P0)
         .withLogFile(p0, file1P0, 3);
 
     List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
     assertEquals(3,
         getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
             .size(), "Must clean three files, one parquet and 2 log files");
-    assertFalse(testTable.fileExists(p0, "000", file1P0));
+    assertFalse(testTable.baseFileExists(p0, "000", file1P0));
     assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
-    assertTrue(testTable.fileExists(p0, "001", file1P0));
+    assertTrue(testTable.baseFileExists(p0, "001", file1P0));
     assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
   }
 
@@ -831,7 +831,7 @@ public class TestCleaner extends HoodieClientTestBase {
         : UUID.randomUUID().toString();
     String file1P1C0 = enableBootstrapSourceClean ? 
bootstrapMapping.get(p1).get(0).getFileId()
         : UUID.randomUUID().toString();
-    testTable.addInflightCommit("00000000000001").withUpdates(p0, 
file1P0C0).withUpdates(p1, file1P1C0);
+    testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, 
file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
 
     HoodieCommitMetadata commitMetadata = generateCommitMetadata(
         Collections.unmodifiableMap(new HashMap<String, List<String>>() {
@@ -849,14 +849,14 @@ public class TestCleaner extends HoodieClientTestBase {
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, 
simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions 
and clean any files");
-    assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
-    assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
-    Map<String, String> partitionAndFileId002 = 
testTable.addInflightCommit("00000000000002").withInserts(p0, p1);
+    Map<String, String> partitionAndFileId002 = 
testTable.addInflightCommit("00000000000002").withBaseFilesInPartitions(p0, p1);
     String file2P0C1 = partitionAndFileId002.get(p0);
     String file2P1C1 = partitionAndFileId002.get(p1);
-    testTable.forCommit("00000000000002").withUpdates(p0, 
file1P0C0).withUpdates(p1, file1P1C0);
+    testTable.forCommit("00000000000002").withBaseFilesInPartition(p0, 
file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
     commitMetadata = generateCommitMetadata(new HashMap<String, 
List<String>>() {
       {
         put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
@@ -868,16 +868,16 @@ public class TestCleaner extends HoodieClientTestBase {
         
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, 
simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions 
and clean any files");
-    assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
-    assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1));
-    assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
-    assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
     String file3P0C2 = testTable.addInflightCommit("00000000000003")
-        .withUpdates(p0, file1P0C0)
-        .withUpdates(p0, file2P0C1)
-        .withInserts(p0).get(p0);
+        .withBaseFilesInPartition(p0, file1P0C0)
+        .withBaseFilesInPartition(p0, file2P0C1)
+        .withBaseFilesInPartitions(p0).get(p0);
     commitMetadata = generateCommitMetadata(CollectionUtils
         .createImmutableMap(p0,
             CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file3P0C2)));
@@ -888,13 +888,13 @@ public class TestCleaner extends HoodieClientTestBase {
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 
simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsThree.size(),
         "Must not clean any file. We have to keep 1 version before the latest 
commit time to keep");
-    assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
     String file4P0C3 = testTable.addInflightCommit("00000000000004")
-        .withUpdates(p0, file1P0C0)
-        .withUpdates(p0, file2P0C1)
-        .withInserts(p0).get(p0);
+        .withBaseFilesInPartition(p0, file1P0C0)
+        .withBaseFilesInPartition(p0, file2P0C1)
+        .withBaseFilesInPartitions(p0).get(p0);
     commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
         p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file4P0C3)));
     metaClient.getActiveTimeline().saveAsComplete(
@@ -908,20 +908,20 @@ public class TestCleaner extends HoodieClientTestBase {
     assertEquals(enableBootstrapSourceClean ? 2 : 1, 
partitionCleanStat.getSuccessDeleteFiles().size()
         + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
         : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), 
"Must clean at least one old file");
-    assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0));
-    assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0));
-    assertTrue(testTable.fileExists(p0, "00000000000003", file1P0C0));
-    assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
-    assertTrue(testTable.fileExists(p0, "00000000000003", file2P0C1));
-    assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
-    assertTrue(testTable.fileExists(p0, "00000000000004", file4P0C3));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
+    assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
     if (enableBootstrapSourceClean) {
       assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
           p0).get(0).getBoostrapFileStatus().getPath().getUri())));
     }
 
     // No cleaning on partially written file, with no commit.
-    testTable.forCommit("00000000000005").withUpdates(p0, file3P0C2);
+    testTable.forCommit("00000000000005").withBaseFilesInPartition(p0, 
file3P0C2);
     commitMetadata = 
generateCommitMetadata(CollectionUtils.createImmutableMap(p0,
         CollectionUtils.createImmutableList(file3P0C2)));
     metaClient.getActiveTimeline().createNewInstant(
@@ -932,9 +932,9 @@ public class TestCleaner extends HoodieClientTestBase {
     List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 
simulateFailureRetry);
     HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0);
     assertNull(cleanStat, "Must not clean any files");
-    assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0));
-    assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
-    assertTrue(testTable.fileExists(p0, "00000000000005", file3P0C2));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
   }
 
   /**
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java 
b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index 1f638c3..8b19ac1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -66,9 +66,9 @@ public class TestConsistencyGuard extends 
HoodieClientTestHarness {
   @ParameterizedTest
   @MethodSource("consistencyGuardType")
   public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) 
throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2");
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f2");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f3");
 
     ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
     ConsistencyGuard passing = 
consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
@@ -88,7 +88,7 @@ public class TestConsistencyGuard extends 
HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingAppearFailSafe() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
       passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
@@ -98,7 +98,7 @@ public class TestConsistencyGuard extends 
HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingAppearTimedWait() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, 
getConsistencyGuardConfig());
     passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
           .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath 
+ "/partition/path/f2_1-0-2_000.parquet"));
@@ -106,7 +106,7 @@ public class TestConsistencyGuard extends 
HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingAppearsFailSafe() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
       passing.waitTillFileAppears(new Path(basePath + 
"/partition/path/f1_1-0-2_000.parquet"));
@@ -115,14 +115,14 @@ public class TestConsistencyGuard extends 
HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingAppearsTimedWait() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, 
getConsistencyGuardConfig());
     passing.waitTillFileAppears(new Path(basePath + 
"/partition/path/f1_1-0-2_000.parquet"));
   }
 
   @Test
   public void testCheckFailingDisappearFailSafe() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
       passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
@@ -132,7 +132,7 @@ public class TestConsistencyGuard extends 
HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingDisappearTimedWait() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, 
getConsistencyGuardConfig());
     passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
           .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath 
+ "/partition/path/f2_1-0-2_000.parquet"));
@@ -140,8 +140,8 @@ public class TestConsistencyGuard extends 
HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingDisappearsFailSafe() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
       passing.waitTillFileDisappears(new Path(basePath + 
"/partition/path/f1_1-0-1_000.parquet"));
@@ -150,8 +150,8 @@ public class TestConsistencyGuard extends 
HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingDisappearsTimedWait() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
+    FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, 
getConsistencyGuardConfig());
     passing.waitTillFileDisappears(new Path(basePath + 
"/partition/path/f1_1-0-1_000.parquet"));
   }
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
 
b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 138f60e..2fc9fe8 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -75,7 +75,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
         .build();
 
     FileCreateUtils.createCommit(basePath, "001");
-    FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", 
"file1", fileSize);
+    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "001", 
"file1", fileSize);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) 
HoodieTable.create(metaClient, config, hadoopConf);
 
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
 
b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
index 83e7ea0..8c4da54 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
@@ -58,10 +58,10 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     // given: wrote some base files and corresponding markers
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     String f0 = testTable.addRequestedCommit("000")
-        .withInserts("partA").get("partA");
+        .withBaseFilesInPartitions("partA").get("partA");
     String f1 = testTable.addCommit("001")
-        .withUpdates("partA", f0)
-        .withInserts("partB").get("partB");
+        .withBaseFilesInPartition("partA", f0)
+        .withBaseFilesInPartitions("partB").get("partB");
     String f2 = "f2";
     testTable.forCommit("001")
         .withMarkerFile("partA", f0, IOType.MERGE)
@@ -89,10 +89,10 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     // given: wrote some base + log files and corresponding markers
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     String f2 = testTable.addRequestedDeltaCommit("000")
-        .withInserts("partA").get("partA");
+        .withBaseFilesInPartitions("partA").get("partA");
     String f1 = testTable.addDeltaCommit("001")
         .withLogFile("partA", f2)
-        .withInserts("partB").get("partB");
+        .withBaseFilesInPartitions("partB").get("partB");
     String f3 = "f3";
     String f4 = "f4";
     testTable.forDeltaCommit("001")
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
 
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index c4c67fa..307e068 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -19,12 +19,7 @@
 package org.apache.hudi.testutils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.client.HoodieReadClient;
-import org.apache.hudi.client.SparkTaskContextSupplier;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -36,11 +31,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
-import org.apache.hudi.io.storage.HoodieParquetWriter;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -52,9 +43,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -62,13 +50,11 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 
 import java.io.IOException;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -214,54 +200,4 @@ public class HoodieClientTestUtils {
     return valuesAsList.stream();
   }
 
-  /**
-   * TODO Incorporate into {@link 
org.apache.hudi.common.testutils.HoodieTestTable}.
-   */
-  public static String writeParquetFile(String basePath, String partitionPath, 
String filename,
-                                        List<HoodieRecord> records, Schema 
schema, BloomFilter filter, boolean createCommitTime) throws IOException {
-
-    if (filter == null) {
-      filter = BloomFilterFactory
-          .createBloomFilter(10000, 0.0000001, -1, 
BloomFilterTypeCode.SIMPLE.name());
-    }
-    HoodieAvroWriteSupport writeSupport =
-        new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), 
schema, filter);
-    String instantTime = FSUtils.getCommitTime(filename);
-    HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, 
CompressionCodecName.GZIP,
-        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 
* 1024 * 1024,
-        HoodieTestUtils.getDefaultHadoopConf(), 
Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
-    HoodieParquetWriter writer =
-        new HoodieParquetWriter(instantTime, new Path(basePath + "/" + 
partitionPath + "/" + filename), config,
-                schema, new SparkTaskContextSupplier());
-    int seqId = 1;
-    for (HoodieRecord record : records) {
-      GenericRecord avroRecord = (GenericRecord) 
record.getData().getInsertValue(schema).get();
-      HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, instantTime, "" + 
seqId++);
-      HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), 
record.getPartitionPath(), filename);
-      writer.writeAvro(record.getRecordKey(), avroRecord);
-      filter.add(record.getRecordKey());
-    }
-    writer.close();
-
-    if (createCommitTime) {
-      HoodieTestUtils.createMetadataFolder(basePath);
-      HoodieTestUtils.createCommitFiles(basePath, instantTime);
-    }
-    return filename;
-  }
-
-  /**
-   * TODO Incorporate into {@link 
org.apache.hudi.common.testutils.HoodieTestTable}.
-   */
-  public static String writeParquetFile(String basePath, String partitionPath, 
List<HoodieRecord> records,
-                                        Schema schema, BloomFilter filter, 
boolean createCommitTime) throws IOException, InterruptedException {
-    Thread.sleep(1000);
-    String instantTime = HoodieTestUtils.makeNewCommitTime();
-    String fileId = UUID.randomUUID().toString();
-    String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId);
-    HoodieTestUtils.createCommitFiles(basePath, instantTime);
-    return HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, 
filename, records, schema, filter,
-        createCommitTime);
-  }
-
 }
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
 
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
new file mode 100644
index 0000000..c2faa83
--- /dev/null
+++ 
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
+
+public class HoodieWriteableTestTable extends HoodieTestTable {
+
+  private final Schema schema;
+  private final BloomFilter filter;
+
+  private HoodieWriteableTestTable(String basePath, FileSystem fs, 
HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
+    super(basePath, fs, metaClient);
+    this.schema = schema;
+    this.filter = filter;
+  }
+
+  public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, 
Schema schema, BloomFilter filter) {
+    return new HoodieWriteableTestTable(metaClient.getBasePath(), 
metaClient.getRawFs(), metaClient, schema, filter);
+  }
+
+  public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, 
Schema schema) {
+    BloomFilter filter = BloomFilterFactory
+        .createBloomFilter(10000, 0.0000001, -1, 
BloomFilterTypeCode.SIMPLE.name());
+    return of(metaClient, schema, filter);
+  }
+
+  public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema 
schema) {
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    return of(metaClient, schema);
+  }
+
+  public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema 
schema, BloomFilter filter) {
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    return of(metaClient, schema, filter);
+  }
+
+  @Override
+  public HoodieWriteableTestTable addCommit(String instantTime) throws 
Exception {
+    return (HoodieWriteableTestTable) super.addCommit(instantTime);
+  }
+
+  @Override
+  public HoodieWriteableTestTable forCommit(String instantTime) {
+    return (HoodieWriteableTestTable) super.forCommit(instantTime);
+  }
+
+  public String withInserts(String partition) throws Exception {
+    return withInserts(partition, new HoodieRecord[0]);
+  }
+
+  public String withInserts(String partition, HoodieRecord... records) throws 
Exception {
+    String fileId = UUID.randomUUID().toString();
+    withInserts(partition, fileId, records);
+    return fileId;
+  }
+
+  public HoodieWriteableTestTable withInserts(String partition, String fileId) 
throws Exception {
+    return withInserts(partition, fileId, new HoodieRecord[0]);
+  }
+
+  public HoodieWriteableTestTable withInserts(String partition, String fileId, 
HoodieRecord... records) throws Exception {
+    FileCreateUtils.createPartitionMetaFile(basePath, partition);
+    String fileName = baseFileName(currentInstantTime, fileId);
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(schema), schema, filter);
+    HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, 
CompressionCodecName.GZIP,
+        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 
* 1024 * 1024,
+        new Configuration(), 
Double.parseDouble(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
+    try (HoodieParquetWriter writer = new HoodieParquetWriter(
+        currentInstantTime,
+        new Path(Paths.get(basePath, partition, fileName).toString()),
+        config, schema, new SparkTaskContextSupplier())) {
+      int seqId = 1;
+      for (HoodieRecord record : records) {
+        GenericRecord avroRecord = (GenericRecord) 
record.getData().getInsertValue(schema).get();
+        HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, 
currentInstantTime, String.valueOf(seqId++));
+        HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, 
record.getRecordKey(), record.getPartitionPath(), fileName);
+        writer.writeAvro(record.getRecordKey(), avroRecord);
+        filter.add(record.getRecordKey());
+      }
+    }
+
+    return this;
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 987b567..f2a1144 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -40,6 +41,32 @@ import java.util.Map;
 
 public class FileCreateUtils {
 
+  private static final String WRITE_TOKEN = "1-0-1";
+
+  public static String baseFileName(String instantTime, String fileId) {
+    return baseFileName(instantTime, fileId, 
HoodieFileFormat.PARQUET.getFileExtension());
+  }
+
+  public static String baseFileName(String instantTime, String fileId, String 
fileExtension) {
+    return FSUtils.makeDataFileName(instantTime, WRITE_TOKEN, fileId, 
fileExtension);
+  }
+
+  public static String logFileName(String instantTime, String fileId, int 
version) {
+    return logFileName(instantTime, fileId, version, 
HoodieFileFormat.HOODIE_LOG.getFileExtension());
+  }
+
+  public static String logFileName(String instantTime, String fileId, int 
version, String fileExtension) {
+    return FSUtils.makeLogFileName(fileId, fileExtension, instantTime, 
version, WRITE_TOKEN);
+  }
+
+  public static String markerFileName(String instantTime, String fileId, 
IOType ioType) {
+    return markerFileName(instantTime, fileId, ioType, 
HoodieFileFormat.PARQUET.getFileExtension());
+  }
+
+  public static String markerFileName(String instantTime, String fileId, 
IOType ioType, String fileExtension) {
+    return String.format("%s_%s_%s%s%s.%s", fileId, WRITE_TOKEN, instantTime, 
fileExtension, HoodieTableMetaClient.MARKER_EXTN, ioType);
+  }
+
   private static void createMetaFile(String basePath, String instantTime, 
String suffix) throws IOException {
     Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME);
     Files.createDirectories(parentPath);
@@ -73,45 +100,52 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, 
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
   }
 
-  public static void createDataFile(String basePath, String partitionPath, 
String instantTime, String fileId)
+  public static void createPartitionMetaFile(String basePath, String 
partitionPath) throws IOException {
+    Path parentPath = Paths.get(basePath, partitionPath);
+    Files.createDirectories(parentPath);
+    Path metaFilePath = 
parentPath.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
+    if (Files.notExists(metaFilePath)) {
+      Files.createFile(metaFilePath);
+    }
+  }
+
+  public static void createBaseFile(String basePath, String partitionPath, 
String instantTime, String fileId)
       throws Exception {
-    createDataFile(basePath, partitionPath, instantTime, fileId, 0);
+    createBaseFile(basePath, partitionPath, instantTime, fileId, 0);
   }
 
-  public static void createDataFile(String basePath, String partitionPath, 
String instantTime, String fileId, long length)
+  public static void createBaseFile(String basePath, String partitionPath, 
String instantTime, String fileId, long length)
       throws Exception {
     Path parentPath = Paths.get(basePath, partitionPath);
     Files.createDirectories(parentPath);
-    Path dataFilePath = 
parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
-    if (Files.notExists(dataFilePath)) {
-      Files.createFile(dataFilePath);
+    Path baseFilePath = parentPath.resolve(baseFileName(instantTime, fileId));
+    if (Files.notExists(baseFilePath)) {
+      Files.createFile(baseFilePath);
     }
-    new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length);
+    new RandomAccessFile(baseFilePath.toFile(), "rw").setLength(length);
   }
 
-  public static void createLogFile(String basePath, String partitionPath, 
String baseInstantTime, String fileId, int version)
+  public static void createLogFile(String basePath, String partitionPath, 
String instantTime, String fileId, int version)
       throws Exception {
-    createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 
0);
+    createLogFile(basePath, partitionPath, instantTime, fileId, version, 0);
   }
 
-  public static void createLogFile(String basePath, String partitionPath, 
String baseInstantTime, String fileId, int version, int length)
+  public static void createLogFile(String basePath, String partitionPath, 
String instantTime, String fileId, int version, int length)
       throws Exception {
     Path parentPath = Paths.get(basePath, partitionPath);
     Files.createDirectories(parentPath);
-    Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId, 
HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, 
"1-0-1"));
+    Path logFilePath = parentPath.resolve(logFileName(instantTime, fileId, 
version));
     if (Files.notExists(logFilePath)) {
       Files.createFile(logFilePath);
     }
     new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length);
   }
 
-  public static String createMarkerFile(String basePath, String partitionPath, 
String instantTime, String fileID, IOType ioType)
+  public static String createMarkerFile(String basePath, String partitionPath, 
String instantTime, String fileId, IOType ioType)
       throws IOException {
-    Path folderPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
-    Files.createDirectories(folderPath);
-    String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1", 
instantTime,
-        HoodieFileFormat.PARQUET.getFileExtension(), 
HoodieTableMetaClient.MARKER_EXTN, ioType);
-    Path markerFilePath = folderPath.resolve(markerFileName);
+    Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
+    Files.createDirectories(parentPath);
+    Path markerFilePath = parentPath.resolve(markerFileName(instantTime, 
fileId, ioType));
     if (Files.notExists(markerFilePath)) {
       Files.createFile(markerFilePath);
     }
@@ -119,11 +153,11 @@ public class FileCreateUtils {
   }
 
   public static long getTotalMarkerFileCount(String basePath, String 
partitionPath, String instantTime, IOType ioType) throws IOException {
-    Path markerDir = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
-    if (Files.notExists(markerDir)) {
+    Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
+    if (Files.notExists(parentPath)) {
       return 0;
     }
-    return Files.list(markerDir).filter(p -> p.getFileName().toString()
+    return Files.list(parentPath).filter(p -> p.getFileName().toString()
         .endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, 
ioType))).count();
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 32f2d45..c99fe1d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -19,8 +19,6 @@
 
 package org.apache.hudi.common.testutils;
 
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -39,6 +37,7 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
+import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
@@ -46,15 +45,16 @@ import static 
org.apache.hudi.common.testutils.FileCreateUtils.createInflightDel
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
+import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
 
 public class HoodieTestTable {
 
-  private final String basePath;
-  private final FileSystem fs;
-  private HoodieTableMetaClient metaClient;
-  private String currentInstantTime;
+  protected final String basePath;
+  protected final FileSystem fs;
+  protected HoodieTableMetaClient metaClient;
+  protected String currentInstantTime;
 
-  private HoodieTestTable(String basePath, FileSystem fs, 
HoodieTableMetaClient metaClient) {
+  protected HoodieTestTable(String basePath, FileSystem fs, 
HoodieTableMetaClient metaClient) {
     ValidationUtils.checkArgument(Objects.equals(basePath, 
metaClient.getBasePath()));
     ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs()));
     this.basePath = basePath;
@@ -124,6 +124,13 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable withPartitionMetaFiles(String... partitionPaths) 
throws IOException {
+    for (String partitionPath : partitionPaths) {
+      FileCreateUtils.createPartitionMetaFile(basePath, partitionPath);
+    }
+    return this;
+  }
+
   public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) 
throws IOException {
     return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType);
   }
@@ -150,19 +157,19 @@ public class HoodieTestTable {
    *
    * @return A {@link Map} of partition and its newly inserted file's id.
    */
-  public Map<String, String> withInserts(String... partitions) throws 
Exception {
+  public Map<String, String> withBaseFilesInPartitions(String... partitions) 
throws Exception {
     Map<String, String> partitionFileIdMap = new HashMap<>();
     for (String p : partitions) {
       String fileId = UUID.randomUUID().toString();
-      FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId);
+      FileCreateUtils.createBaseFile(basePath, p, currentInstantTime, fileId);
       partitionFileIdMap.put(p, fileId);
     }
     return partitionFileIdMap;
   }
 
-  public HoodieTestTable withUpdates(String partition, String... fileIds) 
throws Exception {
+  public HoodieTestTable withBaseFilesInPartition(String partition, String... 
fileIds) throws Exception {
     for (String f : fileIds) {
-      FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, 
f);
+      FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, 
f);
     }
     return this;
   }
@@ -182,35 +189,37 @@ public class HoodieTestTable {
     return this;
   }
 
-  public boolean filesExist(Map<String, String> partitionAndFileId, String 
instantTime) {
+  public boolean baseFilesExist(Map<String, String> partitionAndFileId, String 
instantTime) {
     return partitionAndFileId.entrySet().stream().allMatch(entry -> {
       String partition = entry.getKey();
       String fileId = entry.getValue();
-      return fileExists(partition, instantTime, fileId);
+      return baseFileExists(partition, instantTime, fileId);
     });
   }
 
-  public boolean filesExist(String partition, String instantTime, String... 
fileIds) {
-    return Arrays.stream(fileIds).allMatch(f -> fileExists(partition, 
instantTime, f));
+  public boolean baseFilesExist(String partition, String instantTime, 
String... fileIds) {
+    return Arrays.stream(fileIds).allMatch(f -> baseFileExists(partition, 
instantTime, f));
   }
 
-  public boolean fileExists(String partition, String instantTime, String 
fileId) {
+  public boolean baseFileExists(String partition, String instantTime, String 
fileId) {
     try {
-      return fs.exists(new Path(Paths.get(basePath, partition,
-          FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString()));
+      return fs.exists(new Path(Paths.get(basePath, partition, 
baseFileName(instantTime, fileId)).toString()));
     } catch (IOException e) {
       throw new HoodieTestTableException(e);
     }
   }
 
+  public String getBaseFileNameById(String fileId) {
+    return baseFileName(currentInstantTime, fileId);
+  }
+
   public boolean logFilesExist(String partition, String instantTime, String 
fileId, int... versions) {
     return Arrays.stream(versions).allMatch(v -> logFileExists(partition, 
instantTime, fileId, v));
   }
 
   public boolean logFileExists(String partition, String instantTime, String 
fileId, int version) {
     try {
-      return fs.exists(new Path(Paths.get(basePath, partition,
-          FSUtils.makeLogFileName(fileId, 
HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, 
"1-0-1")).toString()));
+      return fs.exists(new Path(Paths.get(basePath, partition, 
logFileName(instantTime, fileId, version)).toString()));
     } catch (IOException e) {
       throw new HoodieTestTableException(e);
     }

Reply via email to