This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 31b6acb973f75e6d27e3e04940db26d9af1ab6db
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Nov 29 00:38:47 2022 +0800

    [HUDI-4209] Avoid using HDFS in HoodieClientTestHarness (#7305)
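    
    The recurring pattern in this change: drop the MiniDFSCluster-backed
    setup from the test harnesses and run the tests against the local file
    system under a JUnit 5 @TempDir instead. A minimal sketch of that setup
    pattern (the class and method names below are illustrative only, not
    part of this commit):
    
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hudi.common.fs.FSUtils;
        import org.junit.jupiter.api.BeforeEach;
        import org.junit.jupiter.api.io.TempDir;
    
        class ExampleLocalFsTest {
          @TempDir
          java.nio.file.Path tempDir;
    
          String basePath;
          FileSystem fs;
    
          @BeforeEach
          void setUp() {
            // Local path under the JUnit-managed temp dir; no HDFS minicluster needed.
            basePath = tempDir.toUri().getPath();
            fs = FSUtils.getFs(basePath, new Configuration());
          }
        }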
---
 .../index/bloom/TestFlinkHoodieBloomIndex.java     |   2 +
 .../testutils/HoodieFlinkClientTestHarness.java    |  84 ++-----------
 .../testutils/HoodieFlinkWriteableTestTable.java   |   2 +-
 .../hudi/client/TestFileBasedLockProvider.java     |  91 ++++++--------
 .../apache/hudi/client/TestHoodieReadClient.java   |  37 ++----
 .../java/org/apache/hudi/client/TestMultiFS.java   |  63 +++++++---
 .../hudi/client/TestUpdateSchemaEvolution.java     |   3 +-
 .../TestBulkInsertInternalPartitioner.java         |  16 +--
 .../commit/TestCopyOnWriteActionExecutor.java      |   3 +-
 .../TestMergeOnReadRollbackActionExecutor.java     |  25 +---
 .../hudi/table/marker/TestDirectWriteMarkers.java  |  13 +-
 .../hudi/testutils/HoodieClientTestHarness.java    | 110 ++++-------------
 .../org/apache/hudi/common/fs/TestFSUtils.java     |  89 +++++++-------
 .../fs/TestFSUtilsWithRetryWrapperEnable.java      |   1 -
 .../common/functional/TestHoodieLogFormat.java     |  62 ++++------
 .../TestHoodieLogFormatAppendFailure.java          |   8 +-
 .../hudi/common/table/TestTimelineUtils.java       |  14 +--
 .../table/view/TestIncrementalFSViewSync.java      | 133 ++++++++++-----------
 .../common/testutils/HoodieCommonTestHarness.java  |  25 ++--
 .../testutils/minicluster/HdfsTestService.java     |  33 ++---
 .../testutils/minicluster/MiniClusterUtil.java     |  59 ---------
 .../hive/TestHoodieCombineHiveInputFormat.java     |  32 +++--
 22 files changed, 323 insertions(+), 582 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index d07eff01c27..6dd5a1c27e2 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.data.HoodieListPairData;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -92,6 +93,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
 
   private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
     return HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withEngineType(EngineType.FLINK)
         .withIndexConfig(HoodieIndexConfig.newBuilder().bloomIndexPruneByRanges(rangePruning)
             .bloomIndexTreebasedFilter(treeFiltering).bloomIndexBucketizedChecking(bucketizedChecking)
             .bloomIndexKeysPerBucket(2).build())
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
index 2f4e9eeddc9..828154931d3 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
@@ -30,69 +30,39 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bloom.TestFlinkHoodieBloomIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestInfo;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 /**
  * The test harness for resource initialization and cleanup.
  */
-public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implements Serializable {
+public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness {
 
   protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class);
-  private String testMethodName;
-  protected transient Configuration hadoopConf = null;
-  protected transient FileSystem fs;
-  protected transient MiniClusterWithClientResource flinkCluster = null;
-  protected transient HoodieFlinkEngineContext context = null;
-  protected transient ExecutorService executorService;
-  protected transient HoodieFlinkWriteClient writeClient;
-  protected transient HoodieTableFileSystemView tableView;
+  protected Configuration hadoopConf;
+  protected FileSystem fs;
+  protected HoodieFlinkEngineContext context;
+  protected ExecutorService executorService;
+  protected HoodieFlinkWriteClient writeClient;
+  protected HoodieTableFileSystemView tableView;
 
   protected final FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
 
-  // dfs
-  protected transient HdfsTestService hdfsTestService;
-  protected transient MiniDFSCluster dfsCluster;
-  protected transient DistributedFileSystem dfs;
-
-  @BeforeEach
-  public void setTestMethodName(TestInfo testInfo) {
-    if (testInfo.getTestMethod().isPresent()) {
-      testMethodName = testInfo.getTestMethod().get().getName();
-    } else {
-      testMethodName = "Unknown";
-    }
-  }
-
-  protected void initFlinkMiniCluster() {
-    flinkCluster = new MiniClusterWithClientResource(
-        new MiniClusterResourceConfiguration.Builder()
-            .setNumberSlotsPerTaskManager(2)
-            .setNumberTaskManagers(1)
-            .build());
-  }
-
   protected void initFileSystem() {
     hadoopConf = new Configuration();
     initFileSystemWithConfiguration(hadoopConf);
@@ -100,9 +70,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
   }
 
   private void initFileSystemWithConfiguration(Configuration configuration) {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been initialized.");
-    }
+    checkState(basePath != null);
     fs = FSUtils.getFs(basePath, configuration);
     if (fs instanceof LocalFileSystem) {
       LocalFileSystem lfs = (LocalFileSystem) fs;
@@ -124,9 +92,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
   }
 
   protected void initMetaClient(HoodieTableType tableType) throws IOException {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been initialized.");
-    }
+    checkState(basePath != null);
     metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
   }
 
@@ -156,18 +122,10 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
     cleanupFlinkContexts();
     cleanupTestDataGenerator();
     cleanupFileSystem();
-    cleanupDFS();
     cleanupExecutorService();
     System.gc();
   }
 
-  protected void cleanupFlinkMiniCluster() {
-    if (flinkCluster != null) {
-      flinkCluster.after();
-      flinkCluster = null;
-    }
-  }
-
   /**
    * Simple test sink function.
    */
@@ -185,7 +143,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
   /**
    * Cleanups hoodie clients.
    */
-  protected void cleanupClients() throws java.io.IOException {
+  protected void cleanupClients() {
     if (metaClient != null) {
       metaClient = null;
     }
@@ -199,24 +157,6 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
     }
   }
 
-  /**
-   * Cleanups the distributed file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupDFS() throws java.io.IOException {
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      dfsCluster.shutdown(true, true);
-      hdfsTestService = null;
-      dfsCluster = null;
-      dfs = null;
-    }
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-  }
-
   /**
    * Cleanups the executor service.
    */
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index 2a69e6fd671..cd4096663f3 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -60,7 +60,7 @@ public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable {
   }
 
   public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
-    return new HoodieFlinkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter);
+    return new HoodieFlinkWriteableTestTable(metaClient.getBasePathV2().toString(), metaClient.getRawFs(), metaClient, schema, filter);
   }
 
   public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java
index 208e9cd62e7..e81a85c5978 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java
@@ -19,22 +19,17 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieLockException;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -43,93 +38,75 @@ import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PA
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestFileBasedLockProvider {
-  private static HdfsTestService hdfsTestService;
-  private static MiniDFSCluster dfsCluster;
-  private static LockConfiguration lockConfiguration;
-  private static Configuration hadoopConf;
 
-  @BeforeAll
-  public static void setup() throws IOException {
-    hdfsTestService = new HdfsTestService();
-    dfsCluster = hdfsTestService.start(true);
-    hadoopConf = dfsCluster.getFileSystem().getConf();
+  @TempDir
+  Path tempDir;
+  String basePath;
+  LockConfiguration lockConfiguration;
+  Configuration hadoopConf;
 
+  @BeforeEach
+  public void setUp() throws IOException {
+    basePath = tempDir.toUri().getPath();
     Properties properties = new Properties();
-    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath);
     properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
     properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
     properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
     properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
     lockConfiguration = new LockConfiguration(properties);
-  }
-
-  @AfterAll
-  public static void cleanUpAfterAll() throws IOException {
-    Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
-    FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
-    fs.delete(new Path("/tmp"), true);
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      hdfsTestService = null;
-    }
-  }
-
-  @AfterEach
-  public void cleanUpAfterEach() throws IOException {
-    Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
-    FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
-    fs.delete(new Path("/tmp/lock"), true);
+    hadoopConf = new Configuration();
   }
 
   @Test
   public void testAcquireLock() {
     FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
-    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
-          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+        .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
     fileBasedLockProvider.unlock();
   }
 
   @Test
   public void testAcquireLockWithDefaultPath() {
     lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY);
-    lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/");
+    lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), basePath);
     FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
-    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
-          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+        .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
     fileBasedLockProvider.unlock();
-    lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
+    lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath);
   }
 
   @Test
   public void testUnLock() {
     FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
-    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
-          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+        .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
     fileBasedLockProvider.unlock();
-    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
-          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+        .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
   }
 
   @Test
   public void testReentrantLock() {
     FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
-    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
-          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
-    Assertions.assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+        .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+        .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
     fileBasedLockProvider.unlock();
   }
 
   @Test
   public void testUnlockWithoutLock() {
-    try {
+    assertDoesNotThrow(() -> {
       FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
       fileBasedLockProvider.unlock();
-    } catch (HoodieLockException e) {
-      Assertions.fail();
-    }
+    });
   }
-
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index bc1d6e03c04..eab9f0937c7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.spark.api.java.JavaPairRDD;
@@ -33,7 +32,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -50,16 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 @SuppressWarnings("unchecked")
 public class TestHoodieReadClient extends HoodieClientTestBase {
 
-  @Override
-  protected void initPath() {
-    try {
-      java.nio.file.Path basePath = tempDir.resolve("dataset");
-      java.nio.file.Files.createDirectories(basePath);
-      this.basePath = basePath.toUri().toString();
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
-    }
-  }
+  private static final int PARALLELISM = 2;
 
   /**
    * Test ReadFilter API after writing new records using HoodieWriteClient.insert.
@@ -91,15 +80,13 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
   @Test
   public void testReadFilterExistAfterBulkInsertPrepped() throws Exception {
     testReadFilterExist(getConfigBuilder().withBulkInsertParallelism(1).build(),
-        (writeClient, recordRDD, instantTime) -> {
-          return writeClient.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty());
-        });
+        (writeClient, recordRDD, instantTime) -> writeClient.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()));
   }
 
   @Test
   public void testReadROViewFailsWithoutSqlContext() {
     SparkRDDReadClient readClient = new SparkRDDReadClient(context, getConfig());
-    JavaRDD<HoodieKey> recordsRDD = jsc.parallelize(new ArrayList<>(), 1);
+    JavaRDD<HoodieKey> recordsRDD = jsc.parallelize(new ArrayList<>(), PARALLELISM);
     assertThrows(IllegalStateException.class, () -> {
       readClient.readROView(recordsRDD, 1);
     });
@@ -115,18 +102,18 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
    */
   private void testReadFilterExist(HoodieWriteConfig config,
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
       SparkRDDReadClient readClient = getHoodieReadClient(config.getBasePath());
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
-      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, PARALLELISM);
 
       JavaRDD<HoodieRecord> filteredRDD = readClient.filterExists(recordsRDD);
 
       // Should not find any files
       assertEquals(100, filteredRDD.collect().size());
 
-      JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1);
+      JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), PARALLELISM);
       // We create three base file, each having one record. (3 different partitions)
       List<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime).collect();
       // Verify there are no errors
@@ -146,14 +133,14 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
       assertEquals(75, keysWithPaths.count());
 
       // verify rows match inserted records
-      Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
+      Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, PARALLELISM);
       assertEquals(75, rows.count());
 
       JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
           .map(keyPath -> keyPath._1);
 
       assertThrows(AnalysisException.class, () -> {
-        anotherReadClient.readROView(keysWithoutPaths, 1);
+        anotherReadClient.readROView(keysWithoutPaths, PARALLELISM);
       });
 
       // Actual tests of getPendingCompactions method are in TestAsyncCompaction
@@ -184,7 +171,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
    */
   @Test
   public void testTagLocationAfterBulkInsert() throws Exception {
-    testTagLocation(getConfigBuilder().withBulkInsertParallelism(1).build(), SparkRDDWriteClient::bulkInsert,
+    testTagLocation(getConfigBuilder().withBulkInsertParallelism(PARALLELISM).build(), SparkRDDWriteClient::bulkInsert,
         SparkRDDWriteClient::upsert, false);
   }
 
@@ -194,7 +181,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
   @Test
   public void testTagLocationAfterBulkInsertPrepped() throws Exception {
     testTagLocation(
-        getConfigBuilder().withBulkInsertParallelism(1).build(), (writeClient, recordRDD, instantTime) -> writeClient
+        getConfigBuilder().withBulkInsertParallelism(PARALLELISM).build(), (writeClient, recordRDD, instantTime) -> writeClient
             .bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
         SparkRDDWriteClient::upsertPreppedRecords, true);
   }
@@ -223,7 +210,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
       // since they have been modified in the DAG
       JavaRDD<HoodieRecord> recordRDD =
           jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
-              .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()));
+              .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()), PARALLELISM);
       // Should have 100 records in table (check using Index), all in locations marked at commit
       SparkRDDReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
       List<HoodieRecord> taggedRecords = readClient.tagLocation(recordRDD).collect();
@@ -239,7 +226,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
           numRecords, 200, 2);
       recordRDD =
           jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
-              .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()));
+              .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()), PARALLELISM);
       // Index should be able to locate all updates in correct locations.
       readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
       taggedRecords = readClient.tagLocation(recordRDD).collect();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index df0fed027ce..2c05e528086 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
@@ -35,15 +36,20 @@ import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -51,15 +57,34 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class TestMultiFS extends HoodieClientTestHarness {
 
   private static final Logger LOG = LogManager.getLogger(TestMultiFS.class);
-  private String tablePath = "file:///tmp/hoodie/sample-table";
-  protected String tableName = "hoodie_rt";
-  private String tableType = HoodieTableType.COPY_ON_WRITE.name();
+  private static final String TABLE_TYPE = HoodieTableType.COPY_ON_WRITE.name();
+  private static final String TABLE_NAME = "hoodie_rt";
+  private static HdfsTestService hdfsTestService;
+  private static FileSystem dfs;
+  private String tablePath;
+  private String dfsBasePath;
+
+  @BeforeAll
+  public static void setUpAll() throws IOException {
+    hdfsTestService = new HdfsTestService();
+    MiniDFSCluster dfsCluster = hdfsTestService.start(true);
+    dfs = dfsCluster.getFileSystem();
+  }
+
+  @AfterAll
+  public static void cleanUpAll() {
+    hdfsTestService.stop();
+  }
 
   @BeforeEach
   public void setUp() throws Exception {
+    initPath();
     initSparkContexts();
-    initDFS();
     initTestDataGenerator();
+    tablePath = baseUri + "/sample-table";
+    dfsBasePath = dfs.getWorkingDirectory().toString();
+    dfs.mkdirs(new Path(dfsBasePath));
+    hadoopConf = dfs.getConf();
   }
 
   @AfterEach
@@ -69,7 +94,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
 
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
-        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(TABLE_NAME)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .build();
   }
@@ -78,8 +103,8 @@ public class TestMultiFS extends HoodieClientTestHarness {
   public void readLocalWriteHDFS() throws Exception {
     // Initialize table and filesystem
     HoodieTableMetaClient.withPropertyBuilder()
-        .setTableType(tableType)
-        .setTableName(tableName)
+        .setTableType(TABLE_TYPE)
+        .setTableName(TABLE_NAME)
         .setPayloadClass(HoodieAvroPayload.class)
         .initTable(hadoopConf, dfsBasePath);
 
@@ -88,8 +113,8 @@ public class TestMultiFS extends HoodieClientTestHarness {
     HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
 
     HoodieTableMetaClient.withPropertyBuilder()
-        .setTableType(tableType)
-        .setTableName(tableName)
+        .setTableType(TABLE_TYPE)
+        .setTableName(TABLE_NAME)
         .setPayloadClass(HoodieAvroPayload.class)
         .setRecordKeyFields(localConfig.getProps().getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()))
         .setPartitionFields(localConfig.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()))
@@ -102,8 +127,8 @@ public class TestMultiFS extends HoodieClientTestHarness {
       // Write generated data to hdfs (only inserts)
       String readCommitTime = hdfsWriteClient.startCommit();
       LOG.info("Starting commit " + readCommitTime);
-      List<HoodieRecord> records = dataGen.generateInserts(readCommitTime, 100);
-      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<HoodieRecord> records = dataGen.generateInserts(readCommitTime, 10);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
       hdfsWriteClient.upsert(writeRecords, readCommitTime);
 
       // Read from hdfs
@@ -111,19 +136,19 @@ public class TestMultiFS extends HoodieClientTestHarness {
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(dfsBasePath).build();
       HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
       Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime);
-      assertEquals(readRecords.count(), records.size(), "Should contain 100 records");
+      assertEquals(readRecords.count(), records.size());
 
       // Write to local
       HoodieTableMetaClient.withPropertyBuilder()
-        .setTableType(tableType)
-        .setTableName(tableName)
-        .setPayloadClass(HoodieAvroPayload.class)
-        .initTable(hadoopConf, tablePath);
+          .setTableType(TABLE_TYPE)
+          .setTableName(TABLE_NAME)
+          .setPayloadClass(HoodieAvroPayload.class)
+          .initTable(hadoopConf, tablePath);
 
       String writeCommitTime = localWriteClient.startCommit();
       LOG.info("Starting write commit " + writeCommitTime);
-      List<HoodieRecord> localRecords = dataGen.generateInserts(writeCommitTime, 100);
-      JavaRDD<HoodieRecord> localWriteRecords = jsc.parallelize(localRecords, 1);
+      List<HoodieRecord> localRecords = dataGen.generateInserts(writeCommitTime, 10);
+      JavaRDD<HoodieRecord> localWriteRecords = jsc.parallelize(localRecords, 2);
       LOG.info("Writing to path: " + tablePath);
       localWriteClient.upsert(localWriteRecords, writeCommitTime);
 
@@ -133,7 +158,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
       timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
       Dataset<Row> localReadRecords =
           HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime);
-      assertEquals(localReadRecords.count(), localRecords.size(), "Should contain 100 records");
+      assertEquals(localReadRecords.count(), localRecords.size());
     }
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index a5926196ea3..d49014c69e7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -59,7 +60,7 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResou
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
+public class TestUpdateSchemaEvolution extends HoodieClientTestHarness implements Serializable {
 
   @BeforeEach
   public void setUp() throws Exception {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index 4d2f5e0c5e2..5cb7859888c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -28,6 +26,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.jupiter.api.Test;
@@ -36,6 +37,7 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -47,7 +49,7 @@ import java.util.stream.Stream;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
+public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase implements Serializable {
   private static final Comparator<HoodieRecord<? extends HoodieRecordPayload>> KEY_COMPARATOR =
       Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey()));
 
@@ -70,8 +72,7 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
     return records.map(record -> record.getPartitionPath()).countByValue();
   }
 
-  private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(JavaSparkContext jsc)
-      throws Exception {
+  private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(JavaSparkContext jsc) {
     return generateTestRecordsForBulkInsert(jsc).union(generateTestRecordsForBulkInsert(jsc))
         .union(generateTestRecordsForBulkInsert(jsc));
   }
@@ -135,8 +136,7 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
   @ParameterizedTest(name = "[{index}] {0}")
   @MethodSource("configParams")
   public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,
-                                                boolean isGloballySorted, boolean isLocallySorted)
-      throws Exception {
+                                                boolean isGloballySorted, boolean isLocallySorted) {
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
     testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode),
@@ -146,7 +146,7 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
   }
 
   @Test
-  public void testCustomColumnSortPartitioner() throws Exception {
+  public void testCustomColumnSortPartitioner() {
     String sortColumnString = "rider";
     String[] sortColumns = sortColumnString.split(",");
     Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index ebe041d638d..ffa93b638f5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -74,6 +74,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.io.Serializable;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -95,7 +96,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
+public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implements Serializable {
 
   private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class);
   private static final Schema SCHEMA = getSchemaFromResource(TestCopyOnWriteActionExecutor.class, "/exampleSchema.avsc");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 1c4de34e5ee..c952c77fc4e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -77,7 +77,6 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
   public void setUp() throws Exception {
     initPath();
     initSparkContexts();
-    //just generate tow partitions
     dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
     initFileSystem();
     initMetaClient();
@@ -89,7 +88,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true})
+  @ValueSource(booleans = {false, true})
   public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
     //1. prepare data and assert data result
     List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
@@ -158,11 +157,9 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
 
   @Test
   public void testRollbackForCanIndexLogFile() throws IOException {
-    cleanupResources();
-    setUpDFS();
     //1. prepare data and assert data result
     //just generate one partitions
-    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH});
+    dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH});
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
@@ -177,7 +174,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
             .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()).withRollbackUsingMarkers(false).withAutoCommit(false).build();
 
     //1. prepare data
-    new HoodieTestDataGenerator().writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath);
+    new HoodieTestDataGenerator().writePartitionMetadata(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH}, basePath);
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
     // Write 1 (only inserts)
     String newCommitTime = "001";
@@ -231,8 +228,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
             .getInstantDetails(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime))
             .get(),
         HoodieCommitMetadata.class);
-    assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_FIRST_PARTITION_PATH);
-    assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_SECOND_PARTITION_PATH);
+    assertTrue(commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_FIRST_PARTITION_PATH));
+    assertTrue(commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_SECOND_PARTITION_PATH));
     List<HoodieWriteStat> hoodieWriteStatOptionList = commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH);
     // Both update and insert record should enter same existing fileGroup due to small file handling
     assertEquals(1, hoodieWriteStatOptionList.size());
@@ -285,8 +282,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
    * Test Cases for rolling back when there is no base file.
    */
   @Test
-  public void testRollbackWhenFirstCommitFail() throws Exception {
-
+  public void testRollbackWhenFirstCommitFail() {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .withRollbackUsingMarkers(false)
         .withPath(basePath).build();
@@ -296,13 +292,4 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
       client.rollback("001");
     }
   }
-
-  private void setUpDFS() throws IOException {
-    initDFS();
-    initSparkContexts();
-    //just generate two partitions
-    dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
-    initFileSystem();
-    initDFSMetaClient();
-  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java
index fa6df3ba73d..0e9f990048e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -46,10 +47,10 @@ public class TestDirectWriteMarkers extends TestWriteMarkersBase {
     this.jsc = new JavaSparkContext(
         HoodieClientTestUtils.getSparkConfForTest(TestDirectWriteMarkers.class.getName()));
     this.context = new HoodieSparkEngineContext(jsc);
-    this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
-    this.markerFolderPath =  new Path(metaClient.getMarkerFolderPath("000"));
+    this.fs = FSUtils.getFs(metaClient.getBasePathV2().toString(), metaClient.getHadoopConf());
+    this.markerFolderPath =  new Path(Paths.get(metaClient.getMarkerFolderPath("000")).toUri());
     this.writeMarkers = new DirectWriteMarkers(
-        fs, metaClient.getBasePath(), markerFolderPath.toString(), "000");
+        fs, metaClient.getBasePathV2().toString(), markerFolderPath.toString(), "000");
   }
 
   @AfterEach
@@ -65,11 +66,11 @@ public class TestDirectWriteMarkers extends TestWriteMarkersBase {
         .sorted().collect(Collectors.toList());
     assertEquals(3, markerFiles.size());
     assertIterableEquals(CollectionUtils.createImmutableList(
-            "file:" + markerFolderPath.toString()
+            markerFolderPath.toString()
                 + (isTablePartitioned ? "/2020/06/01" : "") + "/file1.marker.MERGE",
-            "file:" + markerFolderPath.toString()
+            markerFolderPath.toString()
                 + (isTablePartitioned ? "/2020/06/02" : "") + "/file2.marker.APPEND",
-            "file:" + markerFolderPath.toString()
+            markerFolderPath.toString()
                 + (isTablePartitioned ? "/2020/06/03" : "") + "/file3.marker.CREATE"),
         markerFiles.stream().map(m -> m.getPath().toString()).collect(Collectors.toList())
     );
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index a41c62cdcee..9d305777adf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -47,7 +47,6 @@ import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -74,8 +73,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -84,12 +81,10 @@ import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.SparkSessionExtensions;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInfo;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -118,42 +113,32 @@ import static org.junit.jupiter.api.Assertions.fail;
 /**
  * The test harness for resource initialization and cleanup.
  */
-public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
+public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness {
 
   private static final Logger LOG = LogManager.getLogger(HoodieClientTestHarness.class);
-
-  protected static int timelineServicePort =
-      FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue();
-  private String testMethodName;
-  protected transient JavaSparkContext jsc = null;
-  protected transient HoodieSparkEngineContext context = null;
-  protected transient SparkSession sparkSession = null;
-  protected transient Configuration hadoopConf = null;
-  protected transient SQLContext sqlContext;
-  protected transient FileSystem fs;
-  protected transient ExecutorService executorService;
-  protected transient HoodieTableMetaClient metaClient;
-  protected transient SparkRDDWriteClient writeClient;
-  protected transient SparkRDDReadClient readClient;
-  protected transient HoodieTableFileSystemView tableView;
-  protected transient TimelineService timelineService;
-
-  protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
-
-  // dfs
-  protected String dfsBasePath;
-  protected transient HdfsTestService hdfsTestService;
-  protected transient MiniDFSCluster dfsCluster;
-  protected transient DistributedFileSystem dfs;
+  protected static int timelineServicePort = FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue();
 
   @AfterAll
   public static void tearDownAll() throws IOException {
     FileSystem.closeAll();
   }
 
-  protected Option<Consumer<SparkSessionExtensions>> getSparkSessionExtensionsInjector() {
-    return Option.empty();
-  }
+  protected JavaSparkContext jsc;
+  protected HoodieSparkEngineContext context;
+  protected SparkSession sparkSession;
+  protected Configuration hadoopConf;
+  protected SQLContext sqlContext;
+  protected FileSystem fs;
+  protected ExecutorService executorService;
+  protected HoodieTableMetaClient metaClient;
+  protected SparkRDDWriteClient writeClient;
+  protected SparkRDDReadClient readClient;
+  protected HoodieTableFileSystemView tableView;
+
+  protected TimelineService timelineService;
+  protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
+
+  private String testMethodName;
 
   @BeforeEach
   public void setTestMethodName(TestInfo testInfo) {
@@ -185,11 +170,14 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     cleanupSparkContexts();
     cleanupTestDataGenerator();
     cleanupFileSystem();
-    cleanupDFS();
     cleanupExecutorService();
     System.gc();
   }
 
+  protected Option<Consumer<SparkSessionExtensions>> getSparkSessionExtensionsInjector() {
+    return Option.empty();
+  }
+
   /**
    * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
    *
@@ -397,58 +385,6 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     }
   }
 
-  /**
-   * Initializes a distributed file system and base directory.
-   *
-   * @throws IOException
-   */
-  protected void initDFS() throws IOException {
-    hdfsTestService = new HdfsTestService();
-    dfsCluster = hdfsTestService.start(true);
-
-    // Create a temp folder as the base path
-    dfs = dfsCluster.getFileSystem();
-    dfsBasePath = dfs.getWorkingDirectory().toString();
-    this.basePath = dfsBasePath;
-    this.hadoopConf = dfs.getConf();
-    dfs.mkdirs(new Path(dfsBasePath));
-  }
-
-  /**
-   * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by
-   * {@code getTableType()}.
-   *
-   * @throws IOException
-   */
-  protected void initDFSMetaClient() throws IOException {
-    if (dfsBasePath == null) {
-      throw new IllegalStateException("The base path has not been 
initialized.");
-    }
-
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been 
initialized.");
-    }
-    metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, getTableType());
-  }
-
-  /**
-   * Cleanups the distributed file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupDFS() throws IOException {
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      dfsCluster.shutdown(true, true);
-      hdfsTestService = null;
-      dfsCluster = null;
-      dfs = null;
-    }
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-  }
-
   /**
    * Initializes executor service with a fixed thread pool.
    *
@@ -713,7 +649,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
 
     List<MetadataPartitionType> enabledPartitionTypes = metadataWriter.getEnabledPartitionTypes();
 
-    Assertions.assertEquals(enabledPartitionTypes.size(), metadataTablePartitions.size());
+    assertEquals(enabledPartitionTypes.size(), metadataTablePartitions.size());
 
     Map<String, MetadataPartitionType> partitionTypeMap = enabledPartitionTypes.stream()
         .collect(Collectors.toMap(MetadataPartitionType::getPartitionPath, Function.identity()));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 590c357664e..55b54f2d8d7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -41,7 +41,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -66,11 +65,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class TestFSUtils extends HoodieCommonTestHarness {
 
-  private final long minRollbackToKeep = 10;
-  private final long minCleanToKeep = 10;
-
-  private static String TEST_WRITE_TOKEN = "1-0-1";
-  public static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
+  private static final String TEST_WRITE_TOKEN = "1-0-1";
+  private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
 
   @Rule
   public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
@@ -78,7 +74,6 @@ public class TestFSUtils extends HoodieCommonTestHarness {
   @BeforeEach
   public void setUp() throws IOException {
     initMetaClient();
-    basePath = "file:" + basePath;
   }
 
   @AfterEach
@@ -100,7 +95,6 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
   }
 
-  @Test
   /**
    * Tests if process Files return only paths excluding marker directories Cleaner, Rollback and compaction-scheduling
    * logic was recursively processing all subfolders including that of ".hoodie" when looking for partition-paths. This
@@ -108,6 +102,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
    * of ".hoodie" folder) is deleted underneath by compactor. This code tests the fix by ensuring ".hoodie" and their
    * subfolders are never processed.
    */
+  @Test
   public void testProcessFiles() throws Exception {
     // All directories including marker dirs.
     List<String> folders =
@@ -122,9 +117,9 @@ public class TestFSUtils extends HoodieCommonTestHarness {
 
     // Files inside partitions and marker directories
     List<String> files = Stream.of("2016/04/15/1_1-0-1_20190528120000",
-        "2016/05/16/2_1-0-1_20190528120000",
-        ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000",
-        ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000")
+            "2016/05/16/2_1-0-1_20190528120000",
+            ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000",
+            ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000")
         .map(fileName -> fileName + BASE_FILE_EXTENSION)
         .collect(Collectors.toList());
 
@@ -322,7 +317,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     assertEquals(LOG_STR, FSUtils.getFileExtensionFromLog(new 
Path(logFileName)));
 
     // create three versions of log file
-    java.nio.file.Path partitionPath = Paths.get(URI.create(basePath + "/" + 
partitionStr));
+    java.nio.file.Path partitionPath = Paths.get(basePath, partitionStr);
     Files.createDirectories(partitionPath);
     String log1 = FSUtils.makeLogFileName(fileId, LOG_EXTENTION, instantTime, 
1, writeToken);
     Files.createFile(partitionPath.resolve(log1));
@@ -345,7 +340,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     assertEquals("file4.parquet", FSUtils.getFileName("file4.parquet", ""));
   }
 
-  private void prepareTestDirectory(FileSystem fileSystem, String rootDir) 
throws IOException {
+  private void prepareTestDirectory(FileSystem fileSystem, Path rootDir) 
throws IOException {
     // Directory structure
     // .hoodie/.temp/
     //  - subdir1
@@ -353,14 +348,13 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     //  - subdir2
     //    - file2.txt
     //  - file3
-    Path dirPath = new Path(rootDir);
     String subDir1 = rootDir + "/subdir1";
     String file1 = subDir1 + "/file1.txt";
     String subDir2 = rootDir + "/subdir2";
     String file2 = subDir2 + "/file2.txt";
     String file3 = rootDir + "/file3.txt";
-    String[] dirs = new String[]{rootDir, subDir1, subDir2};
-    String[] files = new String[]{file1, file2, file3};
+    String[] dirs = new String[] {rootDir.toString(), subDir1, subDir2};
+    String[] files = new String[] {file1, file2, file3};
     // clean up first
     cleanUpTestDirectory(fileSystem, rootDir);
     for (String dir : dirs) {
@@ -371,86 +365,85 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     }
   }
 
-  private void cleanUpTestDirectory(FileSystem fileSystem, String rootDir) 
throws IOException {
-    fileSystem.delete(new Path(rootDir), true);
+  private void cleanUpTestDirectory(FileSystem fileSystem, Path rootDir) 
throws IOException {
+    fileSystem.delete(rootDir, true);
   }
 
   @Test
   public void testDeleteExistingDir() throws IOException {
-    String rootDir = basePath + "/.hoodie/.temp";
+    Path rootDir = getHoodieTempDir();
     FileSystem fileSystem = metaClient.getFs();
     prepareTestDirectory(fileSystem, rootDir);
 
-    Path rootDirPath = new Path(rootDir);
-    assertTrue(fileSystem.exists(rootDirPath));
+    assertTrue(fileSystem.exists(rootDir));
     assertTrue(FSUtils.deleteDir(
-        new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, 
rootDirPath, 2));
-    assertFalse(fileSystem.exists(rootDirPath));
+        new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, 
rootDir, 2));
+    assertFalse(fileSystem.exists(rootDir));
   }
 
   @Test
   public void testDeleteNonExistingDir() throws IOException {
-    String rootDir = basePath + "/.hoodie/.temp";
+    Path rootDir = getHoodieTempDir();
     FileSystem fileSystem = metaClient.getFs();
     cleanUpTestDirectory(fileSystem, rootDir);
 
     assertFalse(FSUtils.deleteDir(
-        new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, 
new Path(rootDir), 2));
+        new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, 
rootDir, 2));
   }
 
   @Test
   public void testDeleteSubDirectoryRecursively() throws IOException {
-    String rootDir = basePath + "/.hoodie/.temp";
-    String subPathStr = rootDir + "/subdir1";
+    Path rootDir = getHoodieTempDir();
+    Path subDir = new Path(rootDir, "subdir1");
     FileSystem fileSystem = metaClient.getFs();
     prepareTestDirectory(fileSystem, rootDir);
 
     assertTrue(FSUtils.deleteSubPath(
-        subPathStr, new SerializableConfiguration(fileSystem.getConf()), 
true));
+        subDir.toString(), new 
SerializableConfiguration(fileSystem.getConf()), true));
   }
 
   @Test
   public void testDeleteSubDirectoryNonRecursively() throws IOException {
-    String rootDir = basePath + "/.hoodie/.temp";
-    String subPathStr = rootDir + "/subdir1";
+    Path rootDir = getHoodieTempDir();
+    Path subDir = new Path(rootDir, "subdir1");
     FileSystem fileSystem = metaClient.getFs();
     prepareTestDirectory(fileSystem, rootDir);
 
     assertThrows(
         HoodieIOException.class,
         () -> FSUtils.deleteSubPath(
-            subPathStr, new SerializableConfiguration(fileSystem.getConf()), 
false));
+            subDir.toString(), new 
SerializableConfiguration(fileSystem.getConf()), false));
   }
 
   @Test
   public void testDeleteSubPathAsFile() throws IOException {
-    String rootDir = basePath + "/.hoodie/.temp";
-    String subPathStr = rootDir + "/file3.txt";
+    Path rootDir = getHoodieTempDir();
+    Path subDir = new Path(rootDir, "file3.txt");
     FileSystem fileSystem = metaClient.getFs();
     prepareTestDirectory(fileSystem, rootDir);
 
     assertTrue(FSUtils.deleteSubPath(
-        subPathStr, new SerializableConfiguration(fileSystem.getConf()), 
false));
+        subDir.toString(), new 
SerializableConfiguration(fileSystem.getConf()), false));
   }
 
   @Test
   public void testDeleteNonExistingSubDirectory() throws IOException {
-    String rootDir = basePath + "/.hoodie/.temp";
-    String subPathStr = rootDir + "/subdir10";
+    Path rootDir = getHoodieTempDir();
+    Path subDir = new Path(rootDir, "subdir10");
     FileSystem fileSystem = metaClient.getFs();
     cleanUpTestDirectory(fileSystem, rootDir);
 
     assertFalse(FSUtils.deleteSubPath(
-        subPathStr, new SerializableConfiguration(fileSystem.getConf()), 
true));
+        subDir.toString(), new 
SerializableConfiguration(fileSystem.getConf()), true));
   }
 
   @Test
   public void testParallelizeSubPathProcessWithExistingDir() throws 
IOException {
-    String rootDir = basePath + "/.hoodie/.temp";
+    Path rootDir = getHoodieTempDir();
     FileSystem fileSystem = metaClient.getFs();
     prepareTestDirectory(fileSystem, rootDir);
     Map<String, List<String>> result = FSUtils.parallelizeSubPathProcess(
-        new HoodieLocalEngineContext(fileSystem.getConf()), fileSystem, new 
Path(rootDir), 2,
+        new HoodieLocalEngineContext(fileSystem.getConf()), fileSystem, 
rootDir, 2,
         fileStatus -> !fileStatus.getPath().getName().contains("1"),
         pairOfSubPathAndConf -> {
           Path subPath = new Path(pairOfSubPathAndConf.getKey());
@@ -478,18 +471,22 @@ public class TestFSUtils extends HoodieCommonTestHarness {
 
   @Test
   public void testGetFileStatusAtLevel() throws IOException {
-    String rootDir = basePath + "/.hoodie/.temp";
+    Path hoodieTempDir = getHoodieTempDir();
     FileSystem fileSystem = metaClient.getFs();
-    prepareTestDirectory(fileSystem, rootDir);
+    prepareTestDirectory(fileSystem, hoodieTempDir);
     List<FileStatus> fileStatusList = FSUtils.getFileStatusAtLevel(
         new HoodieLocalEngineContext(fileSystem.getConf()), fileSystem,
-        new Path(basePath), 3, 2);
+        new Path(baseUri), 3, 2);
     assertEquals(CollectionUtils.createImmutableSet(
-            basePath + "/.hoodie/.temp/subdir1/file1.txt",
-            basePath + "/.hoodie/.temp/subdir2/file2.txt"),
+            new Path(baseUri.toString(), ".hoodie/.temp/subdir1/file1.txt"),
+            new Path(baseUri.toString(), ".hoodie/.temp/subdir2/file2.txt")),
         fileStatusList.stream()
-            .map(fileStatus -> fileStatus.getPath().toString())
-            .filter(filePath -> filePath.endsWith(".txt"))
+            .map(FileStatus::getPath)
+            .filter(filePath -> filePath.getName().endsWith(".txt"))
             .collect(Collectors.toSet()));
   }
+
+  private Path getHoodieTempDir() {
+    return new Path(baseUri.toString(), ".hoodie/.temp");
+  }
 }
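
As an aside on the TestFSUtils changes above: the tests now compose Hadoop Path objects from a parent instead of concatenating strings. A minimal, self-contained sketch of that pattern against the local filesystem (the class name, directory layout and file names below are invented for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.nio.file.Files;

public class PathCompositionSketch {
  public static void main(String[] args) throws IOException {
    // A scratch base dir on the local filesystem, analogous to the harness' baseUri.
    java.nio.file.Path tempDir = Files.createTempDirectory("fsutils-sketch");
    Path baseDir = new Path(tempDir.toUri());

    // Compose child paths from the parent Path instead of concatenating strings.
    Path hoodieTempDir = new Path(baseDir, ".hoodie/.temp");
    Path subDir1 = new Path(hoodieTempDir, "subdir1");
    Path file1 = new Path(subDir1, "file1.txt");

    FileSystem fs = baseDir.getFileSystem(new Configuration());
    fs.mkdirs(subDir1);
    fs.create(file1).close();

    System.out.println("exists: " + fs.exists(file1));
    // Recursive delete of the scratch tree, like cleanUpTestDirectory.
    fs.delete(hoodieTempDir, true);
  }
}
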
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
index aa976cf1127..90e78bccbc5 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
@@ -54,7 +54,6 @@ public class TestFSUtilsWithRetryWrapperEnable extends 
TestFSUtils {
   @BeforeEach
   public void setUp() throws IOException {
     initMetaClient();
-    basePath = "file:" + basePath;
     FileSystemRetryConfig fileSystemRetryConfig = 
FileSystemRetryConfig.newBuilder().withFileSystemActionRetryEnabled(true).build();
     maxRetryIntervalMs = fileSystemRetryConfig.getMaxRetryIntervalMs();
     maxRetryNumbers = fileSystemRetryConfig.getMaxRetryNumbers();
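
FileSystemRetryConfig, as used above, only carries the retry knobs (enable flag, max interval, max number of retries). For orientation, a generic sketch of the bounded, backoff-based retry loop that such a wrapper typically applies around a filesystem action (illustrative only, not Hudi's actual retry wrapper):

import java.io.IOException;
import java.util.concurrent.Callable;

public final class RetryLoopSketch {
  // Runs the action, retrying on IOException up to maxRetries times with doubling backoff.
  public static <T> T withRetry(Callable<T> action, int maxRetries,
                                long initialIntervalMs, long maxIntervalMs) throws Exception {
    long delay = initialIntervalMs;
    int attempt = 0;
    while (true) {
      try {
        return action.call();
      } catch (IOException e) {
        if (++attempt > maxRetries) {
          throw e;
        }
        Thread.sleep(Math.min(delay, maxIntervalMs));
        delay = Math.min(delay * 2, maxIntervalMs);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Succeeds on the first call here; real usage would wrap a flaky filesystem action.
    String result = withRetry(() -> "ok", 3, 100, 2000);
    System.out.println(result);
  }
}
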
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index d6fa6944117..b969550f857 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -46,7 +46,7 @@ import org.apache.hudi.common.testutils.HadoopMapRedUtils;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
+import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -60,6 +60,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -68,6 +69,8 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -107,6 +110,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   private static final HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = 
HoodieLogBlockType.AVRO_DATA_BLOCK;
   private static final int BUFFER_SIZE = 4096;
 
+  private static HdfsTestService hdfsTestService;
   private static FileSystem fs;
   private Path partitionPath;
   private String spillableBasePath;
@@ -114,22 +118,22 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @BeforeAll
   public static void setUpClass() throws IOException, InterruptedException {
     // Append is not supported in LocalFileSystem. HDFS needs to be setup.
-    MiniClusterUtil.setUp();
-    fs = MiniClusterUtil.fileSystem;
+    hdfsTestService = new HdfsTestService();
+    fs = hdfsTestService.start(true).getFileSystem();
   }
 
   @AfterAll
   public static void tearDownClass() {
-    MiniClusterUtil.shutdown();
-    fs = null;
+    hdfsTestService.stop();
   }
 
   @BeforeEach
-  public void setUp() throws IOException, InterruptedException {
-    this.basePath = tempDir.toUri().getPath();
-    this.partitionPath = new Path(basePath, "partition_path");
-    this.spillableBasePath = new Path(basePath, 
".spillable_path").toUri().getPath();
-    HoodieTestUtils.init(MiniClusterUtil.configuration, basePath, 
HoodieTableType.MERGE_ON_READ);
+  public void setUp(TestInfo testInfo) throws IOException, 
InterruptedException {
+    Path workDir = fs.getWorkingDirectory();
+    basePath = new Path(workDir.toString(), testInfo.getDisplayName() + 
System.currentTimeMillis()).toString();
+    partitionPath = new Path(basePath, "partition_path");
+    spillableBasePath = new Path(workDir.toString(), 
".spillable_path").toString();
+    HoodieTestUtils.init(fs.getConf(), basePath, 
HoodieTableType.MERGE_ON_READ);
   }
 
   @Test
@@ -311,43 +315,17 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     }, "getCurrentSize should fail after the logAppender is closed");
   }
 
-  /*
-   * This is actually a test on concurrent append and not recovery lease. 
Commenting this out.
-   * https://issues.apache.org/jira/browse/HUDI-117
-   */
-
-  /**
-   * @Test public void testLeaseRecovery() throws IOException, 
URISyntaxException, InterruptedException { Writer writer
-   * = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-   * 
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
-   * .overBaseCommit("100").withFs(fs).build(); List<IndexedRecord> records =
-   * SchemaTestUtil.generateTestRecords(0, 100); 
Map<HoodieLogBlock.HeaderMetadataType, String> header =
-   * Maps.newHashMap(); 
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-   * header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString()); HoodieAvroDataBlock
-   * dataBlock = new HoodieAvroDataBlock(records, header); writer = 
writer.appendBlock(dataBlock); long size1 =
-   * writer.getCurrentSize(); // do not close this writer - this simulates a 
data note appending to a log dying
-   * without closing the file // writer.close();
-   * <p>
-   * writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-   * 
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
-   * .withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100);
-   * header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString()); dataBlock = new
-   * HoodieAvroDataBlock(records, header); writer = 
writer.appendBlock(dataBlock); long size2 =
-   * writer.getCurrentSize(); assertTrue("We just wrote a new block - size2 
should be > size1", size2 > size1);
-   * assertEquals("Write should be auto-flushed. The size reported by 
FileStatus and the writer should match",
-   * size2, fs.getFileStatus(writer.getLogFile().getPath()).getLen()); 
writer.close(); }
-   */
-
   @Test
-  public void testAppendNotSupported() throws IOException, URISyntaxException, 
InterruptedException {
+  public void testAppendNotSupported(@TempDir java.nio.file.Path tempDir) 
throws IOException, URISyntaxException, InterruptedException {
     // Use some fs like LocalFileSystem, that does not support appends
-    Path localPartitionPath = new Path("file://" + partitionPath);
-    FileSystem localFs = FSUtils.getFs(localPartitionPath.toString(), 
HoodieTestUtils.getDefaultHadoopConf());
-    Path testPath = new Path(localPartitionPath, "append_test");
+    Path localTempDir = new Path(tempDir.toUri());
+    FileSystem localFs = FSUtils.getFs(localTempDir.toString(), 
HoodieTestUtils.getDefaultHadoopConf());
+    assertTrue(localFs instanceof LocalFileSystem);
+    Path testPath = new Path(localTempDir, "append_test");
     localFs.mkdirs(testPath);
 
     // Some data & append two times.
-    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 5);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
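
The pattern above, where the test class owns its HDFS mini cluster instead of sharing MiniClusterUtil, reduces to a JUnit 5 lifecycle like the following. This is a bare sketch using MiniDFSCluster directly; the patched tests go through HdfsTestService, and the class and method names below are invented:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class MiniDfsLifecycleSketch {

  private static MiniDFSCluster cluster;
  private static FileSystem fs;
  private Path basePath;

  @BeforeAll
  static void startCluster() throws Exception {
    // One embedded HDFS per test class; append() works here, unlike LocalFileSystem.
    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build();
    fs = cluster.getFileSystem();
  }

  @AfterAll
  static void stopCluster() {
    cluster.shutdown();
  }

  @BeforeEach
  void setUp(TestInfo testInfo) throws Exception {
    // Unique base path per test, derived from the display name plus a timestamp.
    basePath = new Path(fs.getWorkingDirectory(),
        testInfo.getDisplayName() + System.currentTimeMillis());
    fs.mkdirs(basePath);
  }

  @Test
  void basePathExists() throws Exception {
    assertTrue(fs.exists(basePath));
  }
}
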
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index e1b761e50d0..d18732dfaae 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -26,7 +26,6 @@ import 
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
 
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -56,11 +55,6 @@ import java.util.concurrent.TimeoutException;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
-/**
- * This class is intentionally using a different way of setting up the 
MiniDFSCluster and not relying on
- * {@link MiniClusterUtil} to reproduce append() issue : 
https://issues.apache.org/jira/browse/HDFS-6325 Reference :
- * https://issues.apache.org/jira/secure/attachment/12645053/HDFS-6325.patch.
- */
 public class TestHoodieLogFormatAppendFailure {
 
   private static File baseDir;
@@ -69,7 +63,7 @@ public class TestHoodieLogFormatAppendFailure {
   @BeforeAll
   public static void setUpClass() throws IOException {
     // NOTE : The MiniClusterDFS leaves behind the directory under which the 
cluster was created
-    baseDir = new File("/tmp/" + UUID.randomUUID().toString());
+    baseDir = new File("/tmp/" + UUID.randomUUID());
     FileUtil.fullyDelete(baseDir);
     // Append is not supported in LocalFileSystem. HDFS needs to be setup.
     Configuration conf = new Configuration();
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index e66a1300173..842e7069ec1 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -133,20 +133,20 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     // verify modified partitions included cleaned data
     List<String> partitions = 
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1",
 10));
     assertEquals(5, partitions.size());
-    assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4", 
"5"}));
+    assertEquals(partitions, Arrays.asList("0", "2", "3", "4", "5"));
 
     partitions = 
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1",
 "4"));
     assertEquals(4, partitions.size());
-    assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"}));
+    assertEquals(partitions, Arrays.asList("0", "2", "3", "4"));
 
     // verify only commit actions
     partitions = 
TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsAfter("1",
 10));
     assertEquals(4, partitions.size());
-    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
+    assertEquals(partitions, Arrays.asList("2", "3", "4", "5"));
 
     partitions = 
TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsInRange("1",
 "4"));
     assertEquals(3, partitions.size());
-    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
+    assertEquals(partitions, Arrays.asList("2", "3", "4"));
   }
 
   @Test
@@ -194,10 +194,10 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
 
     // verify modified partitions included cleaned data
     List<String> partitions = 
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1",
 10));
-    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
+    assertEquals(partitions, Arrays.asList("2", "3", "4", "5"));
 
     partitions = 
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1",
 "4"));
-    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
+    assertEquals(partitions, Arrays.asList("2", "3", "4"));
   }
 
   @Test
@@ -354,4 +354,4 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     return TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata);
   }
 
-}
\ No newline at end of file
+}
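
The Arrays.asList cleanups above lean on its varargs signature; a tiny standalone illustration (values are arbitrary):

import java.util.Arrays;
import java.util.List;

public class ArraysAsListSketch {
  public static void main(String[] args) {
    // Both forms produce an equal fixed-size list; the varargs call is the concise one.
    List<String> fromArray = Arrays.asList(new String[] {"2", "3", "4"});
    List<String> fromVarargs = Arrays.asList("2", "3", "4");
    System.out.println(fromArray.equals(fromVarargs)); // true
  }
}
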
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index d0ec6af5f76..3f862242ee5 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -59,7 +59,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -88,17 +87,15 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
   private static final Logger LOG = 
LogManager.getLogger(TestIncrementalFSViewSync.class);
   private static final int NUM_FILE_IDS_PER_PARTITION = 10;
-
-  private static String TEST_WRITE_TOKEN = "1-0-1";
-
-  private final List<String> partitions = Arrays.asList("2018/01/01", 
"2018/01/02", "2019/03/01");
-  private final List<String> fileIdsPerPartition =
+  private static final String TEST_WRITE_TOKEN = "1-0-1";
+  private static final List<String> PARTITIONS = Arrays.asList("2018/01/01", 
"2018/01/02", "2019/03/01");
+  private static final List<String> FILE_IDS_PER_PARTITION =
       IntStream.range(0, NUM_FILE_IDS_PER_PARTITION).mapToObj(x -> 
UUID.randomUUID().toString()).collect(Collectors.toList());
 
   @BeforeEach
   public void init() throws IOException {
     initMetaClient();
-    for (String p : partitions) {
+    for (String p : PARTITIONS) {
       Files.createDirectories(Paths.get(basePath, p));
     }
     refreshFsView();
@@ -113,7 +110,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
   public void testEmptyPartitionsAndTimeline() throws IOException {
     SyncableFileSystemView view = getFileSystemView(metaClient);
     assertFalse(view.getLastInstant().isPresent());
-    partitions.forEach(p -> assertEquals(0, 
view.getLatestFileSlices(p).count()));
+    PARTITIONS.forEach(p -> assertEquals(0, 
view.getLatestFileSlices(p).count()));
     view.close();
   }
 
@@ -191,7 +188,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     assertEquals("11", view.getLastInstant().get().getTimestamp());
     assertEquals(State.COMPLETED, view.getLastInstant().get().getState());
     assertEquals(HoodieTimeline.COMMIT_ACTION, 
view.getLastInstant().get().getAction());
-    partitions.forEach(p -> assertEquals(0, 
view.getLatestFileSlices(p).count()));
+    PARTITIONS.forEach(p -> assertEquals(0, 
view.getLatestFileSlices(p).count()));
 
     metaClient.reloadActiveTimeline();
     SyncableFileSystemView newView = getFileSystemView(metaClient);
@@ -234,7 +231,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     assertEquals("11", view.getLastInstant().get().getTimestamp());
     assertEquals(State.COMPLETED, view.getLastInstant().get().getState());
     assertEquals(HoodieTimeline.COMMIT_ACTION, 
view.getLastInstant().get().getAction());
-    partitions.forEach(p -> assertEquals(0, 
view.getLatestFileSlices(p).count()));
+    PARTITIONS.forEach(p -> assertEquals(0, 
view.getLatestFileSlices(p).count()));
 
     metaClient.reloadActiveTimeline();
     SyncableFileSystemView newView = getFileSystemView(metaClient);
@@ -250,7 +247,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     // restore instants in reverse order till we rollback all replace instants
     testRestore(view, Arrays.asList("15", "16"), instantsToFiles,
          Arrays.asList(getHoodieReplaceInstant("14"), 
getHoodieReplaceInstant("13")),
-         "17", true, 1, fileIdsPerPartition.size());
+         "17", true, 1, FILE_IDS_PER_PARTITION.size());
 
     // clear files from inmemory view for replaced instants
     instantsToFiles.remove("14");
@@ -277,8 +274,8 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
         metaClient.reloadActiveTimeline();
         SyncableFileSystemView newView = getFileSystemView(metaClient);
         // 1 fileId is replaced for every partition, so subtract 
partitions.size()
-        expectedSlicesPerPartition = expectedSlicesPerPartition + 
fileIdsPerPartition.size()  -  1;
-        areViewsConsistent(view, newView, expectedSlicesPerPartition * 
partitions.size());
+        expectedSlicesPerPartition = expectedSlicesPerPartition + 
FILE_IDS_PER_PARTITION.size()  -  1;
+        areViewsConsistent(view, newView, expectedSlicesPerPartition * 
PARTITIONS.size());
         newView.close();
       } catch (IOException e) {
         throw new HoodieIOException("unable to test replace", e);
@@ -308,7 +305,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
       int lastPartition = file.lastIndexOf("/");
       return Pair.of(file.substring(0, lastPartition), 
file.substring(lastPartition + 1));
     }).collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toList())));
-    return partitions.stream()
+    return PARTITIONS.stream()
         .map(p -> Pair.of(p, 
FSUtils.getFileId(partitionToFileIdsList.get(p).get(0))))
         .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toList())));
   }
@@ -339,7 +336,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     instantsToFiles = testMultipleWriteSteps(view1, 
Collections.singletonList("11"), true, "11");
 
     SyncableFileSystemView view2 =
-        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build());
 
     // Run 2 more ingestion on MOR table. View1 is not yet synced but View2 is
     instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", 
"13"), true, "11"));
@@ -349,9 +346,9 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
     view2.sync();
     SyncableFileSystemView view3 =
-        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build());
     view3.sync();
-    areViewsConsistent(view1, view2, partitions.size() * 
fileIdsPerPartition.size());
+    areViewsConsistent(view1, view2, PARTITIONS.size() * 
FILE_IDS_PER_PARTITION.size());
 
     /*
      * Case where a compaction is scheduled and then unscheduled
@@ -359,9 +356,9 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     scheduleCompaction(view2, "15");
     unscheduleCompaction(view2, "15", "14", "11");
     view1.sync();
-    areViewsConsistent(view1, view2, partitions.size() * 
fileIdsPerPartition.size());
+    areViewsConsistent(view1, view2, PARTITIONS.size() * 
FILE_IDS_PER_PARTITION.size());
     SyncableFileSystemView view4 =
-        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build());
     view4.sync();
 
     /*
@@ -373,9 +370,9 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     testMultipleWriteSteps(view2, Collections.singletonList("16"), false, 
"16", 2,
         Collections.singletonList(new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "18")));
     view1.sync();
-    areViewsConsistent(view1, view2, partitions.size() * 
fileIdsPerPartition.size() * 2);
+    areViewsConsistent(view1, view2, PARTITIONS.size() * 
FILE_IDS_PER_PARTITION.size() * 2);
     SyncableFileSystemView view5 =
-        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build());
     view5.sync();
 
     /*
@@ -396,9 +393,9 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     // Run one more round of ingestion
     instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("23", 
"24"), true, "20", 2));
     view1.sync();
-    areViewsConsistent(view1, view2, partitions.size() * 
fileIdsPerPartition.size() * 2);
+    areViewsConsistent(view1, view2, PARTITIONS.size() * 
FILE_IDS_PER_PARTITION.size() * 2);
     SyncableFileSystemView view6 =
-        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build());
     view6.sync();
 
     /*
@@ -415,7 +412,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
     Arrays.asList(view1, view2, view3, view4, view5, view6).forEach(v -> {
       v.sync();
-      areViewsConsistent(v, view1, partitions.size() * 
fileIdsPerPartition.size() * 3);
+      areViewsConsistent(v, view1, PARTITIONS.size() * 
FILE_IDS_PER_PARTITION.size() * 3);
     });
 
     view1.close();
@@ -456,7 +453,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
       List<String> cleanedInstants, int numFilesAddedPerInstant, int 
numFilesReplacedPerInstant) {
     final int netFilesAddedPerInstant = numFilesAddedPerInstant - 
numFilesReplacedPerInstant;
     assertEquals(newCleanerInstants.size(), cleanedInstants.size());
-    long exp = partitions.stream().mapToLong(p1 -> 
view.getAllFileSlices(p1).count()).findAny().getAsLong();
+    long exp = PARTITIONS.stream().mapToLong(p1 -> 
view.getAllFileSlices(p1).count()).findAny().getAsLong();
     LOG.info("Initial File Slices :" + exp);
     for (int idx = 0; idx < newCleanerInstants.size(); idx++) {
       String instant = cleanedInstants.get(idx);
@@ -466,25 +463,25 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
         performClean(instant, filesToDelete, newCleanerInstants.get(idx));
 
-        exp -= fileIdsPerPartition.size() - numFilesReplacedPerInstant;
+        exp -= FILE_IDS_PER_PARTITION.size() - numFilesReplacedPerInstant;
         final long expTotalFileSlicesPerPartition = exp;
         view.sync();
         assertTrue(view.getLastInstant().isPresent());
         assertEquals(newCleanerInstants.get(idx), 
view.getLastInstant().get().getTimestamp());
         assertEquals(State.COMPLETED, view.getLastInstant().get().getState());
         assertEquals(HoodieTimeline.CLEAN_ACTION, 
view.getLastInstant().get().getAction());
-        partitions.forEach(p -> {
+        PARTITIONS.forEach(p -> {
           LOG.info("PARTITION : " + p);
           LOG.info("\tFileSlices :" + 
view.getAllFileSlices(p).collect(Collectors.toList()));
         });
 
         final int instantIdx = newCleanerInstants.size() - idx;
-        partitions.forEach(p -> assertEquals(fileIdsPerPartition.size() + 
instantIdx * netFilesAddedPerInstant, view.getLatestFileSlices(p).count()));
-        partitions.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, 
view.getAllFileSlices(p).count()));
+        PARTITIONS.forEach(p -> assertEquals(FILE_IDS_PER_PARTITION.size() + 
instantIdx * netFilesAddedPerInstant, view.getLatestFileSlices(p).count()));
+        PARTITIONS.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, 
view.getAllFileSlices(p).count()));
 
         metaClient.reloadActiveTimeline();
         SyncableFileSystemView newView = getFileSystemView(metaClient);
-        areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * 
partitions.size());
+        areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * 
PARTITIONS.size());
         newView.close();
       } catch (IOException e) {
         throw new HoodieException(e);
@@ -511,16 +508,16 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
       Map<String, List<String>> instantsToFiles, List<HoodieInstant> 
rolledBackInstants, String emptyRestoreInstant,
       boolean isRestore, int totalReplacedFileSlicesPerPartition, int 
totalFilesAddedPerPartitionPerInstant) {
     assertEquals(newRestoreInstants.size(), rolledBackInstants.size());
-    long initialFileSlices = partitions.stream().mapToLong(p -> 
view.getAllFileSlices(p).count()).findAny().getAsLong();
+    long initialFileSlices = PARTITIONS.stream().mapToLong(p -> 
view.getAllFileSlices(p).count()).findAny().getAsLong();
     final int numFileSlicesAddedPerInstant = 
(totalFilesAddedPerPartitionPerInstant - totalReplacedFileSlicesPerPartition);
-    final long expectedLatestFileSlices = fileIdsPerPartition.size() + 
(rolledBackInstants.size()) * numFileSlicesAddedPerInstant;
+    final long expectedLatestFileSlices = FILE_IDS_PER_PARTITION.size() + 
(rolledBackInstants.size()) * numFileSlicesAddedPerInstant;
     IntStream.range(0, newRestoreInstants.size()).forEach(idx -> {
       HoodieInstant instant = rolledBackInstants.get(idx);
       try {
         boolean isDeltaCommit = 
HoodieTimeline.DELTA_COMMIT_ACTION.equalsIgnoreCase(instant.getAction());
-        performRestore(instant, instantsToFiles.get(instant.getTimestamp()), 
newRestoreInstants.get(idx), isRestore);
+        performRestore(instant, 
instantsToFiles.getOrDefault(instant.getTimestamp(), Collections.emptyList()), 
newRestoreInstants.get(idx), isRestore);
         final long expTotalFileSlicesPerPartition =
-            isDeltaCommit ? initialFileSlices : initialFileSlices - ((idx + 1) 
* (fileIdsPerPartition.size() - totalReplacedFileSlicesPerPartition));
+            isDeltaCommit ? initialFileSlices : initialFileSlices - ((idx + 1) 
* (FILE_IDS_PER_PARTITION.size() - totalReplacedFileSlicesPerPartition));
         view.sync();
         assertTrue(view.getLastInstant().isPresent());
         LOG.info("Last Instant is :" + view.getLastInstant().get());
@@ -532,15 +529,15 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
         if (HoodieTimeline.compareTimestamps(newRestoreInstants.get(idx), 
HoodieTimeline.GREATER_THAN_OR_EQUALS, emptyRestoreInstant
         )) {
-          partitions.forEach(p -> assertEquals(0, 
view.getLatestFileSlices(p).count()));
+          PARTITIONS.forEach(p -> assertEquals(0, 
view.getLatestFileSlices(p).count()));
         } else {
-          partitions.forEach(p -> assertEquals(expectedLatestFileSlices - (idx 
+ 1) * numFileSlicesAddedPerInstant, view.getLatestFileSlices(p).count()));
+          PARTITIONS.forEach(p -> assertEquals(expectedLatestFileSlices - (idx 
+ 1) * numFileSlicesAddedPerInstant, view.getLatestFileSlices(p).count()));
         }
-        partitions.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, 
view.getAllFileSlices(p).count()));
+        PARTITIONS.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, 
view.getAllFileSlices(p).count()));
 
         metaClient.reloadActiveTimeline();
         SyncableFileSystemView newView = getFileSystemView(metaClient);
-        areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * 
partitions.size());
+        areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * 
PARTITIONS.size());
         newView.close();
       } catch (IOException e) {
         throw new HoodieException(e);
@@ -613,20 +610,15 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
    *
    * @param files List of files to be deleted
    */
-  private Map<String, List<String>> deleteFiles(List<String> files) {
-
-    if (null == files) {
-      return new HashMap<>();
-    }
-
+  private Map<String, List<String>> deleteFiles(List<String> files) throws 
IOException {
     Map<String, List<String>> partititonToFiles = new HashMap<>();
-    partitions.forEach(p -> partititonToFiles.put(p, new ArrayList<>()));
+    PARTITIONS.forEach(p -> partititonToFiles.put(p, new ArrayList<>()));
 
     for (String f : files) {
-      String fullPath = String.format("%s/%s", metaClient.getBasePath(), f);
-      new File(fullPath).delete();
-      String partition = 
partitions.stream().filter(f::startsWith).findAny().get();
-      partititonToFiles.get(partition).add(fullPath);
+      java.nio.file.Path fullPath = 
Paths.get(metaClient.getBasePathV2().toString(), f);
+      Files.delete(fullPath);
+      String partition = 
PARTITIONS.stream().filter(f::startsWith).findAny().get();
+      partititonToFiles.get(partition).add(fullPath.toUri().toString());
     }
     return partititonToFiles;
   }
@@ -637,19 +629,19 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
    * @param view Hoodie View
    * @param instantTime COmpaction Instant Time
    */
-  private void scheduleCompaction(SyncableFileSystemView view, String 
instantTime) throws IOException {
-    List<Pair<String, FileSlice>> slices = partitions.stream()
+  private void scheduleCompaction(SyncableFileSystemView view, String  
instantTime) throws IOException {
+    List<Pair<String, FileSlice>> slices = PARTITIONS.stream()
         .flatMap(p -> view.getLatestFileSlices(p).map(s -> Pair.of(p, 
s))).collect(Collectors.toList());
 
-    long initialExpTotalFileSlices = partitions.stream().mapToLong(p -> 
view.getAllFileSlices(p).count()).sum();
-
+    long initialExpTotalFileSlices = PARTITIONS.stream().mapToLong(p -> 
view.getAllFileSlices(p).count()).sum();
+    HoodieInstant compactionRequestedInstant = new 
HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime);
     HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(slices, 
Option.empty(), Option.empty());
     HoodieInstant compactionInstant = new HoodieInstant(State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, instantTime);
     metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
         TimelineMetadataUtils.serializeCompactionPlan(plan));
 
     view.sync();
-    partitions.forEach(p -> {
+    PARTITIONS.forEach(p -> {
       view.getLatestFileSlices(p).forEach(fs -> {
         assertEquals(instantTime, fs.getBaseInstantTime());
         assertEquals(p, fs.getPartitionPath());
@@ -663,7 +655,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
     metaClient.reloadActiveTimeline();
     SyncableFileSystemView newView = getFileSystemView(metaClient);
-    areViewsConsistent(view, newView, initialExpTotalFileSlices + 
partitions.size() * fileIdsPerPartition.size());
+    areViewsConsistent(view, newView, initialExpTotalFileSlices + 
PARTITIONS.size() * FILE_IDS_PER_PARTITION.size());
     newView.close();
   }
 
@@ -683,7 +675,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
     view.sync();
     assertEquals(newLastInstant, view.getLastInstant().get().getTimestamp());
-    partitions.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> 
assertEquals(newBaseInstant, fs.getBaseInstantTime())));
+    PARTITIONS.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> 
assertEquals(newBaseInstant, fs.getBaseInstantTime())));
   }
 
   /**
@@ -762,20 +754,20 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
       assertEquals(lastInstant.getAction(), 
view.getLastInstant().get().getAction(),
           "Expected Last=" + lastInstant + ", Found Instants="
               + view.getTimeline().getInstants());
-      partitions.forEach(p -> assertEquals(fileIdsPerPartition.size(), 
view.getLatestFileSlices(p).count()));
-      final long expTotalFileSlicesPerPartition = fileIdsPerPartition.size() * 
multiple;
-      partitions.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, 
view.getAllFileSlices(p).count()));
+      PARTITIONS.forEach(p -> assertEquals(FILE_IDS_PER_PARTITION.size(), 
view.getLatestFileSlices(p).count()));
+      final long expTotalFileSlicesPerPartition = 
FILE_IDS_PER_PARTITION.size() * multiple;
+      PARTITIONS.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, 
view.getAllFileSlices(p).count()));
       if (deltaCommit) {
-        partitions.forEach(p ->
+        PARTITIONS.forEach(p ->
             view.getLatestFileSlices(p).forEach(f -> 
assertEquals(baseInstantForDeltaCommit, f.getBaseInstantTime()))
         );
       } else {
-        partitions.forEach(p -> view.getLatestBaseFiles(p).forEach(f -> 
assertEquals(instant, f.getCommitTime())));
+        PARTITIONS.forEach(p -> view.getLatestBaseFiles(p).forEach(f -> 
assertEquals(instant, f.getCommitTime())));
       }
 
       metaClient.reloadActiveTimeline();
       SyncableFileSystemView newView = getFileSystemView(metaClient);
-      areViewsConsistent(view, newView, fileIdsPerPartition.size() * 
partitions.size() * multiple);
+      areViewsConsistent(view, newView, FILE_IDS_PER_PARTITION.size() * 
PARTITIONS.size() * multiple);
       newView.close();
       instantToFiles.put(instant, filePaths);
       if (!deltaCommit) {
@@ -797,9 +789,9 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     assertEquals(view1.getLastInstant(), view2.getLastInstant());
 
     // View Checks
-    Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap1 = 
partitions.stream().flatMap(view1::getAllFileGroups)
+    Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap1 = 
PARTITIONS.stream().flatMap(view1::getAllFileGroups)
         .collect(Collectors.toMap(HoodieFileGroup::getFileGroupId, fg -> fg));
-    Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap2 = 
partitions.stream().flatMap(view2::getAllFileGroups)
+    Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap2 = 
PARTITIONS.stream().flatMap(view2::getAllFileGroups)
         .collect(Collectors.toMap(HoodieFileGroup::getFileGroupId, fg -> fg));
     assertEquals(fileGroupsMap1.keySet(), fileGroupsMap2.keySet());
     long gotSlicesCount = fileGroupsMap1.keySet().stream()
@@ -843,20 +835,19 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
   }
 
   private List<Pair<String, HoodieWriteStat>> generateDataForInstant(String 
baseInstant, String instant, boolean deltaCommit) {
-    return generateDataForInstant(baseInstant, instant, deltaCommit, 
fileIdsPerPartition);
+    return generateDataForInstant(baseInstant, instant, deltaCommit, 
FILE_IDS_PER_PARTITION);
   }
 
   private List<Pair<String, HoodieWriteStat>> generateDataForInstant(String 
baseInstant, String instant, boolean deltaCommit, List<String> fileIds) {
-    return partitions.stream().flatMap(p -> fileIds.stream().map(f -> {
+    return PARTITIONS.stream().flatMap(p -> fileIds.stream().map(f -> {
       try {
-        File file = new File(basePath + "/" + p + "/"
-            + (deltaCommit
+        java.nio.file.Path filePath = Paths.get(basePath, p, deltaCommit
             ? FSUtils.makeLogFileName(f, ".log", baseInstant, 
Integer.parseInt(instant), TEST_WRITE_TOKEN)
-            : FSUtils.makeBaseFileName(instant, TEST_WRITE_TOKEN, f)));
-        file.createNewFile();
+            : FSUtils.makeBaseFileName(instant, TEST_WRITE_TOKEN, f));
+        Files.createFile(filePath);
         HoodieWriteStat w = new HoodieWriteStat();
         w.setFileId(f);
-        w.setPath(String.format("%s/%s", p, file.getName()));
+        w.setPath(String.format("%s/%s", p, filePath.getFileName()));
         return Pair.of(p, w);
       } catch (IOException e) {
         throw new HoodieException(e);
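
generateDataForInstant and deleteFiles above switch from java.io.File, whose createNewFile() and delete() report failure only through a boolean, to java.nio.file.Files, which throws IOException on failure. A short standalone sketch of the difference (scratch paths only, not the test layout):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class NioFilesSketch {
  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("nio-sketch");

    // java.io.File: a failed create or delete only shows up as a 'false' return value.
    File legacy = new File(dir.toFile(), "legacy.txt");
    boolean created = legacy.createNewFile();
    boolean deleted = legacy.delete();
    System.out.println("created=" + created + ", deleted=" + deleted);

    // java.nio.file.Files: failures surface as IOExceptions, so tests fail fast.
    Path modern = dir.resolve("modern.txt");
    Files.createFile(modern);
    Files.delete(modern);

    Files.delete(dir);
  }
}
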
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 60f4b63cb8c..2accc283fec 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -28,16 +28,18 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.net.URI;
 
 /**
  * The common hoodie test harness to provide the basic infrastructure.
  */
 public class HoodieCommonTestHarness {
 
-  protected String tableName = null;
-  protected String basePath = null;
-  protected transient HoodieTestDataGenerator dataGen = null;
-  protected transient HoodieTableMetaClient metaClient;
+  protected String tableName;
+  protected String basePath;
+  protected URI baseUri;
+  protected HoodieTestDataGenerator dataGen;
+  protected HoodieTableMetaClient metaClient;
   @TempDir
   public java.nio.file.Path tempDir;
 
@@ -52,7 +54,8 @@ public class HoodieCommonTestHarness {
     try {
       java.nio.file.Path basePath = tempDir.resolve("dataset");
       java.nio.file.Files.createDirectories(basePath);
-      this.basePath = basePath.toString();
+      this.basePath = basePath.toAbsolutePath().toString();
+      this.baseUri = basePath.toUri();
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
@@ -60,7 +63,6 @@ public class HoodieCommonTestHarness {
 
   /**
    * Initializes a test data generator which used to generate test datas.
-   *
    */
   protected void initTestDataGenerator() {
     dataGen = new HoodieTestDataGenerator();
@@ -72,7 +74,6 @@ public class HoodieCommonTestHarness {
 
   /**
    * Cleanups test data generator.
-   *
    */
   protected void cleanupTestDataGenerator() {
     if (dataGen != null) {
@@ -87,8 +88,10 @@ public class HoodieCommonTestHarness {
    * @throws IOException
    */
   protected void initMetaClient() throws IOException {
-    metaClient = HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), 
getTableType());
-    basePath = metaClient.getBasePath();
+    if (basePath == null) {
+      initPath();
+    }
+    metaClient = HoodieTestUtils.init(basePath, getTableType());
   }
 
   protected void cleanMetaClient() {
@@ -121,8 +124,8 @@ public class HoodieCommonTestHarness {
   protected SyncableFileSystemView 
getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
     try {
       return new HoodieTableFileSystemView(metaClient,
-              metaClient.getActiveTimeline(),
-              HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
+          metaClient.getActiveTimeline(),
+          HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
       );
     } catch (IOException ioe) {
       throw new HoodieIOException("Error getting file system view", ioe);
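
The harness now tracks both the plain path string (basePath) and its URI form (baseUri). The URI form carries the file: scheme, which Hadoop uses to pick the right FileSystem, while a bare string falls back to the default filesystem. A small sketch of the two forms for one temp directory (directory names are arbitrary):

import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;

public class BasePathVsUriSketch {
  public static void main(String[] args) throws IOException {
    java.nio.file.Path tempDir = Files.createTempDirectory("harness-sketch");
    java.nio.file.Path dataset = Files.createDirectories(tempDir.resolve("dataset"));

    String basePath = dataset.toAbsolutePath().toString(); // e.g. /tmp/.../dataset
    URI baseUri = dataset.toUri();                         // e.g. file:///tmp/.../dataset

    // The URI form carries the file: scheme, so the Hadoop Path is unambiguous.
    Path hadoopPath = new Path(baseUri.toString(), ".hoodie/.temp");
    System.out.println(basePath);
    System.out.println(hadoopPath);
  }
}
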
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
index 727e1e4db6b..617449820fe 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
@@ -19,16 +19,13 @@
 package org.apache.hudi.common.testutils.minicluster;
 
 import org.apache.hudi.common.testutils.NetworkTestUtils;
-import org.apache.hudi.common.util.FileIOUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
 import java.nio.file.Files;
@@ -45,7 +42,7 @@ public class HdfsTestService {
    * Configuration settings.
    */
   private final Configuration hadoopConf;
-  private final String workDir;
+  private final java.nio.file.Path dfsBaseDirPath;
 
   /**
    * Embedded HDFS cluster.
@@ -58,7 +55,7 @@ public class HdfsTestService {
 
   public HdfsTestService(Configuration hadoopConf) throws IOException {
     this.hadoopConf = hadoopConf;
-    this.workDir = 
Files.createTempDirectory("temp").toAbsolutePath().toString();
+    this.dfsBaseDirPath = Files.createTempDirectory("hdfs-test-service" + 
System.currentTimeMillis());
   }
 
   public Configuration getHadoopConf() {
@@ -66,14 +63,12 @@ public class HdfsTestService {
   }
 
   public MiniDFSCluster start(boolean format) throws IOException {
-    Objects.requireNonNull(workDir, "The work dir must be set before starting 
cluster.");
+    Objects.requireNonNull(dfsBaseDirPath, "dfs base dir must be set before 
starting cluster.");
 
     // If clean, then remove the work dir so we can start fresh.
-    String localDFSLocation = getDFSLocation(workDir);
     if (format) {
-      LOG.info("Cleaning HDFS cluster data at: " + localDFSLocation + " and 
starting fresh.");
-      File file = new File(localDFSLocation);
-      FileIOUtils.deleteDirectory(file);
+      LOG.info("Cleaning HDFS cluster data at: " + dfsBaseDirPath + " and 
starting fresh.");
+      Files.deleteIfExists(dfsBaseDirPath);
     }
 
     int loop = 0;
@@ -87,7 +82,7 @@ public class HdfsTestService {
         // Configure and start the HDFS cluster
         // boolean format = shouldFormatDFSCluster(localDFSLocation, clean);
         String bindIP = "127.0.0.1";
-        configureDFSCluster(hadoopConf, localDFSLocation, bindIP, 
namenodeRpcPort,
+        configureDFSCluster(hadoopConf, dfsBaseDirPath.toString(), bindIP, 
namenodeRpcPort,
             datanodePort, datanodeIpcPort, datanodeHttpPort);
         miniDfsCluster = new 
MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format).checkDataNodeAddrConfig(true)
             .checkDataNodeHostConfig(true).build();
@@ -112,25 +107,15 @@ public class HdfsTestService {
     miniDfsCluster = null;
   }
 
-  /**
-   * Get the location on the local FS where we store the HDFS data.
-   *
-   * @param baseFsLocation The base location on the local filesystem we have 
write access to create dirs.
-   * @return The location for HDFS data.
-   */
-  private static String getDFSLocation(String baseFsLocation) {
-    return baseFsLocation + Path.SEPARATOR + "dfs";
-  }
-
   /**
    * Configure the DFS Cluster before launching it.
    *
    * @param config           The already created Hadoop configuration we'll 
further configure for HDFS
-   * @param localDFSLocation The location on the local filesystem where 
cluster data is stored
+   * @param dfsBaseDir       The location on the local filesystem where 
cluster data is stored
    * @param bindIP           An IP address we want to force the datanode and 
namenode to bind to.
    * @return The updated Configuration object.
    */
-  private static Configuration configureDFSCluster(Configuration config, 
String localDFSLocation, String bindIP,
+  private static Configuration configureDFSCluster(Configuration config, 
String dfsBaseDir, String bindIP,
       int namenodeRpcPort, int datanodePort, int datanodeIpcPort, int 
datanodeHttpPort) {
 
     LOG.info("HDFS force binding to ip: " + bindIP);
@@ -143,7 +128,7 @@ public class HdfsTestService {
     // issues with the internal IP addresses. This config disables that check,
     // and will allow a datanode to connect regardless.
     config.setBoolean("dfs.namenode.datanode.registration.ip-hostname-check", 
false);
-    config.set("hdfs.minidfs.basedir", localDFSLocation);
+    config.set("hdfs.minidfs.basedir", dfsBaseDir);
     // allow current user to impersonate others
     String user = System.getProperty("user.name");
     config.set("hadoop.proxyuser." + user + ".groups", "*");
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/MiniClusterUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/MiniClusterUtil.java
deleted file mode 100644
index 135d875e438..00000000000
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/MiniClusterUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.testutils.minicluster;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.zookeeper.server.ZooKeeperServer;
-
-import java.io.IOException;
-
-/**
- * A utility class about mini cluster.
- */
-public class MiniClusterUtil {
-
-  private static MiniDFSCluster dfsCluster;
-  private static ZooKeeperServer zkServer;
-  public static Configuration configuration;
-  public static FileSystem fileSystem;
-
-  public static void setUp() throws IOException, InterruptedException {
-    if (dfsCluster == null) {
-      HdfsTestService service = new HdfsTestService();
-      dfsCluster = service.start(true);
-      configuration = service.getHadoopConf();
-    }
-    if (zkServer == null) {
-      ZookeeperTestService zkService = new ZookeeperTestService(configuration);
-      zkServer = zkService.start();
-    }
-    fileSystem = FileSystem.get(configuration);
-  }
-
-  public static void shutdown() {
-    if (dfsCluster != null) {
-      dfsCluster.shutdown(true, true);
-    }
-    if (zkServer != null) {
-      zkServer.shutdown(true);
-    }
-  }
-}
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
index 9b26a7915dd..e8c286d8ab7 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
+import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
@@ -73,28 +73,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
 
-  private JobConf jobConf;
-  private FileSystem fs;
-  private Configuration hadoopConf;
+  private static HdfsTestService hdfsTestService;
+  private static FileSystem fs;
 
   @BeforeAll
   public static void setUpClass() throws IOException, InterruptedException {
     // Append is not supported in LocalFileSystem. HDFS needs to be setup.
-    MiniClusterUtil.setUp();
+    hdfsTestService = new HdfsTestService();
+    fs = hdfsTestService.start(true).getFileSystem();
   }
 
   @AfterAll
   public static void tearDownClass() {
-    MiniClusterUtil.shutdown();
+    hdfsTestService.stop();
   }
 
   @BeforeEach
   public void setUp() throws IOException, InterruptedException {
-    this.fs = MiniClusterUtil.fileSystem;
-    jobConf = new JobConf();
-    hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
     assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString())));
-    HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
   }
 
   @Test
@@ -103,7 +99,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
     Configuration conf = new Configuration();
     // initial commit
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
-    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    HoodieTestUtils.init(conf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
     String commitTime = "100";
     final int numRecords = 1000;
     // Create 3 partitions, each partition holds one parquet file and 1000 records
@@ -133,7 +129,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
 
     Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
     Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
-    jobConf = new JobConf(conf);
+    JobConf jobConf = new JobConf(conf);
     // Add three partition path to InputPaths
     Path[] partitionDirArray = new Path[partitionDirs.size()];
     partitionDirs.stream().map(p -> new Path(p.getPath())).collect(Collectors.toList()).toArray(partitionDirArray);
@@ -186,7 +182,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
     Configuration conf = new Configuration();
     // initial commit
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
-    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    HoodieTestUtils.init(conf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
     String commitTime = "100";
     final int numRecords = 1000;
     // Create 3 parquet files with 1000 records each
@@ -219,7 +215,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
 
     Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
     Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
-    jobConf = new JobConf(conf);
+    JobConf jobConf = new JobConf(conf);
     // Add the paths
     FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
     jobConf.set(HAS_MAP_WORK, "true");
@@ -258,7 +254,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
     Configuration conf = new Configuration();
     // initial commit
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
-    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    HoodieTestUtils.init(conf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
     String commitTime = "100";
     final int numRecords = 1000;
     // Create 3 parquet files with 1000 records each
@@ -292,7 +288,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
     mrwork.getMapWork().setPathToAliases(tableAlias);
     Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
     Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
-    jobConf = new JobConf(conf);
+    JobConf jobConf = new JobConf(conf);
     // Add the paths
     FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
     jobConf.set(HAS_MAP_WORK, "true");
@@ -328,7 +324,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
     Configuration conf = new Configuration();
     // initial commit
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
-    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    HoodieTestUtils.init(conf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
     String commitTime = "100";
     final int numRecords = 1000;
     // Create 3 parquet files with 1000 records each
@@ -364,7 +360,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
     mrwork.getMapWork().setPathToPartitionInfo(pt);
     Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
     Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
-    jobConf = new JobConf(conf);
+    JobConf jobConf = new JobConf(conf);
     // Add the paths
     FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
     jobConf.set(HAS_MAP_WORK, "true");
