This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 626f78f  Revert "[HUDI-781] Introduce HoodieTestTable for test preparation (#1871)"
626f78f is described below

commit 626f78f6f639cae2d3d57d29e7ef0642cb0be7ee
Author: Balaji Varadarajan <balaji.varadara...@robinhood.com>
AuthorDate: Mon Aug 10 22:13:02 2020 -0700

    Revert "[HUDI-781] Introduce HoodieTestTable for test preparation (#1871)"
    
    This reverts commit b2e703d4427abca02b053fa4444cd5058aa256ef.
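    
    For context, HUDI-781 had replaced the static HoodieTestUtils file-creation
    helpers with a fluent HoodieTestTable fixture; this revert restores the
    helper style. A minimal sketch of the two styles, using only names that
    appear in the TestCleaner hunks below (metaClient, basePath and the
    partition paths "2020/01/01" / "2020/01/02" come from the test harness):
    
        // Fluent fixture removed by this revert: one chained call stages a
        // commit plus one inserted data file per partition, returning a
        // partition -> fileId map.
        Map<String, String> partitionAndFileId000 =
            HoodieTestTable.of(metaClient).addCommit("000").withInserts("2020/01/01", "2020/01/02");
    
        // Static-helper style restored by this revert: the commit files and
        // each data file are created by separate calls.
        HoodieTestUtils.createCommitFiles(basePath, "000");
        String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, "2020/01/01", "000");
    
    Both styles create placeholder commit and data files so the cleaner and
    rollback tests can assert on the resulting file layout without running
    real writes.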
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   1 -
 .../org/apache/hudi/io/HoodieCreateHandle.java     |   1 -
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   1 -
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   3 +-
 .../src/main/java/org/apache/hudi/io}/IOType.java  |  15 +-
 .../java/org/apache/hudi/table/MarkerFiles.java    |  15 +-
 .../rollback/MarkerBasedRollbackStrategy.java      |   8 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |   2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   2 +-
 .../java/org/apache/hudi/table/TestCleaner.java    | 393 +++++++++++++--------
 .../apache/hudi/table/TestConsistencyGuard.java    |  28 +-
 .../org/apache/hudi/table/TestMarkerFiles.java     |  10 +-
 .../table/action/commit/TestUpsertPartitioner.java |   8 +-
 .../table/action/compact/TestHoodieCompactor.java  |   7 +-
 .../rollback/TestMarkerBasedRollbackStrategy.java  |  69 ++--
 .../hudi/testutils/HoodieClientTestUtils.java      |  99 ++++--
 .../hudi/common/testutils/FileCreateUtils.java     | 113 ------
 .../hudi/common/testutils/HoodieTestTable.java     | 232 ------------
 .../hudi/common/testutils/HoodieTestUtils.java     | 102 +++---
 19 files changed, 467 insertions(+), 642 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 7996a77..7a8e5ab 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 5a76dc7..705e98d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 8d54065..f0ea284 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 5ea8c38..d148b1b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -24,7 +24,6 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -34,13 +33,13 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.MarkerFiles;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.MarkerFiles;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java b/hudi-client/src/main/java/org/apache/hudi/io/IOType.java
similarity index 57%
rename from hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java
rename to hudi-client/src/main/java/org/apache/hudi/io/IOType.java
index bd29ff0..aa6660e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/IOType.java
@@ -7,17 +7,16 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
-package org.apache.hudi.common.model;
+package org.apache.hudi.io;
 
 /**
  * Types of lower level I/O operations done on each file slice.
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
index 9577cea..8a310fd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -18,27 +18,26 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.io.IOType;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 2a137b4..40b81a2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -18,10 +18,10 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -29,21 +29,19 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.io.IOType;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import scala.Tuple2;
-
 /**
  * Performs rollback using marker files generated during the write..
  */
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index e9c9e28..4960ff5 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -21,13 +21,13 @@ package org.apache.hudi.table.upgrade;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.io.IOType;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 1c4988a..51d8a6a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -48,6 +47,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.io.IOType;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.commit.WriteHelper;
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 55fbab2..0376ec3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -35,7 +35,6 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -44,8 +43,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
 import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.FileSystemTestUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -57,16 +56,13 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -486,97 +482,125 @@ public class TestCleaner extends HoodieClientTestBase {
    * Test HoodieTable.clean() Cleaning by versions logic.
    */
   @Test
-  public void testKeepLatestFileVersions() throws Exception {
+  public void testKeepLatestFileVersions() throws IOException {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
 
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String p0 = "2020/01/01";
-    String p1 = "2020/01/02";
-
     // make 1 commit, with 1 file per partition
-    Map<String, String> partitionAndFileId000 = testTable.addCommit("000").withInserts(p0, p1);
+    HoodieTestUtils.createCommitFiles(basePath, "000");
+
+    String file1P0C0 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
+    String file1P1C0 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
-    assertTrue(testTable.filesExist(partitionAndFileId000, "000"));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file1P0C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
+        file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
-    String file1P0C0 = partitionAndFileId000.get(p0);
-    String file1P1C0 = partitionAndFileId000.get(p1);
-    Map<String, String> partitionAndFileId001 = testTable.addCommit("001")
-        .withUpdates(p0, file1P0C0)
-        .withUpdates(p1, file1P1C0)
-        .withInserts(p0, p1);
+    HoodieTestUtils.createCommitFiles(basePath, "001");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    String file2P0C1 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
+    String file2P1C1 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
+
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
     assertEquals(1,
-        getCleanStat(hoodieCleanStatsTwo, p0).getSuccessDeleteFiles()
+        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
             .size(), "Must clean 1 file");
     assertEquals(1,
-        getCleanStat(hoodieCleanStatsTwo, p1).getSuccessDeleteFiles()
+        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
             .size(), "Must clean 1 file");
-    String file2P0C1 = partitionAndFileId001.get(p0);
-    String file2P1C1 = partitionAndFileId001.get(p1);
-    assertTrue(testTable.fileExists(p0, "001", file2P0C1));
-    assertTrue(testTable.fileExists(p1, "001", file2P1C1));
-    assertFalse(testTable.fileExists(p0, "000", file1P0C0));
-    assertFalse(testTable.fileExists(p1, "000", file1P1C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+        file2P0C1));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
+        file2P1C1));
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file1P0C0));
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
+        "000", file1P1C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
-    String file3P0C2 = testTable.addCommit("002")
-        .withUpdates(p0, file1P0C0, file2P0C1)
-        .withInserts(p0, "002").get(p0);
+    HoodieTestUtils.createCommitFiles(basePath, "002");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
+    String file3P0C2 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
+
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
     assertEquals(2,
-        getCleanStat(hoodieCleanStatsThree, p0)
+        getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
             .getSuccessDeleteFiles().size(), "Must clean two files");
-    assertFalse(testTable.fileExists(p0, "001", file1P0C0));
-    assertFalse(testTable.fileExists(p0, "001", file2P0C1));
-    assertTrue(testTable.fileExists(p0, "002", file3P0C2));
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+        file1P0C0));
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+        file2P0C1));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+        file3P0C2));
 
     // No cleaning on partially written file, with no commit.
-    testTable.forCommit("003").withUpdates(p0, file3P0C2);
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
     assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
-    assertTrue(testTable.fileExists(p0, "003", file3P0C2));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+        file3P0C2));
   }
 
   /**
    * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files.
    */
   @Test
-  public void testKeepLatestFileVersionsMOR() throws Exception {
+  public void testKeepLatestFileVersionsMOR() throws IOException {
+
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
 
-    HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String p0 = "2020/01/01";
+    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
 
     // Make 3 files, one base file and 2 log files associated with base file
-    String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0);
-    testTable.forDeltaCommit("000")
-        .withLogFile(p0, file1P0, 1)
-        .withLogFile(p0, file1P0, 2);
-
-    // Make 2 files, one base file and 1 log files associated with base file
-    testTable.addDeltaCommit("001")
-        .withUpdates(p0, file1P0)
-        .withLogFile(p0, file1P0, 3);
+    String file1P0 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
+    String file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath,
+        HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.empty());
+    HoodieTestUtils.createNewLogFile(fs, basePath,
+        HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.of(2));
+    // make 1 compaction commit
+    HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000");
+
+    // Make 4 files, one base file and 3 log files associated with base file
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0);
+    file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        "001", file1P0, Option.of(3));
+    // make 1 compaction commit
+    HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001");
 
     List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
     assertEquals(3,
-        getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
+        getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
             .size(), "Must clean three files, one parquet and 2 log files");
-    assertFalse(testTable.fileExists(p0, "000", file1P0));
-    assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
-    assertTrue(testTable.fileExists(p0, "001", file1P0));
-    assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file1P0));
+    assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file2P0L0, Option.empty()));
+    assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file2P0L0, Option.of(2)));
   }
 
   @Test
@@ -628,33 +652,33 @@ public class TestCleaner extends HoodieClientTestBase {
     );
     metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1);
 
-    // Now upgrade and check
+    // NOw upgrade and check
     CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
     metadata = metadataMigrator.upgradeToLatest(metadata, metadata.getVersion());
-    assertCleanMetadataPathEquals(newExpected, metadata);
+    testCleanMetadataPathEquality(metadata, newExpected);
 
     CleanMetadataMigrator migrator = new CleanMetadataMigrator(metaClient);
     HoodieCleanMetadata oldMetadata =
         migrator.migrateToVersion(metadata, metadata.getVersion(), CleanerUtils.CLEAN_METADATA_VERSION_1);
     assertEquals(CleanerUtils.CLEAN_METADATA_VERSION_1, oldMetadata.getVersion());
-    assertCleanMetadataEquals(metadata, oldMetadata);
-    assertCleanMetadataPathEquals(oldExpected, oldMetadata);
+    testCleanMetadataEquality(metadata, oldMetadata);
+    testCleanMetadataPathEquality(oldMetadata, oldExpected);
 
     HoodieCleanMetadata newMetadata = migrator.upgradeToLatest(oldMetadata, oldMetadata.getVersion());
     assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, newMetadata.getVersion());
-    assertCleanMetadataEquals(oldMetadata, newMetadata);
-    assertCleanMetadataPathEquals(newExpected, newMetadata);
-    assertCleanMetadataPathEquals(oldExpected, oldMetadata);
+    testCleanMetadataEquality(oldMetadata, newMetadata);
+    testCleanMetadataPathEquality(newMetadata, newExpected);
+    testCleanMetadataPathEquality(oldMetadata, oldExpected);
   }
 
-  private static void assertCleanMetadataEquals(HoodieCleanMetadata expected, HoodieCleanMetadata actual) {
-    assertEquals(expected.getEarliestCommitToRetain(), actual.getEarliestCommitToRetain());
-    assertEquals(expected.getStartCleanTime(), actual.getStartCleanTime());
-    assertEquals(expected.getTimeTakenInMillis(), actual.getTimeTakenInMillis());
-    assertEquals(expected.getTotalFilesDeleted(), actual.getTotalFilesDeleted());
+  public void testCleanMetadataEquality(HoodieCleanMetadata input1, HoodieCleanMetadata input2) {
+    assertEquals(input1.getEarliestCommitToRetain(), input2.getEarliestCommitToRetain());
+    assertEquals(input1.getStartCleanTime(), input2.getStartCleanTime());
+    assertEquals(input1.getTimeTakenInMillis(), input2.getTimeTakenInMillis());
+    assertEquals(input1.getTotalFilesDeleted(), input2.getTotalFilesDeleted());
 
-    Map<String, HoodieCleanPartitionMetadata> map1 = expected.getPartitionMetadata();
-    Map<String, HoodieCleanPartitionMetadata> map2 = actual.getPartitionMetadata();
+    Map<String, HoodieCleanPartitionMetadata> map1 = input1.getPartitionMetadata();
+    Map<String, HoodieCleanPartitionMetadata> map2 = input2.getPartitionMetadata();
 
     assertEquals(map1.keySet(), map2.keySet());
 
@@ -669,7 +693,7 @@ public class TestCleaner extends HoodieClientTestBase {
     assertEquals(policies1, policies2);
   }
 
-  private static void assertCleanMetadataPathEquals(Map<String, Tuple3> expected, HoodieCleanMetadata metadata) {
+  private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map<String, Tuple3> expected) {
 
     Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata();
 
@@ -683,40 +707,54 @@ public class TestCleaner extends HoodieClientTestBase {
     }
   }
 
-  private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
-    return Stream.of(
-        Arguments.of(false, false),
-        Arguments.of(true, false),
-        Arguments.of(false, true)
-    );
+  /**
+   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   */
+  @Test
+  public void testKeepLatestCommits() throws IOException {
+    testKeepLatestCommits(false, false);
   }
 
   /**
    * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated
    * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
    */
-  @ParameterizedTest
-  @MethodSource("argumentsForTestKeepLatestCommits")
-  public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws Exception {
+  @Test
+  public void testKeepLatestCommitsWithFailureRetry() throws IOException {
+    testKeepLatestCommits(true, false);
+  }
+
+  /**
+   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   */
+  @Test
+  public void testKeepLatestCommitsIncrMode() throws IOException {
+    testKeepLatestCommits(false, true);
+  }
+
+  /**
+   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   */
+  private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws IOException {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withIncrementalCleaningMode(enableIncrementalClean)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
 
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String p0 = "2020/01/01";
-    String p1 = "2020/01/02";
-
     // make 1 commit, with 1 file per partition
-    Map<String, String> partitionAndFileId000 = testTable.addInflightCommit("000").withInserts(p0, p1);
-    String file1P0C0 = partitionAndFileId000.get(p0);
-    String file1P1C0 = partitionAndFileId000.get(p1);
+    HoodieTestUtils.createInflightCommitFiles(basePath, "000");
+
+    String file1P0C0 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
+    String file1P1C0 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
+
     HoodieCommitMetadata commitMetadata = generateCommitMetadata(
         Collections.unmodifiableMap(new HashMap<String, List<String>>() {
           {
-            put(p0, CollectionUtils.createImmutableList(file1P0C0));
-            put(p1, CollectionUtils.createImmutableList(file1P1C0));
+            put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
+            put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0));
           }
         })
     );
@@ -728,20 +766,29 @@ public class TestCleaner extends HoodieClientTestBase {
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
-    assertTrue(testTable.fileExists(p0, "000", file1P0C0));
-    assertTrue(testTable.fileExists(p1, "000", file1P1C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file1P0C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
+        file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
-    Map<String, String> partitionAndFileId001 = testTable.addInflightCommit("001").withInserts(p0, p1);
-    String file2P0C1 = partitionAndFileId001.get(p0);
-    String file2P1C1 = partitionAndFileId001.get(p1);
-    testTable.forCommit("001")
-        .withUpdates(p0, file1P0C0)
-        .withUpdates(p1, file1P1C0);
+    HoodieTestUtils.createInflightCommitFiles(basePath, "001");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    String file2P0C1 =
+        HoodieTestUtils
+            .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
+    String file2P1C1 =
+        HoodieTestUtils
+            .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
     commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
       {
-        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
-        put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
+        put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
+        put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
       }
     });
     metaClient.getActiveTimeline().saveAsComplete(
@@ -749,18 +796,28 @@ public class TestCleaner extends HoodieClientTestBase {
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
-    assertTrue(testTable.fileExists(p0, "001", file2P0C1));
-    assertTrue(testTable.fileExists(p1, "001", file2P1C1));
-    assertTrue(testTable.fileExists(p0, "000", file1P0C0));
-    assertTrue(testTable.fileExists(p1, "000", file1P1C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+        file2P0C1));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
+        file2P1C1));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file1P0C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
+        file1P1C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
-    String file3P0C2 = testTable.addInflightCommit("002")
-        .withUpdates(p0, file1P0C0)
-        .withUpdates(p0, file2P0C1)
-        .withInserts(p0).get(p0);
+    HoodieTestUtils.createInflightCommitFiles(basePath, "002");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
+    String file3P0C2 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
+
     commitMetadata = generateCommitMetadata(CollectionUtils
-        .createImmutableMap(p0,
+        .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
             CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"),
@@ -769,35 +826,49 @@ public class TestCleaner extends HoodieClientTestBase {
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsThree.size(),
         "Must not clean any file. We have to keep 1 version before the latest commit time to keep");
-    assertTrue(testTable.fileExists(p0, "000", file1P0C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file1P0C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
-    String file4P0C3 = testTable.addInflightCommit("003")
-        .withUpdates(p0, file1P0C0)
-        .withUpdates(p0, file2P0C1)
-        .withInserts(p0).get(p0);
+    HoodieTestUtils.createInflightCommitFiles(basePath, "003");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update
+    String file4P0C3 =
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003");
     commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
-        p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
+        HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
 
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry);
     assertEquals(1,
-        getCleanStat(hoodieCleanStatsFour, p0).getSuccessDeleteFiles()
+        getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
             .size(), "Must not clean one old file");
 
-    assertFalse(testTable.fileExists(p0, "000", file1P0C0));
-    assertTrue(testTable.fileExists(p0, "001", file1P0C0));
-    assertTrue(testTable.fileExists(p0, "002", file1P0C0));
-    assertTrue(testTable.fileExists(p0, "001", file2P0C1));
-    assertTrue(testTable.fileExists(p0, "002", file2P0C1));
-    assertTrue(testTable.fileExists(p0, "002", file3P0C2));
-    assertTrue(testTable.fileExists(p0, "003", file4P0C3));
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+        file1P0C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+        file1P0C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+        file1P0C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+        file2P0C1));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+        file2P0C1));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+        file3P0C2));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003",
+        file4P0C3));
 
     // No cleaning on partially written file, with no commit.
-    testTable.forCommit("004").withUpdates(p0, file3P0C2);
-    commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0,
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
+    commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
         CollectionUtils.createImmutableList(file3P0C2)));
     metaClient.getActiveTimeline().createNewInstant(
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"));
@@ -805,40 +876,41 @@ public class TestCleaner extends HoodieClientTestBase {
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
-    HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0);
-    assertNull(cleanStat, "Must not clean any files");
-    assertTrue(testTable.fileExists(p0, "001", file1P0C0));
-    assertTrue(testTable.fileExists(p0, "001", file2P0C1));
-    assertTrue(testTable.fileExists(p0, "004", file3P0C2));
+    HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    assertEquals(0,
+        cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files");
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+        file1P0C0));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+        file2P0C1));
   }
 
   /**
    * Test Cleaning functionality of table.rollback() API.
    */
   @Test
-  public void testCleanMarkerDataFilesOnRollback() throws Exception {
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient)
-        .addRequestedCommit("000")
-        .withMarkerFiles("default", 10, IOType.MERGE);
-    final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size();
-    assertEquals(10, numTempFilesBefore, "Some marker files are created.");
+  public void testCleanMarkerDataFilesOnRollback() throws IOException {
+    List<String> markerFiles = createMarkerFiles("000", 10);
+    assertEquals(10, markerFiles.size(), "Some marker files are created.");
+    assertEquals(markerFiles.size(), getTotalTempFiles(), "Some marker files are created.");
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
+    table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED,
+        HoodieTimeline.COMMIT_ACTION, "000"));
     table.getActiveTimeline().transitionRequestedToInflight(
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
     metaClient.reloadActiveTimeline();
     table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
-    final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size();
-    assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
+    assertEquals(0, getTotalTempFiles(), "All temp files are deleted.");
   }
 
   /**
    * Test CLeaner Stat when there are no partition paths.
    */
   @Test
-  public void testCleaningWithZeroPartitionPaths() throws Exception {
+  public void testCleaningWithZeroPartitionPaths() throws IOException {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
@@ -847,7 +919,9 @@ public class TestCleaner extends HoodieClientTestBase {
     // Make a commit, although there are no partitionPaths.
     // Example use-case of this is when a client wants to create a table
     // with just some commit metadata, but no data/partitionPaths.
-    HoodieTestTable.of(metaClient).addCommit("000");
+    HoodieTestUtils.createCommitFiles(basePath, "000");
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
     assertTrue(hoodieCleanStatsOne.isEmpty(), "HoodieCleanStats should be empty for a table with empty partitionPaths");
@@ -878,9 +952,21 @@ public class TestCleaner extends HoodieClientTestBase {
    * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated
    * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
    */
-  @ParameterizedTest
-  @ValueSource(booleans = {false, true})
-  public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException {
+  @Test
+  public void testKeepLatestVersionsWithPendingCompactions() throws IOException {
+    testKeepLatestVersionsWithPendingCompactions(false);
+  }
+
+
+  /**
+   * Test Keep Latest Versions when there are pending compactions.
+   */
+  @Test
+  public void testKeepLatestVersionsWithPendingCompactionsAndFailureRetry() throws IOException {
+    testKeepLatestVersionsWithPendingCompactions(true);
+  }
+
+  private void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -1032,6 +1118,33 @@ public class TestCleaner extends HoodieClientTestBase {
         "Correct number of files under compaction deleted");
   }
 
+  /**
+   * Utility method to create temporary data files.
+   *
+   * @param instantTime Commit Timestamp
+   * @param numFiles Number for files to be generated
+   * @return generated files
+   * @throws IOException in case of error
+   */
+  private List<String> createMarkerFiles(String instantTime, int numFiles) throws IOException {
+    List<String> files = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      files.add(HoodieClientTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime));
+    }
+    return files;
+  }
+
+  /***
+   * Helper method to return temporary files count.
+   * 
+   * @return Number of temporary files found
+   * @throws IOException in case of error
+   */
+  private int getTotalTempFiles() throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME))
+        .size();
+  }
+
   private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient,
       List<String> paths) {
     Predicate<String> roFilePredicate =
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index 1f638c3..da4224a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
 import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
-import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
@@ -66,9 +66,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
   @ParameterizedTest
   @MethodSource("consistencyGuardType")
   public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2");
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
 
     ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
     ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
@@ -88,7 +88,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingAppearFailSafe() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
       passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
@@ -98,7 +98,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingAppearTimedWait() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
     passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
           .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -106,7 +106,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingAppearsFailSafe() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
       passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
@@ -115,14 +115,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingAppearsTimedWait() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
     passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
   }
 
   @Test
   public void testCheckFailingDisappearFailSafe() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
       passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
@@ -132,7 +132,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingDisappearTimedWait() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
     passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
           .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -140,8 +140,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingDisappearsFailSafe() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
       passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
@@ -150,8 +150,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
 
   @Test
   public void testCheckFailingDisappearsTimedWait() throws Exception {
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
-    FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
     passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
index 55b7b50..af679ce 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
@@ -18,17 +18,17 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.testutils.FileSystemTestUtils;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.IOType;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index f49d6d5..d8bb946 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -35,6 +34,7 @@ import org.apache.hudi.table.HoodieCopyOnWriteTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 
 import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
@@ -73,8 +73,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
             .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
 
-    FileCreateUtils.createCommit(basePath, "001");
-    FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
+    HoodieClientTestUtils.fakeCommit(basePath, "001");
+    HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
 
@@ -193,7 +193,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
             .insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build();
 
-    FileCreateUtils.createCommit(basePath, "001");
+    HoodieClientTestUtils.fakeCommit(basePath, "001");
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c044bee..1529d79 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -53,9 +53,6 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -169,9 +166,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
           assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
         }
       }
-      createDeltaCommit(basePath, newCommitTime);
-      createRequestedDeltaCommit(basePath, newCommitTime);
-      createInflightDeltaCommit(basePath, newCommitTime);
+      HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime);
 
       // Do a compaction
       table = HoodieTable.create(config, hadoopConf);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
index 83e7ea0..c6652ed 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
@@ -20,14 +20,16 @@ package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.FileSystemTestUtils;
+import org.apache.hudi.io.IOType;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -53,20 +55,38 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     cleanupResources();
   }
 
+  private void givenCommit0(boolean isDeltaCommit) throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2");
+    if (isDeltaCommit) {
+      HoodieClientTestUtils.fakeDeltaCommit(basePath, "000");
+    } else {
+      HoodieClientTestUtils.fakeCommit(basePath, "000");
+    }
+  }
+
+  private void givenInflightCommit1(boolean isDeltaCommit) throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1");
+    HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1", IOType.CREATE);
+
+    HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f3", IOType.CREATE);
+
+    if (isDeltaCommit) {
+      HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0);
+      HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.APPEND);
+      HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4", IOType.APPEND);
+      HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001");
+    } else {
+      HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2");
+      HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.MERGE);
+      HoodieClientTestUtils.fakeInFlightCommit(basePath, "001");
+    }
+  }
+
   @Test
   public void testCopyOnWriteRollback() throws Exception {
     // given: wrote some base files and corresponding markers
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String f0 = testTable.addRequestedCommit("000")
-        .withInserts("partA").get("partA");
-    String f1 = testTable.addCommit("001")
-        .withUpdates("partA", f0)
-        .withInserts("partB").get("partB");
-    String f2 = "f2";
-    testTable.forCommit("001")
-        .withMarkerFile("partA", f0, IOType.MERGE)
-        .withMarkerFile("partB", f1, IOType.CREATE)
-        .withMarkerFile("partA", f2, IOType.CREATE);
+    givenCommit0(false);
+    givenInflightCommit1(false);
 
     // when
     List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
@@ -75,8 +95,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     // then: ensure files are deleted correctly, non-existent files reported as failed deletes
     assertEquals(2, stats.size());
 
-    List<FileStatus> partAFiles = testTable.listAllFiles("partA");
-    List<FileStatus> partBFiles = testTable.listAllFiles("partB");
+    List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
+    List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
 
     assertEquals(0, partBFiles.size());
     assertEquals(1, partAFiles.size());
@@ -87,19 +107,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
   @Test
   public void testMergeOnReadRollback() throws Exception {
     // given: wrote some base + log files and corresponding markers
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String f2 = testTable.addRequestedDeltaCommit("000")
-        .withInserts("partA").get("partA");
-    String f1 = testTable.addDeltaCommit("001")
-        .withLogFile("partA", f2)
-        .withInserts("partB").get("partB");
-    String f3 = "f3";
-    String f4 = "f4";
-    testTable.forDeltaCommit("001")
-        .withMarkerFile("partB", f1, IOType.CREATE)
-        .withMarkerFile("partA", f3, IOType.CREATE)
-        .withMarkerFile("partA", f2, IOType.APPEND)
-        .withMarkerFile("partB", f4, IOType.APPEND);
+    givenCommit0(true);
+    givenInflightCommit1(true);
 
     // when
     List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
@@ -108,12 +117,12 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     // then: ensure files are deleted, rollback block is appended (even if append does not exist)
     assertEquals(2, stats.size());
     // will have the log file
-    List<FileStatus> partBFiles = testTable.listAllFiles("partB");
+    List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
     assertEquals(1, partBFiles.size());
     assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
     assertTrue(partBFiles.get(0).getLen() > 0);
 
-    List<FileStatus> partAFiles = testTable.listAllFiles("partA");
+    List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
     assertEquals(3, partAFiles.size());
     assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
     assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index b61abb0..6db6529 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -35,10 +35,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.IOType;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieParquetWriter;
 
@@ -59,6 +59,7 @@ import org.apache.spark.sql.SQLContext;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -73,6 +74,57 @@ import java.util.stream.Collectors;
 public class HoodieClientTestUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class);
+  public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
+
+  private static void fakeMetaFile(String basePath, String instantTime, String suffix) throws IOException {
+    String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME;
+    new File(parentPath).mkdirs();
+    new File(parentPath + "/" + instantTime + suffix).createNewFile();
+  }
+
+  public static void fakeCommit(String basePath, String instantTime) throws IOException {
+    fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
+  }
+
+  public static void fakeDeltaCommit(String basePath, String instantTime) throws IOException {
+    fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
+  }
+
+  public static void fakeInflightDeltaCommit(String basePath, String instantTime) throws IOException {
+    fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+  }
+
+  public static void fakeInFlightCommit(String basePath, String instantTime) throws IOException {
+    fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION);
+  }
+
+  public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId)
+      throws Exception {
+    fakeDataFile(basePath, partitionPath, instantTime, fileId, 0);
+  }
+
+  public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
+      throws Exception {
+    String parentPath = String.format("%s/%s", basePath, partitionPath);
+    new File(parentPath).mkdirs();
+    String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
+    new File(path).createNewFile();
+    new RandomAccessFile(path, "rw").setLength(length);
+  }
+
+  public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
+          throws Exception {
+    fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
+  }
+
+  public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
+          throws Exception {
+    String parentPath = String.format("%s/%s", basePath, partitionPath);
+    new File(parentPath).mkdirs();
+    String path = String.format("%s/%s", parentPath, FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1"));
+    new File(path).createNewFile();
+    new RandomAccessFile(path, "rw").setLength(length);
+  }
 
   /**
    * Returns a Spark config for this test.
@@ -101,8 +153,8 @@ public class HoodieClientTestUtils {
     return HoodieReadClient.addHoodieSupport(sparkConf);
   }
 
-  private static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline,
-                                                                    List<HoodieInstant> commitsToReturn) throws IOException {
+  public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline,
+                                                                   List<HoodieInstant> commitsToReturn) throws IOException {
     HashMap<String, String> fileIdToFullPath = new HashMap<>();
     for (HoodieInstant commit : commitsToReturn) {
       HoodieCommitMetadata metadata =
@@ -175,8 +227,6 @@ public class HoodieClientTestUtils {
 
   /**
    * Find total basefiles for passed in paths.
-   * <p>
-   * TODO move to {@link FileCreateUtils}.
    */
   public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs,
       String... paths) {
@@ -195,9 +245,6 @@ public class HoodieClientTestUtils {
     }
   }
 
-  /**
-   * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
-   */
   public static String writeParquetFile(String basePath, String partitionPath, String filename,
                                         List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
 
@@ -213,7 +260,7 @@ public class HoodieClientTestUtils {
         HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
     HoodieParquetWriter writer =
         new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config,
-            schema, new SparkTaskContextSupplier());
+                schema, new SparkTaskContextSupplier());
     int seqId = 1;
     for (HoodieRecord record : records) {
       GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
@@ -231,9 +278,6 @@ public class HoodieClientTestUtils {
     return filename;
   }
 
-  /**
-   * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
-   */
   public static String writeParquetFile(String basePath, String partitionPath, List<HoodieRecord> records,
                                         Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException {
     Thread.sleep(1000);
@@ -245,9 +289,27 @@ public class HoodieClientTestUtils {
         createCommitTime);
   }
 
-  /**
-   * TODO move to {@link FileCreateUtils}.
-   */
+  public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime)
+      throws IOException {
+    return createMarkerFile(basePath, partitionPath, instantTime);
+  }
+
+  public static String createMarkerFile(String basePath, String partitionPath, String instantTime)
+          throws IOException {
+    return createMarkerFile(basePath, partitionPath, instantTime, UUID.randomUUID().toString(), IOType.MERGE);
+  }
+
+  public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
+          throws IOException {
+    String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/";
+    new File(folderPath).mkdirs();
+    String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, DEFAULT_WRITE_TOKEN, instantTime,
+        HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
+    File f = new File(folderPath + markerFileName);
+    f.createNewFile();
+    return f.getAbsolutePath();
+  }
+
   public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException {
     createTempFolderForMarkerFiles(basePath);
     String folderPath = getTempFolderName(basePath);
@@ -256,9 +318,6 @@ public class HoodieClientTestUtils {
     new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile();
   }
 
-  /**
-   * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
-   */
   public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) {
     String folderPath = getTempFolderName(basePath);
     File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath);
@@ -268,11 +327,11 @@ public class HoodieClientTestUtils {
     return 0;
   }
 
-  private static void createTempFolderForMarkerFiles(String basePath) {
+  public static void createTempFolderForMarkerFiles(String basePath) {
     new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs();
   }
 
-  private static String getTempFolderName(String basePath) {
+  public static String getTempFolderName(String basePath) {
     return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME;
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
deleted file mode 100644
index 2da8e29..0000000
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.testutils;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-public class FileCreateUtils {
-
-  private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException {
-    Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
-    Files.createDirectories(parentPath);
-    Path metaFilePath = parentPath.resolve(instantTime + suffix);
-    if (Files.notExists(metaFilePath)) {
-      Files.createFile(metaFilePath);
-    }
-  }
-
-  public static void createCommit(String basePath, String instantTime) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
-  }
-
-  public static void createRequestedCommit(String basePath, String instantTime) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION);
-  }
-
-  public static void createInflightCommit(String basePath, String instantTime) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION);
-  }
-
-  public static void createDeltaCommit(String basePath, String instantTime) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
-  }
-
-  public static void createRequestedDeltaCommit(String basePath, String instantTime) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
-  }
-
-  public static void createInflightDeltaCommit(String basePath, String instantTime) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
-  }
-
-  public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId)
-      throws Exception {
-    createDataFile(basePath, partitionPath, instantTime, fileId, 0);
-  }
-
-  public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
-      throws Exception {
-    Path parentPath = Paths.get(basePath, partitionPath);
-    Files.createDirectories(parentPath);
-    Path dataFilePath = parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
-    if (Files.notExists(dataFilePath)) {
-      Files.createFile(dataFilePath);
-    }
-    new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length);
-  }
-
-  public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
-      throws Exception {
-    createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
-  }
-
-  public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
-      throws Exception {
-    Path parentPath = Paths.get(basePath, partitionPath);
-    Files.createDirectories(parentPath);
-    Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1"));
-    if (Files.notExists(logFilePath)) {
-      Files.createFile(logFilePath);
-    }
-    new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length);
-  }
-
-  public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
-      throws IOException {
-    Path folderPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
-    Files.createDirectories(folderPath);
-    String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1", instantTime,
-        HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
-    Path markerFilePath = folderPath.resolve(markerFileName);
-    if (Files.notExists(markerFilePath)) {
-      Files.createFile(markerFilePath);
-    }
-    return markerFilePath.toAbsolutePath().toString();
-  }
-}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
deleted file mode 100644
index 32f2d45..0000000
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.testutils;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.ValidationUtils;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.stream.IntStream;
-
-import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
-
-public class HoodieTestTable {
-
-  private final String basePath;
-  private final FileSystem fs;
-  private HoodieTableMetaClient metaClient;
-  private String currentInstantTime;
-
-  private HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) {
-    ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath()));
-    ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs()));
-    this.basePath = basePath;
-    this.fs = fs;
-    this.metaClient = metaClient;
-  }
-
-  public static HoodieTestTable of(HoodieTableMetaClient metaClient) {
-    return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient);
-  }
-
-  public HoodieTestTable addRequestedCommit(String instantTime) throws Exception {
-    createRequestedCommit(basePath, instantTime);
-    currentInstantTime = instantTime;
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    return this;
-  }
-
-  public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception {
-    createRequestedDeltaCommit(basePath, instantTime);
-    currentInstantTime = instantTime;
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    return this;
-  }
-
-  public HoodieTestTable addInflightCommit(String instantTime) throws Exception {
-    createRequestedCommit(basePath, instantTime);
-    createInflightCommit(basePath, instantTime);
-    currentInstantTime = instantTime;
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    return this;
-  }
-
-  public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception {
-    createRequestedDeltaCommit(basePath, instantTime);
-    createInflightDeltaCommit(basePath, instantTime);
-    currentInstantTime = instantTime;
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    return this;
-  }
-
-  public HoodieTestTable addCommit(String instantTime) throws Exception {
-    createRequestedCommit(basePath, instantTime);
-    createInflightCommit(basePath, instantTime);
-    createCommit(basePath, instantTime);
-    currentInstantTime = instantTime;
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    return this;
-  }
-
-  public HoodieTestTable addDeltaCommit(String instantTime) throws Exception {
-    createRequestedDeltaCommit(basePath, instantTime);
-    createInflightDeltaCommit(basePath, instantTime);
-    createDeltaCommit(basePath, instantTime);
-    currentInstantTime = instantTime;
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    return this;
-  }
-
-  public HoodieTestTable forCommit(String instantTime) {
-    currentInstantTime = instantTime;
-    return this;
-  }
-
-  public HoodieTestTable forDeltaCommit(String instantTime) {
-    currentInstantTime = instantTime;
-    return this;
-  }
-
-  public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException {
-    return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType);
-  }
-
-  public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException {
-    createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType);
-    return this;
-  }
-
-  public HoodieTestTable withMarkerFiles(String partitionPath, int num, IOType ioType) throws IOException {
-    String[] fileIds = IntStream.range(0, num).mapToObj(i -> UUID.randomUUID().toString()).toArray(String[]::new);
-    return withMarkerFiles(partitionPath, fileIds, ioType);
-  }
-
-  public HoodieTestTable withMarkerFiles(String partitionPath, String[] fileIds, IOType ioType) throws IOException {
-    for (String fileId : fileIds) {
-      createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType);
-    }
-    return this;
-  }
-
-  /**
-   * Insert one base file to each of the given distinct partitions.
-   *
-   * @return A {@link Map} of partition and its newly inserted file's id.
-   */
-  public Map<String, String> withInserts(String... partitions) throws Exception {
-    Map<String, String> partitionFileIdMap = new HashMap<>();
-    for (String p : partitions) {
-      String fileId = UUID.randomUUID().toString();
-      FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId);
-      partitionFileIdMap.put(p, fileId);
-    }
-    return partitionFileIdMap;
-  }
-
-  public HoodieTestTable withUpdates(String partition, String... fileIds) throws Exception {
-    for (String f : fileIds) {
-      FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, f);
-    }
-    return this;
-  }
-
-  public String withLogFile(String partitionPath) throws Exception {
-    String fileId = UUID.randomUUID().toString();
-    withLogFile(partitionPath, fileId);
-    return fileId;
-  }
-
-  public HoodieTestTable withLogFile(String partitionPath, String fileId) throws Exception {
-    return withLogFile(partitionPath, fileId, 0);
-  }
-
-  public HoodieTestTable withLogFile(String partitionPath, String fileId, int version) throws Exception {
-    FileCreateUtils.createLogFile(basePath, partitionPath, currentInstantTime, fileId, version);
-    return this;
-  }
-
-  public boolean filesExist(Map<String, String> partitionAndFileId, String instantTime) {
-    return partitionAndFileId.entrySet().stream().allMatch(entry -> {
-      String partition = entry.getKey();
-      String fileId = entry.getValue();
-      return fileExists(partition, instantTime, fileId);
-    });
-  }
-
-  public boolean filesExist(String partition, String instantTime, String... fileIds) {
-    return Arrays.stream(fileIds).allMatch(f -> fileExists(partition, instantTime, f));
-  }
-
-  public boolean fileExists(String partition, String instantTime, String fileId) {
-    try {
-      return fs.exists(new Path(Paths.get(basePath, partition,
-          FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString()));
-    } catch (IOException e) {
-      throw new HoodieTestTableException(e);
-    }
-  }
-
-  public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) {
-    return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v));
-  }
-
-  public boolean logFileExists(String partition, String instantTime, String fileId, int version) {
-    try {
-      return fs.exists(new Path(Paths.get(basePath, partition,
-          FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, "1-0-1")).toString()));
-    } catch (IOException e) {
-      throw new HoodieTestTableException(e);
-    }
-  }
-
-  public List<FileStatus> listAllFiles(String partitionPath) throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString()));
-  }
-
-  public List<FileStatus> listAllFilesInTempFolder() throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString()));
-  }
-
-  public static class HoodieTestTableException extends RuntimeException {
-    public HoodieTestTableException(Throwable t) {
-      super(t);
-    }
-  }
-}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 8b38b25..92d431c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -99,6 +99,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 public class HoodieTestUtils {
 
+  public static final String TEST_EXTENSION = ".test";
   public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
   public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
   public static final int DEFAULT_LOG_VERSION = 1;
@@ -137,7 +138,7 @@ public class HoodieTestUtils {
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-      String tableName)
+                                           String tableName)
       throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
@@ -145,7 +146,7 @@ public class HoodieTestUtils {
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-      HoodieFileFormat baseFileFormat)
+                                           HoodieFileFormat baseFileFormat)
       throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toString());
@@ -153,7 +154,7 @@ public class HoodieTestUtils {
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-      Properties properties)
+                                           Properties properties)
       throws IOException {
     properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
     properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
@@ -165,9 +166,6 @@ public class HoodieTestUtils {
     return COMMIT_FORMATTER.format(new Date());
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static void createCommitFiles(String basePath, String... instantTimes) throws IOException {
     for (String instantTime : instantTimes) {
       new File(
@@ -178,6 +176,20 @@ public class HoodieTestUtils {
               + HoodieTimeline.makeInflightCommitFileName(instantTime)).createNewFile();
       new File(
           basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + 
HoodieTimeline.makeCommitFileName(instantTime))
+              .createNewFile();
+    }
+  }
+
+  public static void createDeltaCommitFiles(String basePath, String... instantTimes) throws IOException {
+    for (String instantTime : instantTimes) {
+      new File(
+          basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+              + HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile();
+      new File(
+          basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+              + HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile();
+      new File(
+          basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime))
           .createNewFile();
     }
   }
@@ -186,9 +198,6 @@ public class HoodieTestUtils {
     new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static void createInflightCommitFiles(String basePath, String... instantTimes) throws IOException {
 
     for (String instantTime : instantTimes) {
@@ -202,12 +211,11 @@ public class HoodieTestUtils {
   public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) {
     for (String instantTime : instantTimes) {
       Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime),
-          HoodieTimeline.makeInflightCleanerFileName(instantTime))
-          .forEach(f -> {
+          HoodieTimeline.makeInflightCleanerFileName(instantTime)).forEach(f -> {
             FSDataOutputStream os = null;
             try {
-              Path commitFile = new Path(Paths
-                  .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
+              Path commitFile = new Path(
+                  metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
               os = metaClient.getFs().create(commitFile, true);
               // Write empty clean metadata
               os.write(TimelineMetadataUtils.serializeCleanerPlan(
@@ -229,12 +237,11 @@ public class HoodieTestUtils {
 
   public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) {
     Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime),
-        HoodieTimeline.makeInflightCleanerFileName(commitTime))
-        .forEach(f -> {
+        HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> {
           FSDataOutputStream os = null;
           try {
-            Path commitFile = new Path(Paths
-                .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
+            Path commitFile = new Path(
+                    metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
             os = metaClient.getFs().create(commitFile, true);
             // Write empty clean metadata
             os.write(new byte[0]);
@@ -252,18 +259,18 @@ public class HoodieTestUtils {
         });
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
+  public static String createNewDataFile(String basePath, String partitionPath, String instantTime)
+      throws IOException {
+    String fileID = UUID.randomUUID().toString();
+    return createDataFile(basePath, partitionPath, instantTime, fileID);
+  }
+
   public static String createNewDataFile(String basePath, String partitionPath, String instantTime, long length)
       throws IOException {
     String fileID = UUID.randomUUID().toString();
     return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length);
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID)
       throws IOException {
     String folderPath = basePath + "/" + partitionPath + "/";
@@ -272,7 +279,7 @@ public class HoodieTestUtils {
     return fileID;
   }
 
-  private static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID,
+  public static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID,
       long length) throws IOException {
     String folderPath = basePath + "/" + partitionPath + "/";
     Files.createDirectories(Paths.get(folderPath));
@@ -284,9 +291,6 @@ public class HoodieTestUtils {
     return fileID;
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime,
       String fileID, Option<Integer> version) throws IOException {
     String folderPath = basePath + "/" + partitionPath + "/";
@@ -303,6 +307,17 @@ public class HoodieTestUtils {
     return fileID;
   }
 
+  public static void createCompactionCommitFiles(FileSystem fs, String basePath, String... instantTimes)
+      throws IOException {
+    for (String instantTime : instantTimes) {
+      boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+          + HoodieTimeline.makeCommitFileName(instantTime)));
+      if (!createFile) {
+        throw new IOException("cannot create commit file for commit " + instantTime);
+      }
+    }
+  }
+
   public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant,
       List<Pair<String, FileSlice>> fileSliceList) throws IOException {
     HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty());
@@ -311,16 +326,10 @@ public class HoodieTestUtils {
         TimelineMetadataUtils.serializeCompactionPlan(plan));
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static String getDataFilePath(String basePath, String partitionPath, String instantTime, String fileID) {
     return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID);
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static String getLogFilePath(String basePath, String partitionPath, String instantTime, String fileID,
       Option<Integer> version) {
     return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", instantTime,
@@ -331,43 +340,36 @@ public class HoodieTestUtils {
     return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION;
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static String getInflightCommitFilePath(String basePath, String instantTime) {
     return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime
         + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static String getRequestedCompactionFilePath(String basePath, String instantTime) {
     return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instantTime
         + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static boolean doesDataFileExist(String basePath, String partitionPath, String instantTime,
       String fileID) {
     return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists();
   }
 
+  public static boolean doesLogFileExist(String basePath, String partitionPath, String instantTime, String fileID,
+      Option<Integer> version) {
+    return new File(getLogFilePath(basePath, partitionPath, instantTime, fileID, version)).exists();
+  }
+
   public static boolean doesCommitExist(String basePath, String instantTime) {
     return new File(
         basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + 
instantTime + HoodieTimeline.COMMIT_EXTENSION)
-        .exists();
+            .exists();
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
   public static boolean doesInflightExist(String basePath, String instantTime) {
     return new File(
         basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + 
instantTime + HoodieTimeline.INFLIGHT_EXTENSION)
-        .exists();
+            .exists();
   }
 
   public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath,
@@ -417,8 +419,8 @@ public class HoodieTestUtils {
       Writer logWriter;
       try {
         logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-            .overBaseCommit(location.getInstantTime()).withFs(fs).build();
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
+          .overBaseCommit(location.getInstantTime()).withFs(fs).build();
 
         Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
         header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
@@ -472,7 +474,7 @@ public class HoodieTestUtils {
 
   public static FileStatus[] listAllDataFilesAndLogFilesInPath(FileSystem fs, String basePath) throws IOException {
     return Stream.concat(Arrays.stream(listAllDataFilesInPath(fs, basePath)), Arrays.stream(listAllLogFilesInPath(fs, basePath)))
-        .toArray(FileStatus[]::new);
+            .toArray(FileStatus[]::new);
   }
 
   public static List<String> monotonicIncreasingCommitTimestamps(int numTimestamps, int startSecsDelta) {
