This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 864a7cd880cf80aac056aac0658ee94f53b36ac9 Author: garyli1019 <yanjia.gary...@gmail.com> AuthorDate: Fri Jun 5 17:25:59 2020 -0700 [HUDI-988] Fix More Unit Test Flakiness --- .../hudi/client/TestCompactionAdminClient.java | 35 +++---- .../java/org/apache/hudi/client/TestMultiFS.java | 4 +- .../hudi/client/TestUpdateSchemaEvolution.java | 3 +- .../hudi/common/HoodieClientTestHarness.java | 69 +++++++++++--- .../execution/TestBoundedInMemoryExecutor.java | 2 +- .../hudi/execution/TestBoundedInMemoryQueue.java | 3 +- .../org/apache/hudi/index/TestHoodieIndex.java | 29 +++++- .../hudi/index/bloom/TestHoodieBloomIndex.java | 4 +- .../index/bloom/TestHoodieGlobalBloomIndex.java | 5 +- .../apache/hudi/io/TestHoodieCommitArchiveLog.java | 3 +- .../org/apache/hudi/io/TestHoodieMergeHandle.java | 6 +- .../apache/hudi/table/TestConsistencyGuard.java | 2 +- .../apache/hudi/table/TestCopyOnWriteTable.java | 4 +- .../apache/hudi/table/TestMergeOnReadTable.java | 104 ++++++++++----------- .../hudi/table/compact/TestAsyncCompaction.java | 2 +- .../hudi/table/compact/TestHoodieCompactor.java | 5 +- .../table/view/HoodieTableFileSystemView.java | 6 ++ .../timeline/service/FileSystemViewHandler.java | 2 +- 18 files changed, 162 insertions(+), 126 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java index 8e94857..b82863f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java @@ -33,9 +33,9 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.compact.OperationResult; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -67,13 +67,6 @@ public class TestCompactionAdminClient extends TestHoodieClientBase { client = new CompactionAdminClient(jsc, basePath); } - @After - public void tearDown() { - client.close(); - metaClient = null; - cleanupSparkContexts(); - } - @Test public void testUnscheduleCompactionPlan() throws Exception { int numEntriesPerInstant = 10; @@ -142,13 +135,13 @@ public class TestCompactionAdminClient extends TestHoodieClientBase { List<Pair<HoodieLogFile, HoodieLogFile>> undoFiles = result.stream().flatMap(r -> getRenamingActionsToAlignWithCompactionOperation(metaClient, compactionInstant, r.getOperation(), Option.empty()).stream()).map(rn -> { - try { - renameLogFile(metaClient, rn.getKey(), rn.getValue()); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } - return rn; - }).collect(Collectors.toList()); + try { + renameLogFile(metaClient, rn.getKey(), rn.getValue()); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + return rn; + }).collect(Collectors.toList()); Map<String, String> renameFilesFromUndo = undoFiles.stream() .collect(Collectors.toMap(p -> p.getRight().getPath().toString(), x -> x.getLeft().getPath().toString())); Map<String, String> expRenameFiles = renameFiles.stream() @@ -274,9 +267,9 @@ public class TestCompactionAdminClient extends TestHoodieClientBase { // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> { - Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent()); - Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count()); - }); + Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent()); + Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count()); + }); // Ensure same number of log-files before and after renaming per fileId Map<String, Long> fileIdToCountsAfterRenaming = @@ -335,9 +328,9 @@ public class TestCompactionAdminClient extends TestHoodieClientBase { newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) .filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> { - Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent()); - Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count()); - }); + Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent()); + Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count()); + }); // Ensure same number of log-files before and after renaming per fileId Map<String, Long> fileIdToCountsAfterRenaming = diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index 8d3fa13..24ecc8e 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -63,9 +63,7 @@ public class TestMultiFS extends HoodieClientTestHarness { @After public void tearDown() throws Exception { - cleanupSparkContexts(); - cleanupDFS(); - cleanupTestDataGenerator(); + cleanupResources(); } protected HoodieWriteConfig getHoodieWriteConfig(String basePath) { diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index ab6e940..de853f5 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -60,8 +60,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { @After public void tearDown() throws IOException { - cleanupSparkContexts(); - cleanupFileSystem(); + cleanupResources(); } //@Test diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java index 4c7b890..2988175 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java @@ -23,16 +23,23 @@ import org.apache.hudi.client.TestHoodieClientBase; import org.apache.hudi.common.minicluster.HdfsTestService; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.HoodieTable; + import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.slf4j.Logger; @@ -58,7 +65,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im protected transient ExecutorService executorService; protected transient HoodieTableMetaClient metaClient; private static AtomicInteger instantGen = new AtomicInteger(1); - protected transient HoodieWriteClient client; + protected transient HoodieWriteClient writeClient; + protected transient HoodieReadClient readClient; + protected transient HoodieTableFileSystemView tableView; + protected transient HoodieTable hoodieTable; public String getNextInstant() { return String.format("%09d", instantGen.getAndIncrement()); @@ -89,6 +99,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im cleanupSparkContexts(); cleanupTestDataGenerator(); cleanupFileSystem(); + cleanupDFS(); + cleanupExecutorService(); + System.gc(); } /** @@ -156,6 +169,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im if (fs != null) { LOG.warn("Closing file-system instance used in previous test-run"); fs.close(); + fs = null; } } @@ -175,13 +189,22 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } /** - * Cleanups table type. + * Cleanups hoodie clients. */ - protected void cleanupClients() { - metaClient = null; - if (null != client) { - client.close(); - client = null; + protected void cleanupClients() throws IOException { + if (metaClient != null) { + metaClient = null; + } + if (readClient != null) { + readClient = null; + } + if (writeClient != null) { + writeClient.close(); + writeClient = null; + } + if (tableView != null) { + tableView.close(); + tableView = null; } } @@ -196,7 +219,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im * Cleanups test data generator. */ protected void cleanupTestDataGenerator() { - dataGen = null; + if (dataGen != null) { + dataGen = null; + } } /** @@ -272,16 +297,32 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } public HoodieReadClient getHoodieReadClient(String basePath) { - return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc())); + readClient = new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc())); + return readClient; } public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit, HoodieIndex index) { - if (null != client) { - client.close(); - client = null; + if (null != writeClient) { + writeClient.close(); + writeClient = null; + } + writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); + return writeClient; + } + + public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) { + metaClient = new HoodieTableMetaClient(conf, basePath); + return metaClient; + } + + public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, + FileStatus[] fileStatuses) { + if (tableView == null) { + tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses); + } else { + tableView.init(metaClient, visibleActiveTimeline, fileStatuses); } - client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); - return client; + return tableView; } } diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java index 8fd418a..5a626c7 100644 --- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java +++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java @@ -52,7 +52,7 @@ public class TestBoundedInMemoryExecutor extends HoodieClientTestHarness { @After public void tearDown() throws Exception { - cleanupTestDataGenerator(); + cleanupResources(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java index acd2ec1..25f5a59 100644 --- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java +++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java @@ -69,8 +69,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness { @After public void tearDown() throws Exception { - cleanupTestDataGenerator(); - cleanupExecutorService(); + cleanupResources(); } // Test to ensure that we are reading all records from queue iterator in the same order diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index b97fefc..10ae93f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -18,7 +18,26 @@ package org.apache.hudi.index; +<<<<<<< HEAD import org.apache.hudi.common.HoodieClientTestHarness; +======= +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieCompactionConfig; +>>>>>>> e9cab67b... [HUDI-988] Fix More Unit Test Flakiness import org.apache.hudi.config.HoodieHBaseIndexConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -31,6 +50,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.IOException; + import static org.junit.Assert.assertTrue; public class TestHoodieIndex extends HoodieClientTestHarness { @@ -38,14 +59,12 @@ public class TestHoodieIndex extends HoodieClientTestHarness { @Before public void setUp() throws Exception { initSparkContexts("TestHoodieIndex"); - initPath(); - initMetaClient(); + initResources(); } @After - public void tearDown() { - cleanupSparkContexts(); - cleanupClients(); + public void tearDown() throws IOException { + cleanupResources(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 105b0e8..09b5782 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -105,9 +105,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { @After public void tearDown() throws Exception { - cleanupSparkContexts(); - cleanupFileSystem(); - cleanupClients(); + cleanupResources(); } private HoodieWriteConfig makeConfig() { diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 55d4526..1065c23 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -78,9 +78,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { } @After - public void tearDown() { - cleanupSparkContexts(); - cleanupClients(); + public void tearDown() throws IOException { + cleanupResources(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index 0972385..3409eea 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -67,8 +67,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { @After public void clean() throws IOException { - cleanupDFS(); - cleanupSparkContexts(); + cleanupResources(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 7fd02bc..fe99816 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -65,11 +65,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { @After public void tearDown() throws Exception { - cleanupFileSystem(); - cleanupTestDataGenerator(); - cleanupSparkContexts(); - cleanupClients(); - cleanupFileSystem(); + cleanupResources(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java index cc78a64..f5baf37 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java @@ -42,7 +42,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @After public void tearDown() throws Exception { - cleanupFileSystem(); + cleanupResources(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index 99d9b2b..7f1e538 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -18,8 +18,8 @@ package org.apache.hudi.table; -import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.TestRawTripPayload; @@ -219,7 +219,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { if (file.getName().endsWith(".parquet")) { if (FSUtils.getFileId(file.getName()).equals(FSUtils.getFileId(parquetFile.getName())) && HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(file.getName()), - FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) { + FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) { updatedParquetFile = file; break; } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java index fdc8b27..9111391 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java @@ -94,10 +94,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { @After public void clean() throws IOException { - cleanupDFS(); - cleanupSparkContexts(); - cleanupTestDataGenerator(); - cleanupClients(); + cleanupResources(); } @Test @@ -167,8 +164,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { client.compact(compactionCommitTime); allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath()); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = roView.getLatestBaseFiles(); + hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent()); // verify that there is a commit @@ -225,7 +223,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -236,13 +234,12 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertFalse(commit.isPresent()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - BaseFileOnlyView roView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles(); assertFalse(dataFilesToRead.findAny().isPresent()); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = roView.getLatestBaseFiles(); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue("should list the parquet files we wrote in the delta commit", dataFilesToRead.findAny().isPresent()); @@ -278,11 +275,11 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertFalse(commit.isPresent()); allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath()); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = roView.getLatestBaseFiles(); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent()); - List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0 assertEquals("Must contain 0 records", 0, recordsRead.size()); @@ -311,7 +308,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // verify there are no errors assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); assertTrue(commit.isPresent()); assertEquals("commit should be 001", "001", commit.get().getTimestamp()); @@ -337,11 +334,10 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - HoodieTableFileSystemView roView = - new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); final String absentCommit = newCommitTime; - assertFalse(roView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime()))); + assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime()))); } } @@ -366,7 +362,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List<WriteStatus> statuses = writeStatusJavaRDD.collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient =getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -377,13 +373,13 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertFalse(commit.isPresent()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - BaseFileOnlyView roView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); + tableView = + getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(!dataFilesToRead.findAny().isPresent()); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = roView.getLatestBaseFiles(); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue("should list the parquet files we wrote in the delta commit", dataFilesToRead.findAny().isPresent()); @@ -399,7 +395,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200)); - List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); assertEquals(recordsRead.size(), 200); @@ -413,7 +409,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // After rollback, there should be no parquet file with the failed commit time Assert.assertEquals(Arrays.stream(allFiles) .filter(file -> file.getPath().getName().contains(commitTime1)).count(), 0); - dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); assertEquals(recordsRead.size(), 200); } @@ -429,7 +425,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200)); - List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); assertEquals(recordsRead.size(), 200); @@ -449,8 +445,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); // check that the number of records read is still correct after rollback operation assertEquals(recordsRead.size(), 200); @@ -476,20 +472,20 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); metaClient = HoodieTableMetaClient.reload(metaClient); - roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp(); - assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); + assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); thirdClient.rollback(compactedCommitTime); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); metaClient = HoodieTableMetaClient.reload(metaClient); - roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); - assertFalse(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); + assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); } } } @@ -513,7 +509,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List<WriteStatus> statuses = writeStatusJavaRDD.collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -524,13 +520,12 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertFalse(commit.isPresent()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - BaseFileOnlyView roView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles(); assertFalse(dataFilesToRead.findAny().isPresent()); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = roView.getLatestBaseFiles(); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue("Should list the parquet files we wrote in the delta commit", dataFilesToRead.findAny().isPresent()); @@ -546,7 +541,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200)); - List<String> dataFiles = roView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); assertEquals(recordsRead.size(), 200); @@ -604,12 +599,12 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); metaClient = HoodieTableMetaClient.reload(metaClient); - roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp(); - assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); + assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); /** * Write 5 (updates) @@ -631,12 +626,10 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - roView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - dataFilesToRead = roView.getLatestBaseFiles(); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + dataFilesToRead = tableView.getLatestBaseFiles(); assertFalse(dataFilesToRead.findAny().isPresent()); - SliceView rtView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); List<HoodieFileGroup> fileGroups = ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList()); assertTrue(fileGroups.isEmpty()); @@ -678,7 +671,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -689,13 +682,13 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertFalse(commit.isPresent()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - BaseFileOnlyView roView = new HoodieTableFileSystemView(metaClient, + BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles); Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); Map<String, Long> parquetFileIdToSize = dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize)); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); dataFilesToRead = roView.getLatestBaseFiles(); List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList()); assertTrue("Should list the parquet files we wrote in the delta commit", @@ -723,7 +716,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertFalse(commit.isPresent()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - roView = new HoodieTableFileSystemView(metaClient, + roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles); dataFilesToRead = roView.getLatestBaseFiles(); List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList()); @@ -752,7 +745,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.insert(recordsRDD, newCommitTime).collect(); // Update all the 100 records - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath); newCommitTime = "101"; writeClient.startCommitWithTime(newCommitTime); @@ -887,7 +880,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs // and calling rollback twice final String lastCommitTime = newCommitTime; - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath); HoodieInstant last = metaClient.getCommitsTimeline().getInstants() .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get(); String fileName = last.getFileName(); @@ -980,7 +973,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); // Create a commit without rolling stats in metadata to test backwards compatibility HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); @@ -1080,7 +1073,6 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); Map<String, Long> fileIdToInsertsMap = new HashMap<>(); Map<String, Long> fileIdToUpsertsMap = new HashMap<>(); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java index 1a366a4..2a19d2c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java @@ -542,7 +542,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException { FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath()); HoodieTableFileSystemView view = - new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles); + getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles); return view.getLatestBaseFiles().collect(Collectors.toList()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java index 86a2e1f..3e36d43 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java @@ -75,10 +75,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { @After public void tearDown() throws Exception { - cleanupFileSystem(); - cleanupTestDataGenerator(); - cleanupSparkContexts(); - cleanupClients(); + cleanupResources(); } private HoodieWriteConfig getConfig() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java index 1f7165b..3ba021b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java @@ -90,6 +90,12 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem super.init(metaClient, visibleActiveTimeline); } + public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, + FileStatus[] fileStatuses) { + init(metaClient, visibleActiveTimeline); + addFilesToView(fileStatuses); + } + @Override protected void resetViewState() { this.fgIdToPendingCompaction = null; diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java index 5538d66..4410193 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java @@ -117,7 +117,7 @@ public class FileSystemViewHandler { synchronized (view) { if (isLocalViewBehind(ctx)) { HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline(); - LOG.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient + LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient + " as last known instant but server has the folling timeline :" + localTimeline.getInstants().collect(Collectors.toList())); view.sync();