This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d3afcbac3a6e5362d57570a2a5807abbf65c69d8 Author: Sivabalan Narayanan <sivab...@uber.com> AuthorDate: Sun Jun 7 16:23:40 2020 -0400 Making few fixes after cherry picking --- .../apache/hudi/client/TestHoodieClientBase.java | 917 +++++++++++---------- .../hudi/common/HoodieClientTestHarness.java | 426 +++++----- .../apache/hudi/table/TestMergeOnReadTable.java | 2 + .../hudi/table/compact/TestHoodieCompactor.java | 6 +- .../table/string/TestHoodieActiveTimeline.java | 2 +- 5 files changed, 678 insertions(+), 675 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java index 6e6458b..6856489 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java @@ -72,477 +72,478 @@ import static org.junit.Assert.assertTrue; */ public class TestHoodieClientBase extends HoodieClientTestHarness { - private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class); - - @Before - public void setUp() throws Exception { - initResources(); + private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class); + + @Before + public void setUp() throws Exception { + initResources(); + } + + @After + public void tearDown() throws Exception { + cleanupResources(); + } + + protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) { + return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName())); + } + + /** + * Get Default HoodieWriteConfig for tests. + * + * @return Default Hoodie Write Config for tests + */ + protected HoodieWriteConfig getConfig() { + return getConfigBuilder().build(); + } + + protected HoodieWriteConfig getConfig(IndexType indexType) { + return getConfigBuilder(indexType).build(); + } + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + protected HoodieWriteConfig.Builder getConfigBuilder() { + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + } + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) { + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType); + } + + HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { + return getConfigBuilder(schemaStr, IndexType.BLOOM); + } + + /** + * Get Config builder with default configs set. 
+ * + * @return Config Builder + */ + HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withWriteStatusClass(MetadataMergeWriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .forTable("test-trip-table") + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) + .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server + .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); + } + + protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + ((SyncableFileSystemView) (table.getSliceView())).reset(); + return table; + } + + /** + * Assert no failures in writing hoodie files. + * + * @param statuses List of Write Status + */ + public static void assertNoWriteErrors(List<WriteStatus> statuses) { + // Verify there are no errors + for (WriteStatus status : statuses) { + assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); } - - @After - public void tearDown() throws Exception { - cleanupResources(); + } + + void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException { + Set<String> partitionPathSet = inputRecords.stream() + .map(HoodieRecord::getPartitionPath) + .collect(Collectors.toSet()); + assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); + } + + void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException { + Set<String> partitionPathSet = inputKeys.stream() + .map(HoodieKey::getPartitionPath) + .collect(Collectors.toSet()); + assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); + } + + /** + * Ensure presence of partition meta-data at known depth. + * + * @param partitionPaths Partition paths to check + * @param fs File System + * @throws IOException in case of error + */ + void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException { + for (String partitionPath : partitionPaths) { + assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath))); + HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath)); + pmeta.readFromFS(); + Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth()); } - - protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) { - return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName())); + } + + /** + * Ensure records have location field set. 
+ * + * @param taggedRecords Tagged Records + * @param commitTime Commit Timestamp + */ + protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) { + for (HoodieRecord rec : taggedRecords) { + assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown()); + assertEquals("All records should have commit time " + commitTime + ", since updates were made", + rec.getCurrentLocation().getInstantTime(), commitTime); } - - /** - * Get Default HoodieWriteConfig for tests. - * - * @return Default Hoodie Write Config for tests - */ - protected HoodieWriteConfig getConfig() { - return getConfigBuilder().build(); + } + + /** + * Assert that there is no duplicate key at the partition level. + * + * @param records List of Hoodie records + */ + void assertNodupesWithinPartition(List<HoodieRecord> records) { + Map<String, Set<String>> partitionToKeys = new HashMap<>(); + for (HoodieRecord r : records) { + String key = r.getRecordKey(); + String partitionPath = r.getPartitionPath(); + if (!partitionToKeys.containsKey(partitionPath)) { + partitionToKeys.put(partitionPath, new HashSet<>()); + } + assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key)); + partitionToKeys.get(partitionPath).add(key); } - - protected HoodieWriteConfig getConfig(IndexType indexType) { - return getConfigBuilder(indexType).build(); + } + + /** + * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of + * record-location setting. Uniqueness is guaranteed by record-generation function itself. + * + * @param writeConfig Hoodie Write Config + * @param recordGenFunction Records Generation function + * @return Wrapped function + */ + private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls( + final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) { + return (commit, numRecords) -> { + final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); + List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords); + final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); + JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table); + return taggedRecords.collect(); + }; + } + + /** + * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of + * record-location setting. Uniqueness is guaranteed by key-generation function itself. 
+ * + * @param writeConfig Hoodie Write Config + * @param keyGenFunction Keys Generation function + * @return Wrapped function + */ + private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls( + final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) { + return (numRecords) -> { + final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); + List<HoodieKey> records = keyGenFunction.apply(numRecords); + final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); + JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1) + .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); + JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table); + return taggedRecords.map(record -> record.getKey()).collect(); + }; + } + + /** + * Generate wrapper for record generation function for testing Prepped APIs. + * + * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs + * @param writeConfig Hoodie Write Config + * @param wrapped Actual Records Generation function + * @return Wrapped Function + */ + protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI, + HoodieWriteConfig writeConfig, + Function2<List<HoodieRecord>, String, Integer> wrapped) { + if (isPreppedAPI) { + return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped); + } else { + return wrapped; } - - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - protected HoodieWriteConfig.Builder getConfigBuilder() { - return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + } + + /** + * Generate wrapper for delete key generation function for testing Prepped APIs. + * + * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs + * @param writeConfig Hoodie Write Config + * @param wrapped Actual Records Generation function + * @return Wrapped Function + */ + Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI, + HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) { + if (isPreppedAPI) { + return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped); + } else { + return wrapped; } - - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) { - return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType); + } + + /** + * Helper to insert first batch of records and do regular assertions on the state after successful completion. 
+ * + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param writeFn Write Function to be used for insertion + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @return RDD of write-status + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, + String initCommitTime, int numRecordsInThisCommit, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit) throws Exception { + final Function2<List<HoodieRecord>, String, Integer> recordGenFunction = + generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts); + + return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit, + recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1); + } + + /** + * Helper to upsert batch of records and do regular assertions on the state after successful completion. + * + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param writeFn Write Function to be used for upsert + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @param expTotalCommits Expected number of commits (including this commit) + * @return RDD of write-status + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, + String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, + int numRecordsInThisCommit, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { + final Function2<List<HoodieRecord>, String, Integer> recordGenFunction = + generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates); + + return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime, + numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, + expTotalCommits); + } + + /** + * Helper to delete batch of keys and do regular assertions on the state after successful completion. 
+ * + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param deleteFn Delete Function to be used for deletes + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @return RDD of write-status + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, + String prevCommitTime, String initCommitTime, + int numRecordsInThisCommit, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { + final Function<Integer, List<HoodieKey>> keyGenFunction = + generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes); + + return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit, + keyGenFunction, + deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords); + } + + /** + * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion. + * + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param recordGenFunction Records Generation Function + * @param writeFn Write Function to be used for upsert + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @param expTotalCommits Expected number of commits (including this commit) + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, + Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit, + Function2<List<HoodieRecord>, String, Integer> recordGenFunction, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { + + // Write 1 (only inserts) + client.startCommitWithTime(newCommitTime); + + List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit); + JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); + + JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime); + List<WriteStatus> statuses = result.collect(); + assertNoWriteErrors(statuses); + + // check the partition metadata is written out + assertPartitionMetadataForRecords(records, fs); + + // verify that there is a commit + HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + + if (assertForCommit) { + assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits, + timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); + Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, + timeline.lastInstant().get().getTimestamp()); + assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + // Check that the incremental consumption from prevCommitTime + assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); + if (commitTimesBetweenPrevAndNew.isPresent()) { + commitTimesBetweenPrevAndNew.get().forEach(ct -> { + assertEquals("Incremental consumption from " + ct + " should give all records in latest commit", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count()); + }); + } } - - HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { - return getConfigBuilder(schemaStr, IndexType.BLOOM); + return result; + } + + /** + * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion. 
+ * + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param initCommitTime Begin Timestamp (usually "000") + * @param keyGenFunction Key Generation function + * @param deleteFn Write Function to be used for delete + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, + String initCommitTime, int numRecordsInThisCommit, + Function<Integer, List<HoodieKey>> keyGenFunction, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { + + // Delete 1 (only deletes) + client.startCommitWithTime(newCommitTime); + + List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit); + JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1); + + JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime); + List<WriteStatus> statuses = result.collect(); + assertNoWriteErrors(statuses); + + // check the partition metadata is written out + assertPartitionMetadataForKeys(keysToDelete, fs); + + // verify that there is a commit + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + + if (assertForCommit) { + assertEquals("Expecting 3 commits.", 3, + timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); + Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, + timeline.lastInstant().get().getTimestamp()); + assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + // Check that the incremental consumption from prevCommitTime + assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit," + + " since it is a delete operation", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); } + return result; + } - /** - * Get Config builder with default configs set. 
- * - * @return Config Builder - */ - HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) - .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) - .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) - .withWriteStatusClass(MetadataMergeWriteStatus.class) - .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) - .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) - .forTable("test-trip-table") - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) - .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); - } + /** + * Get Cleaner state corresponding to a partition path. + * + * @param hoodieCleanStatsTwo List of Clean Stats + * @param partitionPath Partition path for filtering + * @return Cleaner state corresponding to partition path + */ + protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) { + return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null); + } - protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - ((SyncableFileSystemView) (table.getSliceView())).reset(); - return table; - } + // Functional Interfaces for passing lambda and Hoodie Write API contexts - /** - * Assert no failures in writing hoodie files. - * - * @param statuses List of Write Status - */ - public static void assertNoWriteErrors(List<WriteStatus> statuses) { - // Verify there are no errors - for (WriteStatus status : statuses) { - assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); - } - } + @FunctionalInterface + public interface Function2<R, T1, T2> { - void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException { - Set<String> partitionPathSet = inputRecords.stream() - .map(HoodieRecord::getPartitionPath) - .collect(Collectors.toSet()); - assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); - } + R apply(T1 v1, T2 v2) throws IOException; + } - void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException { - Set<String> partitionPathSet = inputKeys.stream() - .map(HoodieKey::getPartitionPath) - .collect(Collectors.toSet()); - assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); - } - - /** - * Ensure presence of partition meta-data at known depth. 
- * - * @param partitionPaths Partition paths to check - * @param fs File System - * @throws IOException in case of error - */ - void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException { - for (String partitionPath : partitionPaths) { - assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath))); - HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath)); - pmeta.readFromFS(); - Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth()); - } - } - - /** - * Ensure records have location field set. - * - * @param taggedRecords Tagged Records - * @param commitTime Commit Timestamp - */ - protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) { - for (HoodieRecord rec : taggedRecords) { - assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown()); - assertEquals("All records should have commit time " + commitTime + ", since updates were made", - rec.getCurrentLocation().getInstantTime(), commitTime); - } - } - - /** - * Assert that there is no duplicate key at the partition level. - * - * @param records List of Hoodie records - */ - void assertNodupesWithinPartition(List<HoodieRecord> records) { - Map<String, Set<String>> partitionToKeys = new HashMap<>(); - for (HoodieRecord r : records) { - String key = r.getRecordKey(); - String partitionPath = r.getPartitionPath(); - if (!partitionToKeys.containsKey(partitionPath)) { - partitionToKeys.put(partitionPath, new HashSet<>()); - } - assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key)); - partitionToKeys.get(partitionPath).add(key); - } - } - - /** - * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of - * record-location setting. Uniqueness is guaranteed by record-generation function itself. - * - * @param writeConfig Hoodie Write Config - * @param recordGenFunction Records Generation function - * @return Wrapped function - */ - private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls( - final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) { - return (commit, numRecords) -> { - final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); - List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords); - final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); - JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table); - return taggedRecords.collect(); - }; - } - - /** - * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of - * record-location setting. Uniqueness is guaranteed by key-generation function itself. 
- * - * @param writeConfig Hoodie Write Config - * @param keyGenFunction Keys Generation function - * @return Wrapped function - */ - private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls( - final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) { - return (numRecords) -> { - final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); - List<HoodieKey> records = keyGenFunction.apply(numRecords); - final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); - JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1) - .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); - JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table); - return taggedRecords.map(record -> record.getKey()).collect(); - }; - } + @FunctionalInterface + public interface Function3<R, T1, T2, T3> { - /** - * Generate wrapper for record generation function for testing Prepped APIs. - * - * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs - * @param writeConfig Hoodie Write Config - * @param wrapped Actual Records Generation function - * @return Wrapped Function - */ - protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI, - HoodieWriteConfig writeConfig, - Function2<List<HoodieRecord>, String, Integer> wrapped) { - if (isPreppedAPI) { - return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped); - } else { - return wrapped; - } - } - - /** - * Generate wrapper for delete key generation function for testing Prepped APIs. - * - * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs - * @param writeConfig Hoodie Write Config - * @param wrapped Actual Records Generation function - * @return Wrapped Function - */ - Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI, - HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) { - if (isPreppedAPI) { - return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped); - } else { - return wrapped; - } - } - - /** - * Helper to insert first batch of records and do regular assertions on the state after successful completion. 
- * - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param writeFn Write Function to be used for insertion - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @return RDD of write-status - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String initCommitTime, int numRecordsInThisCommit, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit) throws Exception { - final Function2<List<HoodieRecord>, String, Integer> recordGenFunction = - generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts); - - return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit, - recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1); - } - - /** - * Helper to upsert batch of records and do regular assertions on the state after successful completion. - * - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param writeFn Write Function to be used for upsert - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @param expTotalCommits Expected number of commits (including this commit) - * @return RDD of write-status - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, - int numRecordsInThisCommit, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { - final Function2<List<HoodieRecord>, String, Integer> recordGenFunction = - generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates); - - return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime, - numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, - expTotalCommits); - } - - /** - * Helper to delete batch of keys and do regular assertions on the state after successful completion. 
- * - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param deleteFn Delete Function to be used for deletes - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @return RDD of write-status - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String prevCommitTime, String initCommitTime, - int numRecordsInThisCommit, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { - final Function<Integer, List<HoodieKey>> keyGenFunction = - generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes); - - return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit, - keyGenFunction, - deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords); - } - - /** - * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion. - * - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param recordGenFunction Records Generation Function - * @param writeFn Write Function to be used for upsert - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @param expTotalCommits Expected number of commits (including this commit) - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, - Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit, - Function2<List<HoodieRecord>, String, Integer> recordGenFunction, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { - - // Write 1 (only inserts) - client.startCommitWithTime(newCommitTime); - - List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit); - JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); - - JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime); - List<WriteStatus> statuses = result.collect(); - assertNoWriteErrors(statuses); - - // check the partition metadata is written out - assertPartitionMetadataForRecords(records, fs); - - // verify that there is a commit - HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); - - if (assertForCommit) { - assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits, - timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); - Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, - timeline.lastInstant().get().getTimestamp()); - assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); - - // Check the entire dataset has all records still - String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; - for (int i = 0; i < fullPartitionPaths.length; i++) { - fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); - } - assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, - HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); - - // Check that the incremental consumption from prevCommitTime - assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit", - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); - if (commitTimesBetweenPrevAndNew.isPresent()) { - commitTimesBetweenPrevAndNew.get().forEach(ct -> { - assertEquals("Incremental consumption from " + ct + " should give all records in latest commit", - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count()); - }); - } - } - return result; - } - - /** - * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion. 
- * - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param initCommitTime Begin Timestamp (usually "000") - * @param keyGenFunction Key Generation function - * @param deleteFn Write Function to be used for delete - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, - String initCommitTime, int numRecordsInThisCommit, - Function<Integer, List<HoodieKey>> keyGenFunction, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { - - // Delete 1 (only deletes) - client.startCommitWithTime(newCommitTime); - - List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit); - JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1); - - JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime); - List<WriteStatus> statuses = result.collect(); - assertNoWriteErrors(statuses); - - // check the partition metadata is written out - assertPartitionMetadataForKeys(keysToDelete, fs); - - // verify that there is a commit - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); - - if (assertForCommit) { - assertEquals("Expecting 3 commits.", 3, - timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); - Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, - timeline.lastInstant().get().getTimestamp()); - assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); - - // Check the entire dataset has all records still - String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; - for (int i = 0; i < fullPartitionPaths.length; i++) { - fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); - } - assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, - HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); - - // Check that the incremental consumption from prevCommitTime - assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit," - + " since it is a delete operation", - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); - } - return result; - } - - /** - * Get Cleaner state corresponding to a partition path. 
- * - * @param hoodieCleanStatsTwo List of Clean Stats - * @param partitionPath Partition path for filtering - * @return Cleaner state corresponding to partition path - */ - protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) { - return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null); - } - - // Functional Interfaces for passing lambda and Hoodie Write API contexts - - @FunctionalInterface - public interface Function2<R, T1, T2> { - - R apply(T1 v1, T2 v2) throws IOException; - } - - @FunctionalInterface - public interface Function3<R, T1, T2, T3> { - - R apply(T1 v1, T2 v2, T3 v3) throws IOException; - } + R apply(T1 v1, T2 v2, T3 v3) throws IOException; + } } diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java index e4202f0..4c7b890 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java @@ -49,239 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger; */ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class); - - protected transient JavaSparkContext jsc = null; - protected transient SQLContext sqlContext; - protected transient FileSystem fs; - protected transient HoodieTestDataGenerator dataGen = null; - protected transient ExecutorService executorService; - protected transient HoodieTableMetaClient metaClient; - private static AtomicInteger instantGen = new AtomicInteger(1); - protected transient HoodieWriteClient client; - - public String getNextInstant() { - return String.format("%09d", instantGen.getAndIncrement()); + private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class); + + protected transient JavaSparkContext jsc = null; + protected transient SQLContext sqlContext; + protected transient FileSystem fs; + protected transient HoodieTestDataGenerator dataGen = null; + protected transient ExecutorService executorService; + protected transient HoodieTableMetaClient metaClient; + private static AtomicInteger instantGen = new AtomicInteger(1); + protected transient HoodieWriteClient client; + + public String getNextInstant() { + return String.format("%09d", instantGen.getAndIncrement()); + } + + // dfs + protected String dfsBasePath; + protected transient HdfsTestService hdfsTestService; + protected transient MiniDFSCluster dfsCluster; + protected transient DistributedFileSystem dfs; + + /** + * Initializes resource group for the subclasses of {@link TestHoodieClientBase}. + */ + public void initResources() throws IOException { + initPath(); + initSparkContexts(); + initTestDataGenerator(); + initFileSystem(); + initMetaClient(); + } + + /** + * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}. + */ + public void cleanupResources() throws IOException { + cleanupClients(); + cleanupSparkContexts(); + cleanupTestDataGenerator(); + cleanupFileSystem(); + } + + /** + * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name. + * + * @param appName The specified application name. 
+ */ + protected void initSparkContexts(String appName) { + // Initialize a local spark env + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName)); + jsc.setLogLevel("ERROR"); + + // SQLContext stuff + sqlContext = new SQLContext(jsc); + } + + /** + * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name + * <b>TestHoodieClient</b>. + */ + protected void initSparkContexts() { + initSparkContexts("TestHoodieClient"); + } + + /** + * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}). + */ + protected void cleanupSparkContexts() { + if (sqlContext != null) { + LOG.info("Clearing sql context cache of spark-session used in previous test-case"); + sqlContext.clearCache(); + sqlContext = null; } - // dfs - protected String dfsBasePath; - protected transient HdfsTestService hdfsTestService; - protected transient MiniDFSCluster dfsCluster; - protected transient DistributedFileSystem dfs; - - /** - * Initializes resource group for the subclasses of {@link TestHoodieClientBase}. - */ - public void initResources() throws IOException { - initPath(); - initSparkContexts(); - initTestDataGenerator(); - initFileSystem(); - initMetaClient(); + if (jsc != null) { + LOG.info("Closing spark context used in previous test-case"); + jsc.close(); + jsc.stop(); + jsc = null; } - - /** - * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}. - */ - public void cleanupResources() throws IOException { - cleanupClients(); - cleanupSparkContexts(); - cleanupTestDataGenerator(); - cleanupFileSystem(); - } - - /** - * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name. - * - * @param appName The specified application name. - */ - protected void initSparkContexts(String appName) { - // Initialize a local spark env - jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName)); - jsc.setLogLevel("ERROR"); - - // SQLContext stuff - sqlContext = new SQLContext(jsc); - } - - /** - * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name - * <b>TestHoodieClient</b>. - */ - protected void initSparkContexts() { - initSparkContexts("TestHoodieClient"); - } - - /** - * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}). - */ - protected void cleanupSparkContexts() { - if (sqlContext != null) { - LOG.info("Clearing sql context cache of spark-session used in previous test-case"); - sqlContext.clearCache(); - sqlContext = null; - } - - if (jsc != null) { - LOG.info("Closing spark context used in previous test-case"); - jsc.close(); - jsc.stop(); - jsc = null; - } - } - - /** - * Initializes a file system with the hadoop configuration of Spark context. - */ - protected void initFileSystem() { - if (jsc == null) { - throw new IllegalStateException("The Spark context has not been initialized."); - } - - initFileSystemWithConfiguration(jsc.hadoopConfiguration()); - } - - /** - * Initializes file system with a default empty configuration. - */ - protected void initFileSystemWithDefaultConfiguration() { - initFileSystemWithConfiguration(new Configuration()); + } + + /** + * Initializes a file system with the hadoop configuration of Spark context. + */ + protected void initFileSystem() { + if (jsc == null) { + throw new IllegalStateException("The Spark context has not been initialized."); } - /** - * Cleanups file system. 
- */ - protected void cleanupFileSystem() throws IOException { - if (fs != null) { - LOG.warn("Closing file-system instance used in previous test-run"); - fs.close(); - } + initFileSystemWithConfiguration(jsc.hadoopConfiguration()); + } + + /** + * Initializes file system with a default empty configuration. + */ + protected void initFileSystemWithDefaultConfiguration() { + initFileSystemWithConfiguration(new Configuration()); + } + + /** + * Cleanups file system. + */ + protected void cleanupFileSystem() throws IOException { + if (fs != null) { + LOG.warn("Closing file-system instance used in previous test-run"); + fs.close(); } - - /** - * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}. - */ - protected void initMetaClient() throws IOException { - if (basePath == null) { - throw new IllegalStateException("The base path has not been initialized."); - } - - if (jsc == null) { - throw new IllegalStateException("The Spark context has not been initialized."); - } - - metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()); - } - - /** - * Cleanups table type. - */ - protected void cleanupClients() { - metaClient = null; - if (null != client) { - client.close(); - client = null; - } - } - - /** - * Initializes a test data generator which used to generate test datas. - */ - protected void initTestDataGenerator() { - dataGen = new HoodieTestDataGenerator(); + } + + /** + * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}. + */ + protected void initMetaClient() throws IOException { + if (basePath == null) { + throw new IllegalStateException("The base path has not been initialized."); } - /** - * Cleanups test data generator. - */ - protected void cleanupTestDataGenerator() { - dataGen = null; + if (jsc == null) { + throw new IllegalStateException("The Spark context has not been initialized."); } - /** - * Initializes a distributed file system and base directory. - */ - protected void initDFS() throws IOException { - FileSystem.closeAll(); - hdfsTestService = new HdfsTestService(); - dfsCluster = hdfsTestService.start(true); - - // Create a temp folder as the base path - dfs = dfsCluster.getFileSystem(); - dfsBasePath = dfs.getWorkingDirectory().toString(); - dfs.mkdirs(new Path(dfsBasePath)); + metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()); + } + + /** + * Cleanups table type. + */ + protected void cleanupClients() { + metaClient = null; + if (null != client) { + client.close(); + client = null; } - - /** - * Cleanups the distributed file system. - */ - protected void cleanupDFS() throws IOException { - if (hdfsTestService != null) { - hdfsTestService.stop(); - dfsCluster.shutdown(); - hdfsTestService = null; - dfsCluster = null; - dfs = null; - } - // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the - // same JVM - FileSystem.closeAll(); + } + + /** + * Initializes a test data generator which used to generate test datas. + */ + protected void initTestDataGenerator() { + dataGen = new HoodieTestDataGenerator(); + } + + /** + * Cleanups test data generator. + */ + protected void cleanupTestDataGenerator() { + dataGen = null; + } + + /** + * Initializes a distributed file system and base directory. 
+ */ + protected void initDFS() throws IOException { + FileSystem.closeAll(); + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + + // Create a temp folder as the base path + dfs = dfsCluster.getFileSystem(); + dfsBasePath = dfs.getWorkingDirectory().toString(); + dfs.mkdirs(new Path(dfsBasePath)); + } + + /** + * Cleanups the distributed file system. + */ + protected void cleanupDFS() throws IOException { + if (hdfsTestService != null) { + hdfsTestService.stop(); + dfsCluster.shutdown(); + hdfsTestService = null; + dfsCluster = null; + dfs = null; } - - /** - * Initializes executor service with a fixed thread pool. - * - * @param threadNum specify the capacity of the fixed thread pool - */ - protected void initExecutorServiceWithFixedThreadPool(int threadNum) { - executorService = Executors.newFixedThreadPool(threadNum); + // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the + // same JVM + FileSystem.closeAll(); + } + + /** + * Initializes executor service with a fixed thread pool. + * + * @param threadNum specify the capacity of the fixed thread pool + */ + protected void initExecutorServiceWithFixedThreadPool(int threadNum) { + executorService = Executors.newFixedThreadPool(threadNum); + } + + /** + * Cleanups the executor service. + */ + protected void cleanupExecutorService() { + if (this.executorService != null) { + this.executorService.shutdownNow(); + this.executorService = null; } + } - /** - * Cleanups the executor service. - */ - protected void cleanupExecutorService() { - if (this.executorService != null) { - this.executorService.shutdownNow(); - this.executorService = null; - } + private void initFileSystemWithConfiguration(Configuration configuration) { + if (basePath == null) { + throw new IllegalStateException("The base path has not been initialized."); } - private void initFileSystemWithConfiguration(Configuration configuration) { - if (basePath == null) { - throw new IllegalStateException("The base path has not been initialized."); - } - - fs = FSUtils.getFs(basePath, configuration); - if (fs instanceof LocalFileSystem) { - LocalFileSystem lfs = (LocalFileSystem) fs; - // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream - // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open - // So, for the tests, we enforce checksum verification to circumvent the problem - lfs.setVerifyChecksum(true); - } + fs = FSUtils.getFs(basePath, configuration); + if (fs instanceof LocalFileSystem) { + LocalFileSystem lfs = (LocalFileSystem) fs; + // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream + // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open + // So, for the tests, we enforce checksum verification to circumvent the problem + lfs.setVerifyChecksum(true); } + } - public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) { - return getHoodieWriteClient(cfg, false); - } + public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) { + return getHoodieWriteClient(cfg, false); + } - public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) { - return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null)); - } + public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) { + return 
getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null)); + } - public HoodieReadClient getHoodieReadClient(String basePath) { - return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc())); - } + public HoodieReadClient getHoodieReadClient(String basePath) { + return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc())); + } - public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit, - HoodieIndex index) { - if (null != client) { - client.close(); - client = null; - } - client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); - return client; + public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit, + HoodieIndex index) { + if (null != client) { + client.close(); + client = null; } + client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); + return client; + } } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java index 9f3eaea..fdc8b27 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java @@ -655,6 +655,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024) .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) .withEmbeddedTimelineServerEnabled(true) + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024).build()).forTable("test-trip-table") .build(); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java index 09d62a7..86a2e1f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java @@ -18,9 +18,9 @@ package org.apache.hudi.table.compact; -import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieRecord; @@ -29,9 +29,9 @@ import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; +import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -129,7 +129,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { // insert 100 records HoodieWriteConfig config = getConfig(); try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { - String newCommitTime = "100"; + String newCommitTime = "100"; 
writeClient.startCommitWithTime(newCommitTime); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java index d77392f..b2a7f83 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java @@ -385,7 +385,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { // filterCompletedAndCompactionInstants // This cannot be done using checkFilter as it involves both states and actions final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants(); - final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED); + final Set<State> states = Sets.newHashSet(State.COMPLETED); final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION); sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction())) .forEach(i -> assertTrue(t1.containsInstant(i)));
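
Note for readers skimming the diff: most of the churn in TestHoodieClientBase and HoodieClientTestHarness is re-indentation; the substantive change is the addition of withEnableBackupForRemoteFileSystemView(false) to the FileSystemViewStorageConfig used by the test write configs (and by TestMergeOnReadTable), so a failure to reach the embedded timeline server surfaces as a test failure instead of being masked by the secondary file-system view. The following is a minimal illustrative sketch of that wiring, not code from the commit; the builder methods are taken verbatim from the diff, while the import paths and the standalone class/method names are assumptions based on the 0.5.x package layout.

    // Sketch only: shows how the test config enables the embedded timeline server
    // and disables the remote file-system-view backup, as done in this commit.
    import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
    import org.apache.hudi.common.table.view.FileSystemViewStorageType;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class TimelineServerTestConfigSketch {

      // basePath and schemaStr are assumed to come from the test harness
      // (e.g. HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).
      static HoodieWriteConfig buildTestConfig(String basePath, String schemaStr) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(schemaStr)
            .forTable("test-trip-table")
            .withEmbeddedTimelineServerEnabled(true)
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                // Fail the test if there is a problem connecting to the timeline server,
                // rather than silently falling back to a backup view.
                .withEnableBackupForRemoteFileSystemView(false)
                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
                .build())
            .build();
      }
    }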