This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 31b6acb973f75e6d27e3e04940db26d9af1ab6db
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Nov 29 00:38:47 2022 +0800

    [HUDI-4209] Avoid using HDFS in HoodieClientTestHarness (#7305)
---
 .../index/bloom/TestFlinkHoodieBloomIndex.java     |   2 +
 .../testutils/HoodieFlinkClientTestHarness.java    |  84 ++-----------
 .../testutils/HoodieFlinkWriteableTestTable.java   |   2 +-
 .../hudi/client/TestFileBasedLockProvider.java     |  91 ++++++--------
 .../apache/hudi/client/TestHoodieReadClient.java   |  37 ++----
 .../java/org/apache/hudi/client/TestMultiFS.java   |  63 +++++++---
 .../hudi/client/TestUpdateSchemaEvolution.java     |   3 +-
 .../TestBulkInsertInternalPartitioner.java         |  16 +--
 .../commit/TestCopyOnWriteActionExecutor.java      |   3 +-
 .../TestMergeOnReadRollbackActionExecutor.java     |  25 +---
 .../hudi/table/marker/TestDirectWriteMarkers.java  |  13 +-
 .../hudi/testutils/HoodieClientTestHarness.java    | 110 ++++-------------
 .../org/apache/hudi/common/fs/TestFSUtils.java     |  89 +++++++-------
 .../fs/TestFSUtilsWithRetryWrapperEnable.java      |   1 -
 .../common/functional/TestHoodieLogFormat.java     |  62 ++++------
 .../TestHoodieLogFormatAppendFailure.java          |   8 +-
 .../hudi/common/table/TestTimelineUtils.java       |  14 +--
 .../table/view/TestIncrementalFSViewSync.java      | 133 ++++++++++-----------
 .../common/testutils/HoodieCommonTestHarness.java  |  25 ++--
 .../testutils/minicluster/HdfsTestService.java     |  33 ++---
 .../testutils/minicluster/MiniClusterUtil.java     |  59 ---------
 .../hive/TestHoodieCombineHiveInputFormat.java     |  32 +++--
 22 files changed, 323 insertions(+), 582 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java index d07eff01c27..6dd5a1c27e2 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.data.HoodieListPairData; +import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -92,6 +93,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) { return HoodieWriteConfig.newBuilder().withPath(basePath) + .withEngineType(EngineType.FLINK) .withIndexConfig(HoodieIndexConfig.newBuilder().bloomIndexPruneByRanges(rangePruning) .bloomIndexTreebasedFilter(treeFiltering).bloomIndexBucketizedChecking(bucketizedChecking) .bloomIndexKeysPerBucket(2).build()) diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java index 2f4e9eeddc9..828154931d3 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java @@ -30,69 +30,39 @@ import 
org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; -import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.bloom.TestFlinkHoodieBloomIndex; import org.apache.hudi.table.HoodieTable; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + /** * The test harness for resource initialization and cleanup. */ -public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implements Serializable { +public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness { protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class); - private String testMethodName; - protected transient Configuration hadoopConf = null; - protected transient FileSystem fs; - protected transient MiniClusterWithClientResource flinkCluster = null; - protected transient HoodieFlinkEngineContext context = null; - protected transient ExecutorService executorService; - protected transient HoodieFlinkWriteClient writeClient; - protected transient HoodieTableFileSystemView tableView; + protected Configuration hadoopConf; + protected FileSystem fs; + protected HoodieFlinkEngineContext context; + protected ExecutorService executorService; + protected HoodieFlinkWriteClient writeClient; + protected HoodieTableFileSystemView tableView; protected final FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null); - // dfs - protected transient HdfsTestService hdfsTestService; - protected transient MiniDFSCluster dfsCluster; - protected transient DistributedFileSystem dfs; - - @BeforeEach - public void setTestMethodName(TestInfo testInfo) { - if (testInfo.getTestMethod().isPresent()) { - testMethodName = testInfo.getTestMethod().get().getName(); - } else { - testMethodName = "Unknown"; - } - } - - protected void initFlinkMiniCluster() { - flinkCluster = new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberSlotsPerTaskManager(2) - .setNumberTaskManagers(1) - .build()); - } - protected void initFileSystem() { hadoopConf = new Configuration(); initFileSystemWithConfiguration(hadoopConf); @@ -100,9 +70,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem } private void initFileSystemWithConfiguration(Configuration configuration) { - if (basePath == null) { - throw new IllegalStateException("The base path has not been initialized."); - } + checkState(basePath != null); fs = FSUtils.getFs(basePath, configuration); if (fs instanceof LocalFileSystem) { LocalFileSystem lfs = 
(LocalFileSystem) fs; @@ -124,9 +92,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem } protected void initMetaClient(HoodieTableType tableType) throws IOException { - if (basePath == null) { - throw new IllegalStateException("The base path has not been initialized."); - } + checkState(basePath != null); metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType); } @@ -156,18 +122,10 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem cleanupFlinkContexts(); cleanupTestDataGenerator(); cleanupFileSystem(); - cleanupDFS(); cleanupExecutorService(); System.gc(); } - protected void cleanupFlinkMiniCluster() { - if (flinkCluster != null) { - flinkCluster.after(); - flinkCluster = null; - } - } - /** * Simple test sink function. */ @@ -185,7 +143,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem /** * Cleanups hoodie clients. */ - protected void cleanupClients() throws java.io.IOException { + protected void cleanupClients() { if (metaClient != null) { metaClient = null; } @@ -199,24 +157,6 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem } } - /** - * Cleanups the distributed file system. - * - * @throws IOException - */ - protected void cleanupDFS() throws java.io.IOException { - if (hdfsTestService != null) { - hdfsTestService.stop(); - dfsCluster.shutdown(true, true); - hdfsTestService = null; - dfsCluster = null; - dfs = null; - } - // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the - // same JVM - FileSystem.closeAll(); - } - /** * Cleanups the executor service. */ diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java index 2a69e6fd671..cd4096663f3 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java @@ -60,7 +60,7 @@ public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable { } public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) { - return new HoodieFlinkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter); + return new HoodieFlinkWriteableTestTable(metaClient.getBasePathV2().toString(), metaClient.getRawFs(), metaClient, schema, filter); } public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index 208e9cd62e7..e81a85c5978 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -19,22 +19,17 @@ package org.apache.hudi.client; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.common.config.LockConfiguration; -import 
org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieLockException; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.nio.file.Path; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -43,93 +38,75 @@ import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PA import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestFileBasedLockProvider { - private static HdfsTestService hdfsTestService; - private static MiniDFSCluster dfsCluster; - private static LockConfiguration lockConfiguration; - private static Configuration hadoopConf; - @BeforeAll - public static void setup() throws IOException { - hdfsTestService = new HdfsTestService(); - dfsCluster = hdfsTestService.start(true); - hadoopConf = dfsCluster.getFileSystem().getConf(); + @TempDir + Path tempDir; + String basePath; + LockConfiguration lockConfiguration; + Configuration hadoopConf; + @BeforeEach + public void setUp() throws IOException { + basePath = tempDir.toUri().getPath(); Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/"); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath); properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1"); properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000"); properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3"); lockConfiguration = new LockConfiguration(properties); - } - - @AfterAll - public static void cleanUpAfterAll() throws IOException { - Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); - FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); - fs.delete(new Path("/tmp"), true); - if (hdfsTestService != null) { - hdfsTestService.stop(); - hdfsTestService = null; - } - } - - @AfterEach - public void cleanUpAfterEach() throws IOException { - Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); - FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); - fs.delete(new Path("/tmp/lock"), true); + hadoopConf = new Configuration(); } @Test public void testAcquireLock() { FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); - Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() - .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); } @Test public void 
testAcquireLockWithDefaultPath() { lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY); - lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/"); + lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), basePath); FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); - Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() - .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); - lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/"); + lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath); } @Test public void testUnLock() { FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); - Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() - .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); - Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() - .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); } @Test public void testReentrantLock() { FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); - Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() - .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); - Assertions.assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() - .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); } @Test public void testUnlockWithoutLock() { - try { + assertDoesNotThrow(() -> { FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); fileBasedLockProvider.unlock(); - } catch (HoodieLockException e) { - Assertions.fail(); - } + }); } - } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java index bc1d6e03c04..eab9f0937c7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; import 
org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaPairRDD; @@ -33,7 +32,6 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -50,16 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings("unchecked") public class TestHoodieReadClient extends HoodieClientTestBase { - @Override - protected void initPath() { - try { - java.nio.file.Path basePath = tempDir.resolve("dataset"); - java.nio.file.Files.createDirectories(basePath); - this.basePath = basePath.toUri().toString(); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } - } + private static final int PARALLELISM = 2; /** * Test ReadFilter API after writing new records using HoodieWriteClient.insert. @@ -91,15 +80,13 @@ public class TestHoodieReadClient extends HoodieClientTestBase { @Test public void testReadFilterExistAfterBulkInsertPrepped() throws Exception { testReadFilterExist(getConfigBuilder().withBulkInsertParallelism(1).build(), - (writeClient, recordRDD, instantTime) -> { - return writeClient.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()); - }); + (writeClient, recordRDD, instantTime) -> writeClient.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty())); } @Test public void testReadROViewFailsWithoutSqlContext() { SparkRDDReadClient readClient = new SparkRDDReadClient(context, getConfig()); - JavaRDD<HoodieKey> recordsRDD = jsc.parallelize(new ArrayList<>(), 1); + JavaRDD<HoodieKey> recordsRDD = jsc.parallelize(new ArrayList<>(), PARALLELISM); assertThrows(IllegalStateException.class, () -> { readClient.readROView(recordsRDD, 1); }); @@ -115,18 +102,18 @@ public class TestHoodieReadClient extends HoodieClientTestBase { */ private void testReadFilterExist(HoodieWriteConfig config, Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception { - try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) { SparkRDDReadClient readClient = getHoodieReadClient(config.getBasePath()); String newCommitTime = writeClient.startCommit(); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); - JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1); + JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, PARALLELISM); JavaRDD<HoodieRecord> filteredRDD = readClient.filterExists(recordsRDD); // Should not find any files assertEquals(100, filteredRDD.collect().size()); - JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1); + JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), PARALLELISM); // We create three base file, each having one record. 
(3 different partitions) List<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime).collect(); // Verify there are no errors @@ -146,14 +133,14 @@ public class TestHoodieReadClient extends HoodieClientTestBase { assertEquals(75, keysWithPaths.count()); // verify rows match inserted records - Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1); + Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, PARALLELISM); assertEquals(75, rows.count()); JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent()) .map(keyPath -> keyPath._1); assertThrows(AnalysisException.class, () -> { - anotherReadClient.readROView(keysWithoutPaths, 1); + anotherReadClient.readROView(keysWithoutPaths, PARALLELISM); }); // Actual tests of getPendingCompactions method are in TestAsyncCompaction @@ -184,7 +171,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase { */ @Test public void testTagLocationAfterBulkInsert() throws Exception { - testTagLocation(getConfigBuilder().withBulkInsertParallelism(1).build(), SparkRDDWriteClient::bulkInsert, + testTagLocation(getConfigBuilder().withBulkInsertParallelism(PARALLELISM).build(), SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false); } @@ -194,7 +181,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase { @Test public void testTagLocationAfterBulkInsertPrepped() throws Exception { testTagLocation( - getConfigBuilder().withBulkInsertParallelism(1).build(), (writeClient, recordRDD, instantTime) -> writeClient + getConfigBuilder().withBulkInsertParallelism(PARALLELISM).build(), (writeClient, recordRDD, instantTime) -> writeClient .bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()), SparkRDDWriteClient::upsertPreppedRecords, true); } @@ -223,7 +210,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase { // since they have been modified in the DAG JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream) - .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList())); + .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()), PARALLELISM); // Should have 100 records in table (check using Index), all in locations marked at commit SparkRDDReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath()); List<HoodieRecord> taggedRecords = readClient.tagLocation(recordRDD).collect(); @@ -239,7 +226,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase { numRecords, 200, 2); recordRDD = jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream) - .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList())); + .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()), PARALLELISM); // Index should be able to locate all updates in correct locations. 
readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath()); taggedRecords = readClient.tagLocation(recordRDD).collect(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index df0fed027ce..2c05e528086 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; @@ -35,15 +36,20 @@ import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -51,15 +57,34 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class TestMultiFS extends HoodieClientTestHarness { private static final Logger LOG = LogManager.getLogger(TestMultiFS.class); - private String tablePath = "file:///tmp/hoodie/sample-table"; - protected String tableName = "hoodie_rt"; - private String tableType = HoodieTableType.COPY_ON_WRITE.name(); + private static final String TABLE_TYPE = HoodieTableType.COPY_ON_WRITE.name(); + private static final String TABLE_NAME = "hoodie_rt"; + private static HdfsTestService hdfsTestService; + private static FileSystem dfs; + private String tablePath; + private String dfsBasePath; + + @BeforeAll + public static void setUpAll() throws IOException { + hdfsTestService = new HdfsTestService(); + MiniDFSCluster dfsCluster = hdfsTestService.start(true); + dfs = dfsCluster.getFileSystem(); + } + + @AfterAll + public static void cleanUpAll() { + hdfsTestService.stop(); + } @BeforeEach public void setUp() throws Exception { + initPath(); initSparkContexts(); - initDFS(); initTestDataGenerator(); + tablePath = baseUri + "/sample-table"; + dfsBasePath = dfs.getWorkingDirectory().toString(); + dfs.mkdirs(new Path(dfsBasePath)); + hadoopConf = dfs.getConf(); } @AfterEach @@ -69,7 +94,7 @@ public class TestMultiFS extends HoodieClientTestHarness { protected HoodieWriteConfig getHoodieWriteConfig(String basePath) { return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(TABLE_NAME) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) 
.build(); } @@ -78,8 +103,8 @@ public class TestMultiFS extends HoodieClientTestHarness { public void readLocalWriteHDFS() throws Exception { // Initialize table and filesystem HoodieTableMetaClient.withPropertyBuilder() - .setTableType(tableType) - .setTableName(tableName) + .setTableType(TABLE_TYPE) + .setTableName(TABLE_NAME) .setPayloadClass(HoodieAvroPayload.class) .initTable(hadoopConf, dfsBasePath); @@ -88,8 +113,8 @@ public class TestMultiFS extends HoodieClientTestHarness { HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath); HoodieTableMetaClient.withPropertyBuilder() - .setTableType(tableType) - .setTableName(tableName) + .setTableType(TABLE_TYPE) + .setTableName(TABLE_NAME) .setPayloadClass(HoodieAvroPayload.class) .setRecordKeyFields(localConfig.getProps().getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())) .setPartitionFields(localConfig.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) @@ -102,8 +127,8 @@ public class TestMultiFS extends HoodieClientTestHarness { // Write generated data to hdfs (only inserts) String readCommitTime = hdfsWriteClient.startCommit(); LOG.info("Starting commit " + readCommitTime); - List<HoodieRecord> records = dataGen.generateInserts(readCommitTime, 100); - JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); + List<HoodieRecord> records = dataGen.generateInserts(readCommitTime, 10); + JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2); hdfsWriteClient.upsert(writeRecords, readCommitTime); // Read from hdfs @@ -111,19 +136,19 @@ public class TestMultiFS extends HoodieClientTestHarness { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(dfsBasePath).build(); HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime); - assertEquals(readRecords.count(), records.size(), "Should contain 100 records"); + assertEquals(readRecords.count(), records.size()); // Write to local HoodieTableMetaClient.withPropertyBuilder() - .setTableType(tableType) - .setTableName(tableName) - .setPayloadClass(HoodieAvroPayload.class) - .initTable(hadoopConf, tablePath); + .setTableType(TABLE_TYPE) + .setTableName(TABLE_NAME) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(hadoopConf, tablePath); String writeCommitTime = localWriteClient.startCommit(); LOG.info("Starting write commit " + writeCommitTime); - List<HoodieRecord> localRecords = dataGen.generateInserts(writeCommitTime, 100); - JavaRDD<HoodieRecord> localWriteRecords = jsc.parallelize(localRecords, 1); + List<HoodieRecord> localRecords = dataGen.generateInserts(writeCommitTime, 10); + JavaRDD<HoodieRecord> localWriteRecords = jsc.parallelize(localRecords, 2); LOG.info("Writing to path: " + tablePath); localWriteClient.upsert(localWriteRecords, writeCommitTime); @@ -133,7 +158,7 @@ public class TestMultiFS extends HoodieClientTestHarness { timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); Dataset<Row> localReadRecords = HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime); - assertEquals(localReadRecords.count(), localRecords.size(), "Should contain 100 records"); + assertEquals(localReadRecords.count(), localRecords.size()); } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index a5926196ea3..d49014c69e7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -48,6 +48,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -59,7 +60,7 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResou import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; -public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { +public class TestUpdateSchemaEvolution extends HoodieClientTestHarness implements Serializable { @BeforeEach public void setUp() throws Exception { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index 4d2f5e0c5e2..5cb7859888c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -18,8 +18,6 @@ package org.apache.hudi.execution.bulkinsert; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -28,6 +26,9 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.Test; @@ -36,6 +37,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -47,7 +49,7 @@ import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { +public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase implements Serializable { private static final Comparator<HoodieRecord<? 
extends HoodieRecordPayload>> KEY_COMPARATOR = Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey())); @@ -70,8 +72,7 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { return records.map(record -> record.getPartitionPath()).countByValue(); } - private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(JavaSparkContext jsc) - throws Exception { + private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(JavaSparkContext jsc) { return generateTestRecordsForBulkInsert(jsc).union(generateTestRecordsForBulkInsert(jsc)) .union(generateTestRecordsForBulkInsert(jsc)); } @@ -135,8 +136,7 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { @ParameterizedTest(name = "[{index}] {0}") @MethodSource("configParams") public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, - boolean isGloballySorted, boolean isLocallySorted) - throws Exception { + boolean isGloballySorted, boolean isLocallySorted) { JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc); testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode), @@ -146,7 +146,7 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { } @Test - public void testCustomColumnSortPartitioner() throws Exception { + public void testCustomColumnSortPartitioner() { String sortColumnString = "rider"; String[] sortColumns = sortColumnString.split(","); Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index ebe041d638d..ffa93b638f5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -74,6 +74,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.File; +import java.io.Serializable; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -95,7 +96,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { +public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implements Serializable { private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class); private static final Schema SCHEMA = getSchemaFromResource(TestCopyOnWriteActionExecutor.class, "/exampleSchema.avsc"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index 1c4de34e5ee..c952c77fc4e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ -77,7 +77,6 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT public void setUp() throws Exception { initPath(); initSparkContexts(); - //just generate tow partitions dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}); initFileSystem(); initMetaClient(); @@ -89,7 +88,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT } @ParameterizedTest - @ValueSource(booleans = {true}) + @ValueSource(booleans = {false, true}) public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws IOException { //1. prepare data and assert data result List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>(); @@ -158,11 +157,9 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT @Test public void testRollbackForCanIndexLogFile() throws IOException { - cleanupResources(); - setUpDFS(); //1. prepare data and assert data result //just generate one partitions - dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH}); + dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH}); HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) @@ -177,7 +174,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()).withRollbackUsingMarkers(false).withAutoCommit(false).build(); //1. prepare data - new HoodieTestDataGenerator().writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath); + new HoodieTestDataGenerator().writePartitionMetadata(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH}, basePath); SparkRDDWriteClient client = getHoodieWriteClient(cfg); // Write 1 (only inserts) String newCommitTime = "001"; @@ -231,8 +228,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT .getInstantDetails(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime)) .get(), HoodieCommitMetadata.class); - assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_FIRST_PARTITION_PATH); - assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_SECOND_PARTITION_PATH); + assertTrue(commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_FIRST_PARTITION_PATH)); + assertTrue(commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_SECOND_PARTITION_PATH)); List<HoodieWriteStat> hoodieWriteStatOptionList = commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH); // Both update and insert record should enter same existing fileGroup due to small file handling assertEquals(1, hoodieWriteStatOptionList.size()); @@ -285,8 +282,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT * Test Cases for rolling back when there is no base file. 
*/ @Test - public void testRollbackWhenFirstCommitFail() throws Exception { - + public void testRollbackWhenFirstCommitFail() { HoodieWriteConfig config = HoodieWriteConfig.newBuilder() .withRollbackUsingMarkers(false) .withPath(basePath).build(); @@ -296,13 +292,4 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT client.rollback("001"); } } - - private void setUpDFS() throws IOException { - initDFS(); - initSparkContexts(); - //just generate two partitions - dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}); - initFileSystem(); - initDFSMetaClient(); - } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java index fa6df3ba73d..0e9f990048e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import java.io.IOException; +import java.nio.file.Paths; import java.util.List; import java.util.stream.Collectors; @@ -46,10 +47,10 @@ public class TestDirectWriteMarkers extends TestWriteMarkersBase { this.jsc = new JavaSparkContext( HoodieClientTestUtils.getSparkConfForTest(TestDirectWriteMarkers.class.getName())); this.context = new HoodieSparkEngineContext(jsc); - this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf()); - this.markerFolderPath = new Path(metaClient.getMarkerFolderPath("000")); + this.fs = FSUtils.getFs(metaClient.getBasePathV2().toString(), metaClient.getHadoopConf()); + this.markerFolderPath = new Path(Paths.get(metaClient.getMarkerFolderPath("000")).toUri()); this.writeMarkers = new DirectWriteMarkers( - fs, metaClient.getBasePath(), markerFolderPath.toString(), "000"); + fs, metaClient.getBasePathV2().toString(), markerFolderPath.toString(), "000"); } @AfterEach @@ -65,11 +66,11 @@ public class TestDirectWriteMarkers extends TestWriteMarkersBase { .sorted().collect(Collectors.toList()); assertEquals(3, markerFiles.size()); assertIterableEquals(CollectionUtils.createImmutableList( - "file:" + markerFolderPath.toString() + markerFolderPath.toString() + (isTablePartitioned ? "/2020/06/01" : "") + "/file1.marker.MERGE", - "file:" + markerFolderPath.toString() + markerFolderPath.toString() + (isTablePartitioned ? "/2020/06/02" : "") + "/file2.marker.APPEND", - "file:" + markerFolderPath.toString() + markerFolderPath.toString() + (isTablePartitioned ? 
"/2020/06/03" : "") + "/file3.marker.CREATE"), markerFiles.stream().map(m -> m.getPath().toString()).collect(Collectors.toList()) ); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index a41c62cdcee..9d305777adf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -47,7 +47,6 @@ import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; -import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -74,8 +73,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -84,12 +81,10 @@ import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSessionExtensions; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -118,42 +113,32 @@ import static org.junit.jupiter.api.Assertions.fail; /** * The test harness for resource initialization and cleanup. 
*/ -public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable { +public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness { private static final Logger LOG = LogManager.getLogger(HoodieClientTestHarness.class); - - protected static int timelineServicePort = - FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue(); - private String testMethodName; - protected transient JavaSparkContext jsc = null; - protected transient HoodieSparkEngineContext context = null; - protected transient SparkSession sparkSession = null; - protected transient Configuration hadoopConf = null; - protected transient SQLContext sqlContext; - protected transient FileSystem fs; - protected transient ExecutorService executorService; - protected transient HoodieTableMetaClient metaClient; - protected transient SparkRDDWriteClient writeClient; - protected transient SparkRDDReadClient readClient; - protected transient HoodieTableFileSystemView tableView; - protected transient TimelineService timelineService; - - protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); - - // dfs - protected String dfsBasePath; - protected transient HdfsTestService hdfsTestService; - protected transient MiniDFSCluster dfsCluster; - protected transient DistributedFileSystem dfs; + protected static int timelineServicePort = FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue(); @AfterAll public static void tearDownAll() throws IOException { FileSystem.closeAll(); } - protected Option<Consumer<SparkSessionExtensions>> getSparkSessionExtensionsInjector() { - return Option.empty(); - } + protected JavaSparkContext jsc; + protected HoodieSparkEngineContext context; + protected SparkSession sparkSession; + protected Configuration hadoopConf; + protected SQLContext sqlContext; + protected FileSystem fs; + protected ExecutorService executorService; + protected HoodieTableMetaClient metaClient; + protected SparkRDDWriteClient writeClient; + protected SparkRDDReadClient readClient; + protected HoodieTableFileSystemView tableView; + + protected TimelineService timelineService; + protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); + + private String testMethodName; @BeforeEach public void setTestMethodName(TestInfo testInfo) { @@ -185,11 +170,14 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im cleanupSparkContexts(); cleanupTestDataGenerator(); cleanupFileSystem(); - cleanupDFS(); cleanupExecutorService(); System.gc(); } + protected Option<Consumer<SparkSessionExtensions>> getSparkSessionExtensionsInjector() { + return Option.empty(); + } + /** * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name. * @@ -397,58 +385,6 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } } - /** - * Initializes a distributed file system and base directory. - * - * @throws IOException - */ - protected void initDFS() throws IOException { - hdfsTestService = new HdfsTestService(); - dfsCluster = hdfsTestService.start(true); - - // Create a temp folder as the base path - dfs = dfsCluster.getFileSystem(); - dfsBasePath = dfs.getWorkingDirectory().toString(); - this.basePath = dfsBasePath; - this.hadoopConf = dfs.getConf(); - dfs.mkdirs(new Path(dfsBasePath)); - } - - /** - * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by - * {@code getTableType()}. 
- * - * @throws IOException - */ - protected void initDFSMetaClient() throws IOException { - if (dfsBasePath == null) { - throw new IllegalStateException("The base path has not been initialized."); - } - - if (jsc == null) { - throw new IllegalStateException("The Spark context has not been initialized."); - } - metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, getTableType()); - } - - /** - * Cleanups the distributed file system. - * - * @throws IOException - */ - protected void cleanupDFS() throws IOException { - if (hdfsTestService != null) { - hdfsTestService.stop(); - dfsCluster.shutdown(true, true); - hdfsTestService = null; - dfsCluster = null; - dfs = null; - } - // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the - // same JVM - FileSystem.closeAll(); - } - /** * Initializes executor service with a fixed thread pool. * @@ -713,7 +649,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im List<MetadataPartitionType> enabledPartitionTypes = metadataWriter.getEnabledPartitionTypes(); - Assertions.assertEquals(enabledPartitionTypes.size(), metadataTablePartitions.size()); + assertEquals(enabledPartitionTypes.size(), metadataTablePartitions.size()); Map<String, MetadataPartitionType> partitionTypeMap = enabledPartitionTypes.stream() .collect(Collectors.toMap(MetadataPartitionType::getPartitionPath, Function.identity())); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index 590c357664e..55b54f2d8d7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -41,7 +41,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -66,11 +65,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class TestFSUtils extends HoodieCommonTestHarness { - private final long minRollbackToKeep = 10; - private final long minCleanToKeep = 10; - - private static String TEST_WRITE_TOKEN = "1-0-1"; - public static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension(); + private static final String TEST_WRITE_TOKEN = "1-0-1"; + private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension(); @Rule public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); @@ -78,7 +74,6 @@ public class TestFSUtils extends HoodieCommonTestHarness { @BeforeEach public void setUp() throws IOException { initMetaClient(); - basePath = "file:" + basePath; } @AfterEach @@ -100,7 +95,6 @@ public class TestFSUtils extends HoodieCommonTestHarness { assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION); } - @Test /** * Tests if process Files return only paths excluding marker directories Cleaner, Rollback and compaction-scheduling * logic was recursively processing all subfolders including that of ".hoodie" when looking for partition-paths. This @@ -108,6 +102,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { * of ".hoodie" folder) is deleted underneath by compactor. This code tests the fix by ensuring ".hoodie" and their * subfolders are never processed. 
*/ + @Test public void testProcessFiles() throws Exception { // All directories including marker dirs. List<String> folders = @@ -122,9 +117,9 @@ public class TestFSUtils extends HoodieCommonTestHarness { // Files inside partitions and marker directories List<String> files = Stream.of("2016/04/15/1_1-0-1_20190528120000", - "2016/05/16/2_1-0-1_20190528120000", - ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000", - ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000") + "2016/05/16/2_1-0-1_20190528120000", + ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000", + ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000") .map(fileName -> fileName + BASE_FILE_EXTENSION) .collect(Collectors.toList()); @@ -322,7 +317,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { assertEquals(LOG_STR, FSUtils.getFileExtensionFromLog(new Path(logFileName))); // create three versions of log file - java.nio.file.Path partitionPath = Paths.get(URI.create(basePath + "/" + partitionStr)); + java.nio.file.Path partitionPath = Paths.get(basePath, partitionStr); Files.createDirectories(partitionPath); String log1 = FSUtils.makeLogFileName(fileId, LOG_EXTENTION, instantTime, 1, writeToken); Files.createFile(partitionPath.resolve(log1)); @@ -345,7 +340,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { assertEquals("file4.parquet", FSUtils.getFileName("file4.parquet", "")); } - private void prepareTestDirectory(FileSystem fileSystem, String rootDir) throws IOException { + private void prepareTestDirectory(FileSystem fileSystem, Path rootDir) throws IOException { // Directory structure // .hoodie/.temp/ // - subdir1 @@ -353,14 +348,13 @@ public class TestFSUtils extends HoodieCommonTestHarness { // - subdir2 // - file2.txt // - file3 - Path dirPath = new Path(rootDir); String subDir1 = rootDir + "/subdir1"; String file1 = subDir1 + "/file1.txt"; String subDir2 = rootDir + "/subdir2"; String file2 = subDir2 + "/file2.txt"; String file3 = rootDir + "/file3.txt"; - String[] dirs = new String[]{rootDir, subDir1, subDir2}; - String[] files = new String[]{file1, file2, file3}; + String[] dirs = new String[] {rootDir.toString(), subDir1, subDir2}; + String[] files = new String[] {file1, file2, file3}; // clean up first cleanUpTestDirectory(fileSystem, rootDir); for (String dir : dirs) { @@ -371,86 +365,85 @@ public class TestFSUtils extends HoodieCommonTestHarness { } } - private void cleanUpTestDirectory(FileSystem fileSystem, String rootDir) throws IOException { - fileSystem.delete(new Path(rootDir), true); + private void cleanUpTestDirectory(FileSystem fileSystem, Path rootDir) throws IOException { + fileSystem.delete(rootDir, true); } @Test public void testDeleteExistingDir() throws IOException { - String rootDir = basePath + "/.hoodie/.temp"; + Path rootDir = getHoodieTempDir(); FileSystem fileSystem = metaClient.getFs(); prepareTestDirectory(fileSystem, rootDir); - Path rootDirPath = new Path(rootDir); - assertTrue(fileSystem.exists(rootDirPath)); + assertTrue(fileSystem.exists(rootDir)); assertTrue(FSUtils.deleteDir( - new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, rootDirPath, 2)); - assertFalse(fileSystem.exists(rootDirPath)); + new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, rootDir, 2)); + assertFalse(fileSystem.exists(rootDir)); } @Test public void testDeleteNonExistingDir() throws IOException { - String rootDir = basePath + "/.hoodie/.temp"; + Path rootDir = getHoodieTempDir(); FileSystem fileSystem = metaClient.getFs(); 
cleanUpTestDirectory(fileSystem, rootDir); assertFalse(FSUtils.deleteDir( - new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, new Path(rootDir), 2)); + new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, rootDir, 2)); } @Test public void testDeleteSubDirectoryRecursively() throws IOException { - String rootDir = basePath + "/.hoodie/.temp"; - String subPathStr = rootDir + "/subdir1"; + Path rootDir = getHoodieTempDir(); + Path subDir = new Path(rootDir, "subdir1"); FileSystem fileSystem = metaClient.getFs(); prepareTestDirectory(fileSystem, rootDir); assertTrue(FSUtils.deleteSubPath( - subPathStr, new SerializableConfiguration(fileSystem.getConf()), true)); + subDir.toString(), new SerializableConfiguration(fileSystem.getConf()), true)); } @Test public void testDeleteSubDirectoryNonRecursively() throws IOException { - String rootDir = basePath + "/.hoodie/.temp"; - String subPathStr = rootDir + "/subdir1"; + Path rootDir = getHoodieTempDir(); + Path subDir = new Path(rootDir, "subdir1"); FileSystem fileSystem = metaClient.getFs(); prepareTestDirectory(fileSystem, rootDir); assertThrows( HoodieIOException.class, () -> FSUtils.deleteSubPath( - subPathStr, new SerializableConfiguration(fileSystem.getConf()), false)); + subDir.toString(), new SerializableConfiguration(fileSystem.getConf()), false)); } @Test public void testDeleteSubPathAsFile() throws IOException { - String rootDir = basePath + "/.hoodie/.temp"; - String subPathStr = rootDir + "/file3.txt"; + Path rootDir = getHoodieTempDir(); + Path subDir = new Path(rootDir, "file3.txt"); FileSystem fileSystem = metaClient.getFs(); prepareTestDirectory(fileSystem, rootDir); assertTrue(FSUtils.deleteSubPath( - subPathStr, new SerializableConfiguration(fileSystem.getConf()), false)); + subDir.toString(), new SerializableConfiguration(fileSystem.getConf()), false)); } @Test public void testDeleteNonExistingSubDirectory() throws IOException { - String rootDir = basePath + "/.hoodie/.temp"; - String subPathStr = rootDir + "/subdir10"; + Path rootDir = getHoodieTempDir(); + Path subDir = new Path(rootDir, "subdir10"); FileSystem fileSystem = metaClient.getFs(); cleanUpTestDirectory(fileSystem, rootDir); assertFalse(FSUtils.deleteSubPath( - subPathStr, new SerializableConfiguration(fileSystem.getConf()), true)); + subDir.toString(), new SerializableConfiguration(fileSystem.getConf()), true)); } @Test public void testParallelizeSubPathProcessWithExistingDir() throws IOException { - String rootDir = basePath + "/.hoodie/.temp"; + Path rootDir = getHoodieTempDir(); FileSystem fileSystem = metaClient.getFs(); prepareTestDirectory(fileSystem, rootDir); Map<String, List<String>> result = FSUtils.parallelizeSubPathProcess( - new HoodieLocalEngineContext(fileSystem.getConf()), fileSystem, new Path(rootDir), 2, + new HoodieLocalEngineContext(fileSystem.getConf()), fileSystem, rootDir, 2, fileStatus -> !fileStatus.getPath().getName().contains("1"), pairOfSubPathAndConf -> { Path subPath = new Path(pairOfSubPathAndConf.getKey()); @@ -478,18 +471,22 @@ public class TestFSUtils extends HoodieCommonTestHarness { @Test public void testGetFileStatusAtLevel() throws IOException { - String rootDir = basePath + "/.hoodie/.temp"; + Path hoodieTempDir = getHoodieTempDir(); FileSystem fileSystem = metaClient.getFs(); - prepareTestDirectory(fileSystem, rootDir); + prepareTestDirectory(fileSystem, hoodieTempDir); List<FileStatus> fileStatusList = FSUtils.getFileStatusAtLevel( new HoodieLocalEngineContext(fileSystem.getConf()), 
fileSystem, - new Path(basePath), 3, 2); + new Path(baseUri), 3, 2); assertEquals(CollectionUtils.createImmutableSet( - basePath + "/.hoodie/.temp/subdir1/file1.txt", - basePath + "/.hoodie/.temp/subdir2/file2.txt"), + new Path(baseUri.toString(), ".hoodie/.temp/subdir1/file1.txt"), + new Path(baseUri.toString(), ".hoodie/.temp/subdir2/file2.txt")), fileStatusList.stream() - .map(fileStatus -> fileStatus.getPath().toString()) - .filter(filePath -> filePath.endsWith(".txt")) + .map(FileStatus::getPath) + .filter(filePath -> filePath.getName().endsWith(".txt")) .collect(Collectors.toSet())); } + + private Path getHoodieTempDir() { + return new Path(baseUri.toString(), ".hoodie/.temp"); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java index aa976cf1127..90e78bccbc5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java @@ -54,7 +54,6 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils { @BeforeEach public void setUp() throws IOException { initMetaClient(); - basePath = "file:" + basePath; FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().withFileSystemActionRetryEnabled(true).build(); maxRetryIntervalMs = fileSystemRetryConfig.getMaxRetryIntervalMs(); maxRetryNumbers = fileSystemRetryConfig.getMaxRetryNumbers(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index d6fa6944117..b969550f857 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -46,7 +46,7 @@ import org.apache.hudi.common.testutils.HadoopMapRedUtils; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; -import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; +import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; @@ -60,6 +60,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -68,6 +69,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; @@ -107,6 +110,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { private static final HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlockType.AVRO_DATA_BLOCK; private static final 
int BUFFER_SIZE = 4096; + private static HdfsTestService hdfsTestService; private static FileSystem fs; private Path partitionPath; private String spillableBasePath; @@ -114,22 +118,22 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @BeforeAll public static void setUpClass() throws IOException, InterruptedException { // Append is not supported in LocalFileSystem. HDFS needs to be setup. - MiniClusterUtil.setUp(); - fs = MiniClusterUtil.fileSystem; + hdfsTestService = new HdfsTestService(); + fs = hdfsTestService.start(true).getFileSystem(); } @AfterAll public static void tearDownClass() { - MiniClusterUtil.shutdown(); - fs = null; + hdfsTestService.stop(); } @BeforeEach - public void setUp() throws IOException, InterruptedException { - this.basePath = tempDir.toUri().getPath(); - this.partitionPath = new Path(basePath, "partition_path"); - this.spillableBasePath = new Path(basePath, ".spillable_path").toUri().getPath(); - HoodieTestUtils.init(MiniClusterUtil.configuration, basePath, HoodieTableType.MERGE_ON_READ); + public void setUp(TestInfo testInfo) throws IOException, InterruptedException { + Path workDir = fs.getWorkingDirectory(); + basePath = new Path(workDir.toString(), testInfo.getDisplayName() + System.currentTimeMillis()).toString(); + partitionPath = new Path(basePath, "partition_path"); + spillableBasePath = new Path(workDir.toString(), ".spillable_path").toString(); + HoodieTestUtils.init(fs.getConf(), basePath, HoodieTableType.MERGE_ON_READ); } @Test @@ -311,43 +315,17 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { }, "getCurrentSize should fail after the logAppender is closed"); } - /* - * This is actually a test on concurrent append and not recovery lease. Commenting this out. - * https://issues.apache.org/jira/browse/HUDI-117 - */ - - /** - * @Test public void testLeaseRecovery() throws IOException, URISyntaxException, InterruptedException { Writer writer - * = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) - * .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - * .overBaseCommit("100").withFs(fs).build(); List<IndexedRecord> records = - * SchemaTestUtil.generateTestRecords(0, 100); Map<HoodieLogBlock.HeaderMetadataType, String> header = - * Maps.newHashMap(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); - * header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); HoodieAvroDataBlock - * dataBlock = new HoodieAvroDataBlock(records, header); writer = writer.appendBlock(dataBlock); long size1 = - * writer.getCurrentSize(); // do not close this writer - this simulates a data note appending to a log dying - * without closing the file // writer.close(); - * <p> - * writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) - * .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100") - * .withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100); - * header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = new - * HoodieAvroDataBlock(records, header); writer = writer.appendBlock(dataBlock); long size2 = - * writer.getCurrentSize(); assertTrue("We just wrote a new block - size2 should be > size1", size2 > size1); - * assertEquals("Write should be auto-flushed. 
The size reported by FileStatus and the writer should match", - * size2, fs.getFileStatus(writer.getLogFile().getPath()).getLen()); writer.close(); } - */ - @Test - public void testAppendNotSupported() throws IOException, URISyntaxException, InterruptedException { + public void testAppendNotSupported(@TempDir java.nio.file.Path tempDir) throws IOException, URISyntaxException, InterruptedException { // Use some fs like LocalFileSystem, that does not support appends - Path localPartitionPath = new Path("file://" + partitionPath); - FileSystem localFs = FSUtils.getFs(localPartitionPath.toString(), HoodieTestUtils.getDefaultHadoopConf()); - Path testPath = new Path(localPartitionPath, "append_test"); + Path localTempDir = new Path(tempDir.toUri()); + FileSystem localFs = FSUtils.getFs(localTempDir.toString(), HoodieTestUtils.getDefaultHadoopConf()); + assertTrue(localFs instanceof LocalFileSystem); + Path testPath = new Path(localTempDir, "append_test"); localFs.mkdirs(testPath); // Some data & append two times. - List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100); + List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 5); Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java index e1b761e50d0..d18732dfaae 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java @@ -26,7 +26,6 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.testutils.SchemaTestUtil; -import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; @@ -56,11 +55,6 @@ import java.util.concurrent.TimeoutException; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertNotEquals; -/** - * This class is intentionally using a different way of setting up the MiniDFSCluster and not relying on - * {@link MiniClusterUtil} to reproduce append() issue : https://issues.apache.org/jira/browse/HDFS-6325 Reference : - * https://issues.apache.org/jira/secure/attachment/12645053/HDFS-6325.patch. - */ public class TestHoodieLogFormatAppendFailure { private static File baseDir; @@ -69,7 +63,7 @@ public class TestHoodieLogFormatAppendFailure { @BeforeAll public static void setUpClass() throws IOException { // NOTE : The MiniClusterDFS leaves behind the directory under which the cluster was created - baseDir = new File("/tmp/" + UUID.randomUUID().toString()); + baseDir = new File("/tmp/" + UUID.randomUUID()); FileUtil.fullyDelete(baseDir); // Append is not supported in LocalFileSystem. HDFS needs to be setup. 
Configuration conf = new Configuration(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index e66a1300173..842e7069ec1 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -133,20 +133,20 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { // verify modified partitions included cleaned data List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); assertEquals(5, partitions.size()); - assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4", "5"})); + assertEquals(partitions, Arrays.asList("0", "2", "3", "4", "5")); partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); assertEquals(4, partitions.size()); - assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"})); + assertEquals(partitions, Arrays.asList("0", "2", "3", "4")); // verify only commit actions partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); assertEquals(4, partitions.size()); - assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"})); + assertEquals(partitions, Arrays.asList("2", "3", "4", "5")); partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); assertEquals(3, partitions.size()); - assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"})); + assertEquals(partitions, Arrays.asList("2", "3", "4")); } @Test @@ -194,10 +194,10 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { // verify modified partitions included cleaned data List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); - assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"})); + assertEquals(partitions, Arrays.asList("2", "3", "4", "5")); partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); - assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"})); + assertEquals(partitions, Arrays.asList("2", "3", "4")); } @Test @@ -354,4 +354,4 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { return TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata); } -} \ No newline at end of file +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index d0ec6af5f76..3f862242ee5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -59,7 +59,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -88,17 +87,15 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { private static final Logger LOG = LogManager.getLogger(TestIncrementalFSViewSync.class); private static final int NUM_FILE_IDS_PER_PARTITION = 10; - - private static String 
TEST_WRITE_TOKEN = "1-0-1"; - - private final List<String> partitions = Arrays.asList("2018/01/01", "2018/01/02", "2019/03/01"); - private final List<String> fileIdsPerPartition = + private static final String TEST_WRITE_TOKEN = "1-0-1"; + private static final List<String> PARTITIONS = Arrays.asList("2018/01/01", "2018/01/02", "2019/03/01"); + private static final List<String> FILE_IDS_PER_PARTITION = IntStream.range(0, NUM_FILE_IDS_PER_PARTITION).mapToObj(x -> UUID.randomUUID().toString()).collect(Collectors.toList()); @BeforeEach public void init() throws IOException { initMetaClient(); - for (String p : partitions) { + for (String p : PARTITIONS) { Files.createDirectories(Paths.get(basePath, p)); } refreshFsView(); @@ -113,7 +110,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { public void testEmptyPartitionsAndTimeline() throws IOException { SyncableFileSystemView view = getFileSystemView(metaClient); assertFalse(view.getLastInstant().isPresent()); - partitions.forEach(p -> assertEquals(0, view.getLatestFileSlices(p).count())); + PARTITIONS.forEach(p -> assertEquals(0, view.getLatestFileSlices(p).count())); view.close(); } @@ -191,7 +188,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { assertEquals("11", view.getLastInstant().get().getTimestamp()); assertEquals(State.COMPLETED, view.getLastInstant().get().getState()); assertEquals(HoodieTimeline.COMMIT_ACTION, view.getLastInstant().get().getAction()); - partitions.forEach(p -> assertEquals(0, view.getLatestFileSlices(p).count())); + PARTITIONS.forEach(p -> assertEquals(0, view.getLatestFileSlices(p).count())); metaClient.reloadActiveTimeline(); SyncableFileSystemView newView = getFileSystemView(metaClient); @@ -234,7 +231,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { assertEquals("11", view.getLastInstant().get().getTimestamp()); assertEquals(State.COMPLETED, view.getLastInstant().get().getState()); assertEquals(HoodieTimeline.COMMIT_ACTION, view.getLastInstant().get().getAction()); - partitions.forEach(p -> assertEquals(0, view.getLatestFileSlices(p).count())); + PARTITIONS.forEach(p -> assertEquals(0, view.getLatestFileSlices(p).count())); metaClient.reloadActiveTimeline(); SyncableFileSystemView newView = getFileSystemView(metaClient); @@ -250,7 +247,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { // restore instants in reverse order till we rollback all replace instants testRestore(view, Arrays.asList("15", "16"), instantsToFiles, Arrays.asList(getHoodieReplaceInstant("14"), getHoodieReplaceInstant("13")), - "17", true, 1, fileIdsPerPartition.size()); + "17", true, 1, FILE_IDS_PER_PARTITION.size()); // clear files from inmemory view for replaced instants instantsToFiles.remove("14"); @@ -277,8 +274,8 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { metaClient.reloadActiveTimeline(); SyncableFileSystemView newView = getFileSystemView(metaClient); // 1 fileId is replaced for every partition, so subtract partitions.size() - expectedSlicesPerPartition = expectedSlicesPerPartition + fileIdsPerPartition.size() - 1; - areViewsConsistent(view, newView, expectedSlicesPerPartition * partitions.size()); + expectedSlicesPerPartition = expectedSlicesPerPartition + FILE_IDS_PER_PARTITION.size() - 1; + areViewsConsistent(view, newView, expectedSlicesPerPartition * PARTITIONS.size()); newView.close(); } catch (IOException e) { throw new HoodieIOException("unable to test replace", e); @@ -308,7 +305,7 @@ 
public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { int lastPartition = file.lastIndexOf("/"); return Pair.of(file.substring(0, lastPartition), file.substring(lastPartition + 1)); }).collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList()))); - return partitions.stream() + return PARTITIONS.stream() .map(p -> Pair.of(p, FSUtils.getFileId(partitionToFileIdsList.get(p).get(0)))) .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList()))); } @@ -339,7 +336,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { instantsToFiles = testMultipleWriteSteps(view1, Collections.singletonList("11"), true, "11"); SyncableFileSystemView view2 = - getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build()); // Run 2 more ingestion on MOR table. View1 is not yet synced but View2 is instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", "13"), true, "11")); @@ -349,9 +346,9 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { view2.sync(); SyncableFileSystemView view3 = - getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build()); view3.sync(); - areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size()); + areViewsConsistent(view1, view2, PARTITIONS.size() * FILE_IDS_PER_PARTITION.size()); /* * Case where a compaction is scheduled and then unscheduled @@ -359,9 +356,9 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { scheduleCompaction(view2, "15"); unscheduleCompaction(view2, "15", "14", "11"); view1.sync(); - areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size()); + areViewsConsistent(view1, view2, PARTITIONS.size() * FILE_IDS_PER_PARTITION.size()); SyncableFileSystemView view4 = - getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build()); view4.sync(); /* @@ -373,9 +370,9 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { testMultipleWriteSteps(view2, Collections.singletonList("16"), false, "16", 2, Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "18"))); view1.sync(); - areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2); + areViewsConsistent(view1, view2, PARTITIONS.size() * FILE_IDS_PER_PARTITION.size() * 2); SyncableFileSystemView view5 = - getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build()); view5.sync(); /* @@ -396,9 +393,9 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { // Run one more round of ingestion 
instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("23", "24"), true, "20", 2)); view1.sync(); - areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2); + areViewsConsistent(view1, view2, PARTITIONS.size() * FILE_IDS_PER_PARTITION.size() * 2); SyncableFileSystemView view6 = - getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePathV2().toString()).build()); view6.sync(); /* @@ -415,7 +412,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { Arrays.asList(view1, view2, view3, view4, view5, view6).forEach(v -> { v.sync(); - areViewsConsistent(v, view1, partitions.size() * fileIdsPerPartition.size() * 3); + areViewsConsistent(v, view1, PARTITIONS.size() * FILE_IDS_PER_PARTITION.size() * 3); }); view1.close(); @@ -456,7 +453,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { List<String> cleanedInstants, int numFilesAddedPerInstant, int numFilesReplacedPerInstant) { final int netFilesAddedPerInstant = numFilesAddedPerInstant - numFilesReplacedPerInstant; assertEquals(newCleanerInstants.size(), cleanedInstants.size()); - long exp = partitions.stream().mapToLong(p1 -> view.getAllFileSlices(p1).count()).findAny().getAsLong(); + long exp = PARTITIONS.stream().mapToLong(p1 -> view.getAllFileSlices(p1).count()).findAny().getAsLong(); LOG.info("Initial File Slices :" + exp); for (int idx = 0; idx < newCleanerInstants.size(); idx++) { String instant = cleanedInstants.get(idx); @@ -466,25 +463,25 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { performClean(instant, filesToDelete, newCleanerInstants.get(idx)); - exp -= fileIdsPerPartition.size() - numFilesReplacedPerInstant; + exp -= FILE_IDS_PER_PARTITION.size() - numFilesReplacedPerInstant; final long expTotalFileSlicesPerPartition = exp; view.sync(); assertTrue(view.getLastInstant().isPresent()); assertEquals(newCleanerInstants.get(idx), view.getLastInstant().get().getTimestamp()); assertEquals(State.COMPLETED, view.getLastInstant().get().getState()); assertEquals(HoodieTimeline.CLEAN_ACTION, view.getLastInstant().get().getAction()); - partitions.forEach(p -> { + PARTITIONS.forEach(p -> { LOG.info("PARTITION : " + p); LOG.info("\tFileSlices :" + view.getAllFileSlices(p).collect(Collectors.toList())); }); final int instantIdx = newCleanerInstants.size() - idx; - partitions.forEach(p -> assertEquals(fileIdsPerPartition.size() + instantIdx * netFilesAddedPerInstant, view.getLatestFileSlices(p).count())); - partitions.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count())); + PARTITIONS.forEach(p -> assertEquals(FILE_IDS_PER_PARTITION.size() + instantIdx * netFilesAddedPerInstant, view.getLatestFileSlices(p).count())); + PARTITIONS.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count())); metaClient.reloadActiveTimeline(); SyncableFileSystemView newView = getFileSystemView(metaClient); - areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * partitions.size()); + areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * PARTITIONS.size()); newView.close(); } catch (IOException e) { throw new HoodieException(e); @@ -511,16 +508,16 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { Map<String, List<String>> 
instantsToFiles, List<HoodieInstant> rolledBackInstants, String emptyRestoreInstant, boolean isRestore, int totalReplacedFileSlicesPerPartition, int totalFilesAddedPerPartitionPerInstant) { assertEquals(newRestoreInstants.size(), rolledBackInstants.size()); - long initialFileSlices = partitions.stream().mapToLong(p -> view.getAllFileSlices(p).count()).findAny().getAsLong(); + long initialFileSlices = PARTITIONS.stream().mapToLong(p -> view.getAllFileSlices(p).count()).findAny().getAsLong(); final int numFileSlicesAddedPerInstant = (totalFilesAddedPerPartitionPerInstant - totalReplacedFileSlicesPerPartition); - final long expectedLatestFileSlices = fileIdsPerPartition.size() + (rolledBackInstants.size()) * numFileSlicesAddedPerInstant; + final long expectedLatestFileSlices = FILE_IDS_PER_PARTITION.size() + (rolledBackInstants.size()) * numFileSlicesAddedPerInstant; IntStream.range(0, newRestoreInstants.size()).forEach(idx -> { HoodieInstant instant = rolledBackInstants.get(idx); try { boolean isDeltaCommit = HoodieTimeline.DELTA_COMMIT_ACTION.equalsIgnoreCase(instant.getAction()); - performRestore(instant, instantsToFiles.get(instant.getTimestamp()), newRestoreInstants.get(idx), isRestore); + performRestore(instant, instantsToFiles.getOrDefault(instant.getTimestamp(), Collections.emptyList()), newRestoreInstants.get(idx), isRestore); final long expTotalFileSlicesPerPartition = - isDeltaCommit ? initialFileSlices : initialFileSlices - ((idx + 1) * (fileIdsPerPartition.size() - totalReplacedFileSlicesPerPartition)); + isDeltaCommit ? initialFileSlices : initialFileSlices - ((idx + 1) * (FILE_IDS_PER_PARTITION.size() - totalReplacedFileSlicesPerPartition)); view.sync(); assertTrue(view.getLastInstant().isPresent()); LOG.info("Last Instant is :" + view.getLastInstant().get()); @@ -532,15 +529,15 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { if (HoodieTimeline.compareTimestamps(newRestoreInstants.get(idx), HoodieTimeline.GREATER_THAN_OR_EQUALS, emptyRestoreInstant )) { - partitions.forEach(p -> assertEquals(0, view.getLatestFileSlices(p).count())); + PARTITIONS.forEach(p -> assertEquals(0, view.getLatestFileSlices(p).count())); } else { - partitions.forEach(p -> assertEquals(expectedLatestFileSlices - (idx + 1) * numFileSlicesAddedPerInstant, view.getLatestFileSlices(p).count())); + PARTITIONS.forEach(p -> assertEquals(expectedLatestFileSlices - (idx + 1) * numFileSlicesAddedPerInstant, view.getLatestFileSlices(p).count())); } - partitions.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count())); + PARTITIONS.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count())); metaClient.reloadActiveTimeline(); SyncableFileSystemView newView = getFileSystemView(metaClient); - areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * partitions.size()); + areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * PARTITIONS.size()); newView.close(); } catch (IOException e) { throw new HoodieException(e); @@ -613,20 +610,15 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { * * @param files List of files to be deleted */ - private Map<String, List<String>> deleteFiles(List<String> files) { - - if (null == files) { - return new HashMap<>(); - } - + private Map<String, List<String>> deleteFiles(List<String> files) throws IOException { Map<String, List<String>> partititonToFiles = new HashMap<>(); - partitions.forEach(p -> partititonToFiles.put(p, new ArrayList<>())); + 
PARTITIONS.forEach(p -> partititonToFiles.put(p, new ArrayList<>())); for (String f : files) { - String fullPath = String.format("%s/%s", metaClient.getBasePath(), f); - new File(fullPath).delete(); - String partition = partitions.stream().filter(f::startsWith).findAny().get(); - partititonToFiles.get(partition).add(fullPath); + java.nio.file.Path fullPath = Paths.get(metaClient.getBasePathV2().toString(), f); + Files.delete(fullPath); + String partition = PARTITIONS.stream().filter(f::startsWith).findAny().get(); + partititonToFiles.get(partition).add(fullPath.toUri().toString()); } return partititonToFiles; } @@ -637,19 +629,19 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { * @param view Hoodie View * @param instantTime COmpaction Instant Time */ - private void scheduleCompaction(SyncableFileSystemView view, String instantTime) throws IOException { - List<Pair<String, FileSlice>> slices = partitions.stream() + private void scheduleCompaction(SyncableFileSystemView view, String instantTime) throws IOException { + List<Pair<String, FileSlice>> slices = PARTITIONS.stream() .flatMap(p -> view.getLatestFileSlices(p).map(s -> Pair.of(p, s))).collect(Collectors.toList()); - long initialExpTotalFileSlices = partitions.stream().mapToLong(p -> view.getAllFileSlices(p).count()).sum(); - + long initialExpTotalFileSlices = PARTITIONS.stream().mapToLong(p -> view.getAllFileSlices(p).count()).sum(); + HoodieInstant compactionRequestedInstant = new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime); HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(slices, Option.empty(), Option.empty()); HoodieInstant compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime); metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant, TimelineMetadataUtils.serializeCompactionPlan(plan)); view.sync(); - partitions.forEach(p -> { + PARTITIONS.forEach(p -> { view.getLatestFileSlices(p).forEach(fs -> { assertEquals(instantTime, fs.getBaseInstantTime()); assertEquals(p, fs.getPartitionPath()); @@ -663,7 +655,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { metaClient.reloadActiveTimeline(); SyncableFileSystemView newView = getFileSystemView(metaClient); - areViewsConsistent(view, newView, initialExpTotalFileSlices + partitions.size() * fileIdsPerPartition.size()); + areViewsConsistent(view, newView, initialExpTotalFileSlices + PARTITIONS.size() * FILE_IDS_PER_PARTITION.size()); newView.close(); } @@ -683,7 +675,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { view.sync(); assertEquals(newLastInstant, view.getLastInstant().get().getTimestamp()); - partitions.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> assertEquals(newBaseInstant, fs.getBaseInstantTime()))); + PARTITIONS.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> assertEquals(newBaseInstant, fs.getBaseInstantTime()))); } /** @@ -762,20 +754,20 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { assertEquals(lastInstant.getAction(), view.getLastInstant().get().getAction(), "Expected Last=" + lastInstant + ", Found Instants=" + view.getTimeline().getInstants()); - partitions.forEach(p -> assertEquals(fileIdsPerPartition.size(), view.getLatestFileSlices(p).count())); - final long expTotalFileSlicesPerPartition = fileIdsPerPartition.size() * multiple; - partitions.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count())); + 
PARTITIONS.forEach(p -> assertEquals(FILE_IDS_PER_PARTITION.size(), view.getLatestFileSlices(p).count())); + final long expTotalFileSlicesPerPartition = FILE_IDS_PER_PARTITION.size() * multiple; + PARTITIONS.forEach(p -> assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count())); if (deltaCommit) { - partitions.forEach(p -> + PARTITIONS.forEach(p -> view.getLatestFileSlices(p).forEach(f -> assertEquals(baseInstantForDeltaCommit, f.getBaseInstantTime())) ); } else { - partitions.forEach(p -> view.getLatestBaseFiles(p).forEach(f -> assertEquals(instant, f.getCommitTime()))); + PARTITIONS.forEach(p -> view.getLatestBaseFiles(p).forEach(f -> assertEquals(instant, f.getCommitTime()))); } metaClient.reloadActiveTimeline(); SyncableFileSystemView newView = getFileSystemView(metaClient); - areViewsConsistent(view, newView, fileIdsPerPartition.size() * partitions.size() * multiple); + areViewsConsistent(view, newView, FILE_IDS_PER_PARTITION.size() * PARTITIONS.size() * multiple); newView.close(); instantToFiles.put(instant, filePaths); if (!deltaCommit) { @@ -797,9 +789,9 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { assertEquals(view1.getLastInstant(), view2.getLastInstant()); // View Checks - Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap1 = partitions.stream().flatMap(view1::getAllFileGroups) + Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap1 = PARTITIONS.stream().flatMap(view1::getAllFileGroups) .collect(Collectors.toMap(HoodieFileGroup::getFileGroupId, fg -> fg)); - Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap2 = partitions.stream().flatMap(view2::getAllFileGroups) + Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap2 = PARTITIONS.stream().flatMap(view2::getAllFileGroups) .collect(Collectors.toMap(HoodieFileGroup::getFileGroupId, fg -> fg)); assertEquals(fileGroupsMap1.keySet(), fileGroupsMap2.keySet()); long gotSlicesCount = fileGroupsMap1.keySet().stream() @@ -843,20 +835,19 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { } private List<Pair<String, HoodieWriteStat>> generateDataForInstant(String baseInstant, String instant, boolean deltaCommit) { - return generateDataForInstant(baseInstant, instant, deltaCommit, fileIdsPerPartition); + return generateDataForInstant(baseInstant, instant, deltaCommit, FILE_IDS_PER_PARTITION); } private List<Pair<String, HoodieWriteStat>> generateDataForInstant(String baseInstant, String instant, boolean deltaCommit, List<String> fileIds) { - return partitions.stream().flatMap(p -> fileIds.stream().map(f -> { + return PARTITIONS.stream().flatMap(p -> fileIds.stream().map(f -> { try { - File file = new File(basePath + "/" + p + "/" - + (deltaCommit + java.nio.file.Path filePath = Paths.get(basePath, p, deltaCommit ? 
FSUtils.makeLogFileName(f, ".log", baseInstant, Integer.parseInt(instant), TEST_WRITE_TOKEN) - : FSUtils.makeBaseFileName(instant, TEST_WRITE_TOKEN, f))); - file.createNewFile(); + : FSUtils.makeBaseFileName(instant, TEST_WRITE_TOKEN, f)); + Files.createFile(filePath); HoodieWriteStat w = new HoodieWriteStat(); w.setFileId(f); - w.setPath(String.format("%s/%s", p, file.getName())); + w.setPath(String.format("%s/%s", p, filePath.getFileName())); return Pair.of(p, w); } catch (IOException e) { throw new HoodieException(e); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java index 60f4b63cb8c..2accc283fec 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java @@ -28,16 +28,18 @@ import org.apache.hudi.exception.HoodieIOException; import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.net.URI; /** * The common hoodie test harness to provide the basic infrastructure. */ public class HoodieCommonTestHarness { - protected String tableName = null; - protected String basePath = null; - protected transient HoodieTestDataGenerator dataGen = null; - protected transient HoodieTableMetaClient metaClient; + protected String tableName; + protected String basePath; + protected URI baseUri; + protected HoodieTestDataGenerator dataGen; + protected HoodieTableMetaClient metaClient; @TempDir public java.nio.file.Path tempDir; @@ -52,7 +54,8 @@ public class HoodieCommonTestHarness { try { java.nio.file.Path basePath = tempDir.resolve("dataset"); java.nio.file.Files.createDirectories(basePath); - this.basePath = basePath.toString(); + this.basePath = basePath.toAbsolutePath().toString(); + this.baseUri = basePath.toUri(); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } @@ -60,7 +63,6 @@ public class HoodieCommonTestHarness { /** * Initializes a test data generator which used to generate test datas. - * */ protected void initTestDataGenerator() { dataGen = new HoodieTestDataGenerator(); @@ -72,7 +74,6 @@ public class HoodieCommonTestHarness { /** * Cleanups test data generator. 
- * */ protected void cleanupTestDataGenerator() { if (dataGen != null) { @@ -87,8 +88,10 @@ public class HoodieCommonTestHarness { * @throws IOException */ protected void initMetaClient() throws IOException { - metaClient = HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), getTableType()); - basePath = metaClient.getBasePath(); + if (basePath == null) { + initPath(); + } + metaClient = HoodieTestUtils.init(basePath, getTableType()); } protected void cleanMetaClient() { @@ -121,8 +124,8 @@ public class HoodieCommonTestHarness { protected SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) { try { return new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline(), - HoodieTestTable.of(metaClient).listAllBaseAndLogFiles() + metaClient.getActiveTimeline(), + HoodieTestTable.of(metaClient).listAllBaseAndLogFiles() ); } catch (IOException ioe) { throw new HoodieIOException("Error getting file system view", ioe); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java index 727e1e4db6b..617449820fe 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java @@ -19,16 +19,13 @@ package org.apache.hudi.common.testutils.minicluster; import org.apache.hudi.common.testutils.NetworkTestUtils; -import org.apache.hudi.common.util.FileIOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.File; import java.io.IOException; import java.net.BindException; import java.nio.file.Files; @@ -45,7 +42,7 @@ public class HdfsTestService { * Configuration settings. */ private final Configuration hadoopConf; - private final String workDir; + private final java.nio.file.Path dfsBaseDirPath; /** * Embedded HDFS cluster. @@ -58,7 +55,7 @@ public class HdfsTestService { public HdfsTestService(Configuration hadoopConf) throws IOException { this.hadoopConf = hadoopConf; - this.workDir = Files.createTempDirectory("temp").toAbsolutePath().toString(); + this.dfsBaseDirPath = Files.createTempDirectory("hdfs-test-service" + System.currentTimeMillis()); } public Configuration getHadoopConf() { @@ -66,14 +63,12 @@ public class HdfsTestService { } public MiniDFSCluster start(boolean format) throws IOException { - Objects.requireNonNull(workDir, "The work dir must be set before starting cluster."); + Objects.requireNonNull(dfsBaseDirPath, "dfs base dir must be set before starting cluster."); // If clean, then remove the work dir so we can start fresh. 
- String localDFSLocation = getDFSLocation(workDir); if (format) { - LOG.info("Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh."); - File file = new File(localDFSLocation); - FileIOUtils.deleteDirectory(file); + LOG.info("Cleaning HDFS cluster data at: " + dfsBaseDirPath + " and starting fresh."); + Files.deleteIfExists(dfsBaseDirPath); } int loop = 0; @@ -87,7 +82,7 @@ public class HdfsTestService { // Configure and start the HDFS cluster // boolean format = shouldFormatDFSCluster(localDFSLocation, clean); String bindIP = "127.0.0.1"; - configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort, + configureDFSCluster(hadoopConf, dfsBaseDirPath.toString(), bindIP, namenodeRpcPort, datanodePort, datanodeIpcPort, datanodeHttpPort); miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format).checkDataNodeAddrConfig(true) .checkDataNodeHostConfig(true).build(); @@ -112,25 +107,15 @@ public class HdfsTestService { miniDfsCluster = null; } - /** - * Get the location on the local FS where we store the HDFS data. - * - * @param baseFsLocation The base location on the local filesystem we have write access to create dirs. - * @return The location for HDFS data. - */ - private static String getDFSLocation(String baseFsLocation) { - return baseFsLocation + Path.SEPARATOR + "dfs"; - } - /** * Configure the DFS Cluster before launching it. * * @param config The already created Hadoop configuration we'll further configure for HDFS - * @param localDFSLocation The location on the local filesystem where cluster data is stored + * @param dfsBaseDir The location on the local filesystem where cluster data is stored * @param bindIP An IP address we want to force the datanode and namenode to bind to. * @return The updated Configuration object. */ - private static Configuration configureDFSCluster(Configuration config, String localDFSLocation, String bindIP, + private static Configuration configureDFSCluster(Configuration config, String dfsBaseDir, String bindIP, int namenodeRpcPort, int datanodePort, int datanodeIpcPort, int datanodeHttpPort) { LOG.info("HDFS force binding to ip: " + bindIP); @@ -143,7 +128,7 @@ public class HdfsTestService { // issues with the internal IP addresses. This config disables that check, // and will allow a datanode to connect regardless. config.setBoolean("dfs.namenode.datanode.registration.ip-hostname-check", false); - config.set("hdfs.minidfs.basedir", localDFSLocation); + config.set("hdfs.minidfs.basedir", dfsBaseDir); // allow current user to impersonate others String user = System.getProperty("user.name"); config.set("hadoop.proxyuser." + user + ".groups", "*"); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/MiniClusterUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/MiniClusterUtil.java deleted file mode 100644 index 135d875e438..00000000000 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/MiniClusterUtil.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.common.testutils.minicluster; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.zookeeper.server.ZooKeeperServer; - -import java.io.IOException; - -/** - * A utility class about mini cluster. - */ -public class MiniClusterUtil { - - private static MiniDFSCluster dfsCluster; - private static ZooKeeperServer zkServer; - public static Configuration configuration; - public static FileSystem fileSystem; - - public static void setUp() throws IOException, InterruptedException { - if (dfsCluster == null) { - HdfsTestService service = new HdfsTestService(); - dfsCluster = service.start(true); - configuration = service.getHadoopConf(); - } - if (zkServer == null) { - ZookeeperTestService zkService = new ZookeeperTestService(configuration); - zkServer = zkService.start(); - } - fileSystem = FileSystem.get(configuration); - } - - public static void shutdown() { - if (dfsCluster != null) { - dfsCluster.shutdown(true, true); - } - if (zkServer != null) { - zkServer.shutdown(true); - } - } -} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java index 9b26a7915dd..e8c286d8ab7 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; -import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; +import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; @@ -73,28 +73,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { - private JobConf jobConf; - private FileSystem fs; - private Configuration hadoopConf; + private static HdfsTestService hdfsTestService; + private static FileSystem fs; @BeforeAll public static void setUpClass() throws IOException, InterruptedException { // Append is not supported in LocalFileSystem. HDFS needs to be setup. 
- MiniClusterUtil.setUp(); + hdfsTestService = new HdfsTestService(); + fs = hdfsTestService.start(true).getFileSystem(); } @AfterAll public static void tearDownClass() { - MiniClusterUtil.shutdown(); + hdfsTestService.stop(); } @BeforeEach public void setUp() throws IOException, InterruptedException { - this.fs = MiniClusterUtil.fileSystem; - jobConf = new JobConf(); - hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString()))); - HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); } @Test @@ -103,7 +99,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { Configuration conf = new Configuration(); // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); - HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(conf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); String commitTime = "100"; final int numRecords = 1000; // Create 3 partitions, each partition holds one parquet file and 1000 records @@ -133,7 +129,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString()); Utilities.setMapRedWork(conf, mrwork, mapWorkPath); - jobConf = new JobConf(conf); + JobConf jobConf = new JobConf(conf); // Add three partition path to InputPaths Path[] partitionDirArray = new Path[partitionDirs.size()]; partitionDirs.stream().map(p -> new Path(p.getPath())).collect(Collectors.toList()).toArray(partitionDirArray); @@ -186,7 +182,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { Configuration conf = new Configuration(); // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); - HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(conf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); String commitTime = "100"; final int numRecords = 1000; // Create 3 parquet files with 1000 records each @@ -219,7 +215,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString()); Utilities.setMapRedWork(conf, mrwork, mapWorkPath); - jobConf = new JobConf(conf); + JobConf jobConf = new JobConf(conf); // Add the paths FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); jobConf.set(HAS_MAP_WORK, "true"); @@ -258,7 +254,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { Configuration conf = new Configuration(); // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); - HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(conf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); String commitTime = "100"; final int numRecords = 1000; // Create 3 parquet files with 1000 records each @@ -292,7 +288,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { mrwork.getMapWork().setPathToAliases(tableAlias); Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString()); Utilities.setMapRedWork(conf, mrwork, mapWorkPath); - jobConf = new JobConf(conf); + JobConf jobConf = new 
JobConf(conf); // Add the paths FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); jobConf.set(HAS_MAP_WORK, "true"); @@ -328,7 +324,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { Configuration conf = new Configuration(); // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); - HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(conf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); String commitTime = "100"; final int numRecords = 1000; // Create 3 parquet files with 1000 records each @@ -364,7 +360,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { mrwork.getMapWork().setPathToPartitionInfo(pt); Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString()); Utilities.setMapRedWork(conf, mrwork, mapWorkPath); - jobConf = new JobConf(conf); + JobConf jobConf = new JobConf(conf); // Add the paths FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); jobConf.set(HAS_MAP_WORK, "true");
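
For readers skimming the patch, the recurring change is that tests which genuinely need append semantics now manage an embedded HDFS cluster themselves instead of going through the removed MiniClusterUtil singleton. Below is a minimal, illustrative JUnit 5 sketch of that setup pattern; the class name ExampleHdfsBackedTest is hypothetical and only mirrors the @BeforeAll/@AfterAll hooks this change adds to TestHoodieLogFormat and TestHoodieCombineHiveInputFormat.

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.BeforeAll;

    public class ExampleHdfsBackedTest {  // hypothetical test class, for illustration only

      private static HdfsTestService hdfsTestService;
      private static FileSystem fs;

      @BeforeAll
      public static void setUpClass() throws Exception {
        // Append is not supported by LocalFileSystem, so tests that exercise it
        // start an embedded MiniDFSCluster once per test class.
        hdfsTestService = new HdfsTestService();
        fs = hdfsTestService.start(true).getFileSystem();
      }

      @AfterAll
      public static void tearDownClass() {
        // Shut the embedded cluster down when the test class finishes.
        hdfsTestService.stop();
      }
    }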