This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6dcd0a3524fe7be0bbbd3e673ed7e1d4b035e0cb
Author: Balaji Varadarajan <varad...@uber.com>
AuthorDate: Tue Jun 2 01:49:37 2020 -0700

    [HUDI-988] Fix Unit Test Flakiness: Ensure all instantiations of
    HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make
    CLI unit tests for Hudi CLI check skip rendering strings
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  16 +
 .../org/apache/hudi/cli/commands/StatsCommand.java |   4 +-
 .../cli/commands/AbstractShellIntegrationTest.java |   2 +-
 .../hudi/cli/commands/TestRepairsCommand.java      | 206 -----
 .../org/apache/hudi/client/HoodieWriteClient.java  |   2 +-
 .../apache/hudi/client/TestHoodieClientBase.java   | 938 ++++++++++-----------
 .../java/org/apache/hudi/client/TestMultiFS.java   |   4 -
 .../hudi/client/TestUpdateSchemaEvolution.java     |   4 +-
 .../hudi/common/HoodieClientTestHarness.java       | 426 +++++-----
 .../hudi/index/TestHBaseQPSResourceAllocator.java  |   2 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java |  17 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |   2 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   2 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  12 +-
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   5 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    |  38 +-
 .../hudi/table/compact/TestHoodieCompactor.java    |  12 +-
 pom.xml                                            |   1 +
 19 files changed, 745 insertions(+), 950 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 2e3bc01..708ae29 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -33,4 +33,20 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_HOODIE_PROPERTY = "Property";
   public static final String HEADER_OLD_VALUE = "Old Value";
   public static final String HEADER_NEW_VALUE = "New Value";
+
+  /**
+   * Fields of Stats.
+   */
+  public static final String HEADER_COMMIT_TIME = "CommitTime";
+  public static final String HEADER_TOTAL_UPSERTED = "Total Upserted";
+  public static final String HEADER_TOTAL_WRITTEN = "Total Written";
+  public static final String HEADER_WRITE_AMPLIFICATION_FACTOR = "Write Amplification Factor";
+  public static final String HEADER_HISTOGRAM_MIN = "Min";
+  public static final String HEADER_HISTOGRAM_10TH = "10th";
+  public static final String HEADER_HISTOGRAM_50TH = "50th";
+  public static final String HEADER_HISTOGRAM_AVG = "avg";
+  public static final String HEADER_HISTOGRAM_95TH = "95th";
+  public static final String HEADER_HISTOGRAM_MAX = "Max";
+  public static final String HEADER_HISTOGRAM_NUM_FILES = "NumFiles";
+  public static final String HEADER_HISTOGRAM_STD_DEV = "StdDev";
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index b05aee2..4874777 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
 @Component
 public class StatsCommand implements CommandMarker {
 
-  private static final int MAX_FILES = 1000000;
+  public static final int MAX_FILES = 1000000;
 
   @CliCommand(value = "stats wa", help = "Write Amplification. 
Ratio of how many records were upserted to how many " + "records were actually written") @@ -97,7 +97,7 @@ public class StatsCommand implements CommandMarker { return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows); } - private Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) { + public Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) { return new Comparable[] {commitTime, s.getMin(), s.getValue(0.1), s.getMedian(), s.getMean(), s.get95thPercentile(), s.getMax(), s.size(), s.getStdDev()}; } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java index ad81af5..d9f1688 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java @@ -58,4 +58,4 @@ public abstract class AbstractShellIntegrationTest extends HoodieClientTestHarne protected static JLineShellComponent getShell() { return shell; } -} \ No newline at end of file +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java deleted file mode 100644 index 9e78ac7..0000000 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.cli.commands; - -import org.apache.hudi.cli.HoodieCLI; -import org.apache.hudi.cli.HoodiePrintHelper; -import org.apache.hudi.cli.HoodieTableHeaderFields; -import org.apache.hudi.common.HoodieTestDataGenerator; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.TimelineLayoutVersion; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.FSUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.junit.Before; -import org.junit.Test; -import org.springframework.shell.core.CommandResult; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - - -/** - * Test class for {@link RepairsCommand}. 
- */ -public class TestRepairsCommand extends AbstractShellIntegrationTest { - - private String tablePath; - - @Before - public void init() throws IOException { - String tableName = "test_table"; - tablePath = basePath + File.separator + tableName; - - // Create table and connect - new TableCommand().createTable( - tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(), - "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - } - - /** - * Test case for dry run 'repair addpartitionmeta'. - */ - @Test - public void testAddPartitionMetaWithDryRun() throws IOException { - // create commit instant - Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit")); - - // create partition path - String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; - String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; - String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; - assertTrue(fs.mkdirs(new Path(partition1))); - assertTrue(fs.mkdirs(new Path(partition2))); - assertTrue(fs.mkdirs(new Path(partition3))); - - // default is dry run. - CommandResult cr = getShell().executeCommand("repair addpartitionmeta"); - assertTrue(cr.isSuccess()); - - // expected all 'No'. - String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath) - .stream() - .map(partition -> new String[] {partition, "No", "None"}) - .toArray(String[][]::new); - String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, - HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); - - assertEquals(expected, cr.getResult().toString()); - } - - /** - * Test case for real run 'repair addpartitionmeta'. - */ - @Test - public void testAddPartitionMetaWithRealRun() throws IOException { - // create commit instant - Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit")); - - // create partition path - String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; - String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; - String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; - assertTrue(fs.mkdirs(new Path(partition1))); - assertTrue(fs.mkdirs(new Path(partition2))); - assertTrue(fs.mkdirs(new Path(partition3))); - - CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false"); - assertTrue(cr.isSuccess()); - - List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath); - // after dry run, the action will be 'Repaired' - String[][] rows = paths.stream() - .map(partition -> new String[] {partition, "No", "Repaired"}) - .toArray(String[][]::new); - String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, - HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); - - assertEquals(expected, cr.getResult().toString()); - - cr = getShell().executeCommand("repair addpartitionmeta"); - - // after real run, Metadata is present now. 
- rows = paths.stream() - .map(partition -> new String[] {partition, "Yes", "None"}) - .toArray(String[][]::new); - expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, - HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); - assertEquals(expected, cr.getResult().toString()); - } - - /** - * Test case for 'repair overwrite-hoodie-props'. - */ - @Test - public void testOverwriteHoodieProperties() throws IOException { - URL newProps = this.getClass().getClassLoader().getResource("table-config.properties"); - assertNotNull("New property file must exist", newProps); - - CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath()); - assertTrue(cr.isSuccess()); - - Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps(); - - // after overwrite, the stored value in .hoodie is equals to which read from properties. - Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps(); - Properties expectProps = new Properties(); - expectProps.load(new FileInputStream(new File(newProps.getPath()))); - - Map<String, String> expected = expectProps.entrySet().stream() - .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); - assertEquals(expected, result); - - // check result - List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", - "hoodie.archivelog.folder", "hoodie.timeline.layout.version"); - String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key, - oldProps.getOrDefault(key, null), result.getOrDefault(key, null)}) - .toArray(String[][]::new); - String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY, - HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows); - - assertEquals(expect, cr.getResult().toString()); - } - - /** - * Test case for 'repair corrupted clean files'. 
- */ - @Test - public void testRemoveCorruptedPendingCleanAction() throws IOException { - HoodieCLI.conf = jsc.hadoopConfiguration(); - - Configuration conf = HoodieCLI.conf; - - metaClient = HoodieCLI.getTableMetaClient(); - - // Create four requested files - for (int i = 100; i < 104; i++) { - String timestamp = String.valueOf(i); - // Write corrupted requested Compaction - HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf); - } - - // reload meta client - metaClient = HoodieTableMetaClient.reload(metaClient); - // first, there are four instants - assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count()); - - CommandResult cr = getShell().executeCommand("repair corrupted clean files"); - assertTrue(cr.isSuccess()); - - // reload meta client - metaClient = HoodieTableMetaClient.reload(metaClient); - assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count()); - } -} diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 37dfe3d..f5f6233 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -120,7 +120,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc)); } - HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) { + public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) { this(jsc, clientConfig, rollbackPending, index, Option.empty()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java index 5f47bf5..6e6458b 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java @@ -18,8 +18,8 @@ package org.apache.hudi.client; -import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus; @@ -50,7 +50,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.SQLContext; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -73,496 +72,477 @@ import static org.junit.Assert.assertTrue; */ public class TestHoodieClientBase extends HoodieClientTestHarness { - private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class); - - @Before - public void setUp() throws Exception { - initResources(); - } - - @After - public void tearDown() throws Exception { - cleanupResources(); - } - - protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) { - return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName())); - } - - protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) { - return getHoodieWriteClient(cfg, false); - } - - 
protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) { - return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, jsc)); - } - - protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit, - HoodieIndex index) { - return new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); - } - - protected HoodieReadClient getHoodieReadClient(String basePath) { - return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc())); - } - - /** - * Get Default HoodieWriteConfig for tests. - * - * @return Default Hoodie Write Config for tests - */ - protected HoodieWriteConfig getConfig() { - return getConfigBuilder().build(); - } - - protected HoodieWriteConfig getConfig(IndexType indexType) { - return getConfigBuilder(indexType).build(); - } - - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - protected HoodieWriteConfig.Builder getConfigBuilder() { - return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - } - - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) { - return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType); - } - - HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { - return getConfigBuilder(schemaStr, IndexType.BLOOM); - } - - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) - .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) - .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) - .withWriteStatusClass(MetadataMergeWriteStatus.class) - .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) - .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) - .forTable("test-trip-table") - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) - .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); - } - - protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - ((SyncableFileSystemView) (table.getSliceView())).reset(); - return table; - } - - /** - * Assert no failures in writing hoodie files. 
- * - * @param statuses List of Write Status - */ - public static void assertNoWriteErrors(List<WriteStatus> statuses) { - // Verify there are no errors - for (WriteStatus status : statuses) { - assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); + private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class); + + @Before + public void setUp() throws Exception { + initResources(); + } + + @After + public void tearDown() throws Exception { + cleanupResources(); + } + + protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) { + return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName())); + } + + /** + * Get Default HoodieWriteConfig for tests. + * + * @return Default Hoodie Write Config for tests + */ + protected HoodieWriteConfig getConfig() { + return getConfigBuilder().build(); } - } - - void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException { - Set<String> partitionPathSet = inputRecords.stream() - .map(HoodieRecord::getPartitionPath) - .collect(Collectors.toSet()); - assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); - } - - void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException { - Set<String> partitionPathSet = inputKeys.stream() - .map(HoodieKey::getPartitionPath) - .collect(Collectors.toSet()); - assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); - } - - /** - * Ensure presence of partition meta-data at known depth. - * - * @param partitionPaths Partition paths to check - * @param fs File System - * @throws IOException in case of error - */ - void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException { - for (String partitionPath : partitionPaths) { - assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath))); - HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath)); - pmeta.readFromFS(); - Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth()); + + protected HoodieWriteConfig getConfig(IndexType indexType) { + return getConfigBuilder(indexType).build(); + } + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + protected HoodieWriteConfig.Builder getConfigBuilder() { + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); } - } - - /** - * Ensure records have location field set. - * - * @param taggedRecords Tagged Records - * @param commitTime Commit Timestamp - */ - protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) { - for (HoodieRecord rec : taggedRecords) { - assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown()); - assertEquals("All records should have commit time " + commitTime + ", since updates were made", - rec.getCurrentLocation().getInstantTime(), commitTime); + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) { + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType); } - } - - /** - * Assert that there is no duplicate key at the partition level. 
- * - * @param records List of Hoodie records - */ - void assertNodupesWithinPartition(List<HoodieRecord> records) { - Map<String, Set<String>> partitionToKeys = new HashMap<>(); - for (HoodieRecord r : records) { - String key = r.getRecordKey(); - String partitionPath = r.getPartitionPath(); - if (!partitionToKeys.containsKey(partitionPath)) { - partitionToKeys.put(partitionPath, new HashSet<>()); - } - assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key)); - partitionToKeys.get(partitionPath).add(key); + + HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { + return getConfigBuilder(schemaStr, IndexType.BLOOM); } - } - - /** - * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records - * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is - * guaranteed by record-generation function itself. - * - * @param writeConfig Hoodie Write Config - * @param recordGenFunction Records Generation function - * @return Wrapped function - */ - private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls( - final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) { - return (commit, numRecords) -> { - final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); - List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords); - final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); - JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table); - return taggedRecords.collect(); - }; - } - - /** - * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys - * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is - * guaranteed by key-generation function itself. - * - * @param writeConfig Hoodie Write Config - * @param keyGenFunction Keys Generation function - * @return Wrapped function - */ - private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls( - final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) { - return (numRecords) -> { - final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); - List<HoodieKey> records = keyGenFunction.apply(numRecords); - final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); - JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1) - .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); - JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table); - return taggedRecords.map(record -> record.getKey()).collect(); - }; - } - - /** - * Generate wrapper for record generation function for testing Prepped APIs. 
- * - * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs - * @param writeConfig Hoodie Write Config - * @param wrapped Actual Records Generation function - * @return Wrapped Function - */ - protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI, - HoodieWriteConfig writeConfig, - Function2<List<HoodieRecord>, String, Integer> wrapped) { - if (isPreppedAPI) { - return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped); - } else { - return wrapped; + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withWriteStatusClass(MetadataMergeWriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .forTable("test-trip-table") + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) + .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); } - } - - /** - * Generate wrapper for delete key generation function for testing Prepped APIs. - * - * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs - * @param writeConfig Hoodie Write Config - * @param wrapped Actual Records Generation function - * @return Wrapped Function - */ - Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI, - HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) { - if (isPreppedAPI) { - return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped); - } else { - return wrapped; + + protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + ((SyncableFileSystemView) (table.getSliceView())).reset(); + return table; } - } - - /** - * Helper to insert first batch of records and do regular assertions on the state after successful completion. 
- * - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param writeFn Write Function to be used for insertion - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @return RDD of write-status - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String initCommitTime, int numRecordsInThisCommit, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit) throws Exception { - final Function2<List<HoodieRecord>, String, Integer> recordGenFunction = - generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts); - - return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit, - recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1); - } - - /** - * Helper to upsert batch of records and do regular assertions on the state after successful completion. - * - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param writeFn Write Function to be used for upsert - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @param expTotalCommits Expected number of commits (including this commit) - * @return RDD of write-status - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, - int numRecordsInThisCommit, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { - final Function2<List<HoodieRecord>, String, Integer> recordGenFunction = - generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates); - - return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime, - numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, - expTotalCommits); - } - - /** - * Helper to delete batch of keys and do regular assertions on the state after successful completion. 
- * - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param deleteFn Delete Function to be used for deletes - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @return RDD of write-status - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String prevCommitTime, String initCommitTime, - int numRecordsInThisCommit, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { - final Function<Integer, List<HoodieKey>> keyGenFunction = - generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes); - - return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit, - keyGenFunction, - deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords); - } - - /** - * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion. - * - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param recordGenFunction Records Generation Function - * @param writeFn Write Function to be used for upsert - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @param expTotalCommits Expected number of commits (including this commit) - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, - Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit, - Function2<List<HoodieRecord>, String, Integer> recordGenFunction, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { - - // Write 1 (only inserts) - client.startCommitWithTime(newCommitTime); - - List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit); - JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); - - JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime); - List<WriteStatus> statuses = result.collect(); - assertNoWriteErrors(statuses); - - // check the partition metadata is written out - assertPartitionMetadataForRecords(records, fs); - - // verify that there is a commit - HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); - - if (assertForCommit) { - assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits, - timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); - Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, - timeline.lastInstant().get().getTimestamp()); - assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); - - // Check the entire dataset has all records still - String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; - for (int i = 0; i < fullPartitionPaths.length; i++) { - fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); - } - assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, - HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); - - // Check that the incremental consumption from prevCommitTime - assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit", - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); - if (commitTimesBetweenPrevAndNew.isPresent()) { - commitTimesBetweenPrevAndNew.get().forEach(ct -> { - assertEquals("Incremental consumption from " + ct + " should give all records in latest commit", - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count()); - }); - } + + /** + * Assert no failures in writing hoodie files. + * + * @param statuses List of Write Status + */ + public static void assertNoWriteErrors(List<WriteStatus> statuses) { + // Verify there are no errors + for (WriteStatus status : statuses) { + assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); + } } - return result; - } - - /** - * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion. 
- * - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param initCommitTime Begin Timestamp (usually "000") - * @param keyGenFunction Key Generation function - * @param deleteFn Write Function to be used for delete - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @throws Exception in case of error - */ - JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, - String initCommitTime, int numRecordsInThisCommit, - Function<Integer, List<HoodieKey>> keyGenFunction, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { - - // Delete 1 (only deletes) - client.startCommitWithTime(newCommitTime); - - List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit); - JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1); - - JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime); - List<WriteStatus> statuses = result.collect(); - assertNoWriteErrors(statuses); - - // check the partition metadata is written out - assertPartitionMetadataForKeys(keysToDelete, fs); - - // verify that there is a commit - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); - - if (assertForCommit) { - assertEquals("Expecting 3 commits.", 3, - timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); - Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, - timeline.lastInstant().get().getTimestamp()); - assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); - - // Check the entire dataset has all records still - String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; - for (int i = 0; i < fullPartitionPaths.length; i++) { - fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); - } - assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, - HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); - - // Check that the incremental consumption from prevCommitTime - assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit," - + " since it is a delete operation", - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); + + void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException { + Set<String> partitionPathSet = inputRecords.stream() + .map(HoodieRecord::getPartitionPath) + .collect(Collectors.toSet()); + assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); } - return result; - } - /** - * Get Cleaner state corresponding to a partition path. 
- * - * @param hoodieCleanStatsTwo List of Clean Stats - * @param partitionPath Partition path for filtering - * @return Cleaner state corresponding to partition path - */ - protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) { - return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null); - } + void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException { + Set<String> partitionPathSet = inputKeys.stream() + .map(HoodieKey::getPartitionPath) + .collect(Collectors.toSet()); + assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); + } - // Functional Interfaces for passing lambda and Hoodie Write API contexts + /** + * Ensure presence of partition meta-data at known depth. + * + * @param partitionPaths Partition paths to check + * @param fs File System + * @throws IOException in case of error + */ + void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException { + for (String partitionPath : partitionPaths) { + assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath))); + HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath)); + pmeta.readFromFS(); + Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth()); + } + } - @FunctionalInterface - public interface Function2<R, T1, T2> { + /** + * Ensure records have location field set. + * + * @param taggedRecords Tagged Records + * @param commitTime Commit Timestamp + */ + protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) { + for (HoodieRecord rec : taggedRecords) { + assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown()); + assertEquals("All records should have commit time " + commitTime + ", since updates were made", + rec.getCurrentLocation().getInstantTime(), commitTime); + } + } - R apply(T1 v1, T2 v2) throws IOException; - } + /** + * Assert that there is no duplicate key at the partition level. + * + * @param records List of Hoodie records + */ + void assertNodupesWithinPartition(List<HoodieRecord> records) { + Map<String, Set<String>> partitionToKeys = new HashMap<>(); + for (HoodieRecord r : records) { + String key = r.getRecordKey(); + String partitionPath = r.getPartitionPath(); + if (!partitionToKeys.containsKey(partitionPath)) { + partitionToKeys.put(partitionPath, new HashSet<>()); + } + assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key)); + partitionToKeys.get(partitionPath).add(key); + } + } - @FunctionalInterface - public interface Function3<R, T1, T2, T3> { + /** + * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of + * record-location setting. Uniqueness is guaranteed by record-generation function itself. 
+ * + * @param writeConfig Hoodie Write Config + * @param recordGenFunction Records Generation function + * @return Wrapped function + */ + private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls( + final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) { + return (commit, numRecords) -> { + final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); + List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords); + final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); + JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table); + return taggedRecords.collect(); + }; + } - R apply(T1 v1, T2 v2, T3 v3) throws IOException; - } + /** + * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of + * record-location setting. Uniqueness is guaranteed by key-generation function itself. + * + * @param writeConfig Hoodie Write Config + * @param keyGenFunction Keys Generation function + * @return Wrapped function + */ + private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls( + final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) { + return (numRecords) -> { + final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); + List<HoodieKey> records = keyGenFunction.apply(numRecords); + final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); + JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1) + .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); + JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table); + return taggedRecords.map(record -> record.getKey()).collect(); + }; + } + + /** + * Generate wrapper for record generation function for testing Prepped APIs. + * + * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs + * @param writeConfig Hoodie Write Config + * @param wrapped Actual Records Generation function + * @return Wrapped Function + */ + protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI, + HoodieWriteConfig writeConfig, + Function2<List<HoodieRecord>, String, Integer> wrapped) { + if (isPreppedAPI) { + return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped); + } else { + return wrapped; + } + } + + /** + * Generate wrapper for delete key generation function for testing Prepped APIs. + * + * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs + * @param writeConfig Hoodie Write Config + * @param wrapped Actual Records Generation function + * @return Wrapped Function + */ + Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI, + HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) { + if (isPreppedAPI) { + return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped); + } else { + return wrapped; + } + } + + /** + * Helper to insert first batch of records and do regular assertions on the state after successful completion. 
+ * + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param writeFn Write Function to be used for insertion + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @return RDD of write-status + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, + String initCommitTime, int numRecordsInThisCommit, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit) throws Exception { + final Function2<List<HoodieRecord>, String, Integer> recordGenFunction = + generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts); + + return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit, + recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1); + } + + /** + * Helper to upsert batch of records and do regular assertions on the state after successful completion. + * + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param writeFn Write Function to be used for upsert + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @param expTotalCommits Expected number of commits (including this commit) + * @return RDD of write-status + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, + String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, + int numRecordsInThisCommit, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { + final Function2<List<HoodieRecord>, String, Integer> recordGenFunction = + generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates); + + return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime, + numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, + expTotalCommits); + } + + /** + * Helper to delete batch of keys and do regular assertions on the state after successful completion. 
+ * + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param deleteFn Delete Function to be used for deletes + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @return RDD of write-status + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, + String prevCommitTime, String initCommitTime, + int numRecordsInThisCommit, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { + final Function<Integer, List<HoodieKey>> keyGenFunction = + generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes); + + return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit, + keyGenFunction, + deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords); + } + + /** + * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion. + * + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param recordGenFunction Records Generation Function + * @param writeFn Write Function to be used for upsert + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @param expTotalCommits Expected number of commits (including this commit) + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, + Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit, + Function2<List<HoodieRecord>, String, Integer> recordGenFunction, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { + + // Write 1 (only inserts) + client.startCommitWithTime(newCommitTime); + + List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit); + JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); + + JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime); + List<WriteStatus> statuses = result.collect(); + assertNoWriteErrors(statuses); + + // check the partition metadata is written out + assertPartitionMetadataForRecords(records, fs); + + // verify that there is a commit + HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + + if (assertForCommit) { + assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits, + timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); + Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, + timeline.lastInstant().get().getTimestamp()); + assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + // Check that the incremental consumption from prevCommitTime + assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); + if (commitTimesBetweenPrevAndNew.isPresent()) { + commitTimesBetweenPrevAndNew.get().forEach(ct -> { + assertEquals("Incremental consumption from " + ct + " should give all records in latest commit", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count()); + }); + } + } + return result; + } + + /** + * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion. 
+ * + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param initCommitTime Begin Timestamp (usually "000") + * @param keyGenFunction Key Generation function + * @param deleteFn Write Function to be used for delete + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @throws Exception in case of error + */ + JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, + String initCommitTime, int numRecordsInThisCommit, + Function<Integer, List<HoodieKey>> keyGenFunction, + Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { + + // Delete 1 (only deletes) + client.startCommitWithTime(newCommitTime); + + List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit); + JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1); + + JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime); + List<WriteStatus> statuses = result.collect(); + assertNoWriteErrors(statuses); + + // check the partition metadata is written out + assertPartitionMetadataForKeys(keysToDelete, fs); + + // verify that there is a commit + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + + if (assertForCommit) { + assertEquals("Expecting 3 commits.", 3, + timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); + Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, + timeline.lastInstant().get().getTimestamp()); + assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + // Check that the incremental consumption from prevCommitTime + assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit," + + " since it is a delete operation", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); + } + return result; + } + + /** + * Get Cleaner state corresponding to a partition path. 
+ * + * @param hoodieCleanStatsTwo List of Clean Stats + * @param partitionPath Partition path for filtering + * @return Cleaner state corresponding to partition path + */ + protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) { + return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null); + } + + // Functional Interfaces for passing lambda and Hoodie Write API contexts + + @FunctionalInterface + public interface Function2<R, T1, T2> { + + R apply(T1 v1, T2 v2) throws IOException; + } + + @FunctionalInterface + public interface Function3<R, T1, T2, T3> { + + R apply(T1 v1, T2 v2, T3 v3) throws IOException; + } } diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index 9b70c10..8d3fa13 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -68,10 +68,6 @@ public class TestMultiFS extends HoodieClientTestHarness { cleanupTestDataGenerator(); } - private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception { - return new HoodieWriteClient(jsc, config); - } - protected HoodieWriteConfig getHoodieWriteConfig(String basePath) { return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName) diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index aad8edf..ab6e940 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -20,6 +20,7 @@ package org.apache.hudi.client; import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.common.TestRawTripPayload; +import java.io.IOException; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -58,8 +59,9 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { } @After - public void tearDown() { + public void tearDown() throws IOException { cleanupSparkContexts(); + cleanupFileSystem(); } //@Test diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java index 4e5721f..e4202f0 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java @@ -17,11 +17,15 @@ package org.apache.hudi.common; +import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.TestHoodieClientBase; import org.apache.hudi.common.minicluster.HdfsTestService; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.FSUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -45,225 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger; */ 
public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class); - - protected transient JavaSparkContext jsc = null; - protected transient SQLContext sqlContext; - protected transient FileSystem fs; - protected transient HoodieTestDataGenerator dataGen = null; - protected transient ExecutorService executorService; - protected transient HoodieTableMetaClient metaClient; - private static AtomicInteger instantGen = new AtomicInteger(1); - - public String getNextInstant() { - return String.format("%09d", instantGen.getAndIncrement()); - } - - // dfs - protected String dfsBasePath; - protected transient HdfsTestService hdfsTestService; - protected transient MiniDFSCluster dfsCluster; - protected transient DistributedFileSystem dfs; - - /** - * Initializes resource group for the subclasses of {@link TestHoodieClientBase}. - * - * @throws IOException - */ - public void initResources() throws IOException { - initPath(); - initSparkContexts(); - initTestDataGenerator(); - initFileSystem(); - initMetaClient(); - } - - /** - * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}. - * - * @throws IOException - */ - public void cleanupResources() throws IOException { - cleanupMetaClient(); - cleanupSparkContexts(); - cleanupTestDataGenerator(); - cleanupFileSystem(); - } - - /** - * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name. - * - * @param appName The specified application name. - */ - protected void initSparkContexts(String appName) { - // Initialize a local spark env - jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName)); - jsc.setLogLevel("ERROR"); - - // SQLContext stuff - sqlContext = new SQLContext(jsc); - } - - /** - * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name - * <b>TestHoodieClient</b>. - */ - protected void initSparkContexts() { - initSparkContexts("TestHoodieClient"); - } - - /** - * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}). - */ - protected void cleanupSparkContexts() { - if (sqlContext != null) { - LOG.info("Clearing sql context cache of spark-session used in previous test-case"); - sqlContext.clearCache(); - sqlContext = null; + private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class); + + protected transient JavaSparkContext jsc = null; + protected transient SQLContext sqlContext; + protected transient FileSystem fs; + protected transient HoodieTestDataGenerator dataGen = null; + protected transient ExecutorService executorService; + protected transient HoodieTableMetaClient metaClient; + private static AtomicInteger instantGen = new AtomicInteger(1); + protected transient HoodieWriteClient client; + + public String getNextInstant() { + return String.format("%09d", instantGen.getAndIncrement()); + } + + // dfs + protected String dfsBasePath; + protected transient HdfsTestService hdfsTestService; + protected transient MiniDFSCluster dfsCluster; + protected transient DistributedFileSystem dfs; + + /** + * Initializes resource group for the subclasses of {@link TestHoodieClientBase}. 
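+ * Sets up the base path, Spark contexts, test data generator, file system and meta client in one call.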
+ */ + public void initResources() throws IOException { + initPath(); + initSparkContexts(); + initTestDataGenerator(); + initFileSystem(); + initMetaClient(); + } + + /** + * Cleans up the resource group for the subclasses of {@link TestHoodieClientBase}. + */ + public void cleanupResources() throws IOException { + cleanupClients(); + cleanupSparkContexts(); + cleanupTestDataGenerator(); + cleanupFileSystem(); + } + + /** + * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name. + * + * @param appName The specified application name. + */ + protected void initSparkContexts(String appName) { + // Initialize a local spark env + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName)); + jsc.setLogLevel("ERROR"); + + // SQLContext stuff + sqlContext = new SQLContext(jsc); } - if (jsc != null) { - LOG.info("Closing spark context used in previous test-case"); - jsc.close(); - jsc.stop(); - jsc = null; + /** + * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name + * <b>TestHoodieClient</b>. + */ + protected void initSparkContexts() { + initSparkContexts("TestHoodieClient"); } - } - - /** - * Initializes a file system with the hadoop configuration of Spark context. - */ - protected void initFileSystem() { - if (jsc == null) { - throw new IllegalStateException("The Spark context has not been initialized."); + + /** + * Cleans up the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}). + */ + protected void cleanupSparkContexts() { + if (sqlContext != null) { + LOG.info("Clearing sql context cache of spark-session used in previous test-case"); + sqlContext.clearCache(); + sqlContext = null; + } + + if (jsc != null) { + LOG.info("Closing spark context used in previous test-case"); + jsc.close(); + jsc.stop(); + jsc = null; + } + } + + /** + * Initializes a file system with the hadoop configuration of Spark context. + */ + protected void initFileSystem() { + if (jsc == null) { + throw new IllegalStateException("The Spark context has not been initialized."); + } + + initFileSystemWithConfiguration(jsc.hadoopConfiguration()); } - initFileSystemWithConfiguration(jsc.hadoopConfiguration()); - } - - /** - * Initializes file system with a default empty configuration. - */ - protected void initFileSystemWithDefaultConfiguration() { - initFileSystemWithConfiguration(new Configuration()); - } - - /** - * Cleanups file system. - * - * @throws IOException - */ - protected void cleanupFileSystem() throws IOException { - if (fs != null) { - LOG.warn("Closing file-system instance used in previous test-run"); - fs.close(); + /** + * Initializes file system with a default empty configuration. + */ + protected void initFileSystemWithDefaultConfiguration() { + initFileSystemWithConfiguration(new Configuration()); } - } - - /** - * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by - * {@code getTableType()}. - * - * @throws IOException - */ - protected void initMetaClient() throws IOException { - if (basePath == null) { - throw new IllegalStateException("The base path has not been initialized."); + + /** + * Cleans up the file system.
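+ * Closes the shared {@link FileSystem} instance so the next test run starts with a fresh handle.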
+ */ + protected void cleanupFileSystem() throws IOException { + if (fs != null) { + LOG.warn("Closing file-system instance used in previous test-run"); + fs.close(); + } } - if (jsc == null) { - throw new IllegalStateException("The Spark context has not been initialized."); + /** + * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}. + */ + protected void initMetaClient() throws IOException { + if (basePath == null) { + throw new IllegalStateException("The base path has not been initialized."); + } + + if (jsc == null) { + throw new IllegalStateException("The Spark context has not been initialized."); + } + + metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()); } - metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()); - } - - /** - * Cleanups table type. - */ - protected void cleanupMetaClient() { - metaClient = null; - } - - /** - * Initializes a test data generator which used to generate test datas. - * - */ - protected void initTestDataGenerator() { - dataGen = new HoodieTestDataGenerator(); - } - - /** - * Cleanups test data generator. - * - */ - protected void cleanupTestDataGenerator() { - dataGen = null; - } - - /** - * Initializes a distributed file system and base directory. - * - * @throws IOException - */ - protected void initDFS() throws IOException { - FileSystem.closeAll(); - hdfsTestService = new HdfsTestService(); - dfsCluster = hdfsTestService.start(true); - - // Create a temp folder as the base path - dfs = dfsCluster.getFileSystem(); - dfsBasePath = dfs.getWorkingDirectory().toString(); - dfs.mkdirs(new Path(dfsBasePath)); - } - - /** - * Cleanups the distributed file system. - * - * @throws IOException - */ - protected void cleanupDFS() throws IOException { - if (hdfsTestService != null) { - hdfsTestService.stop(); - dfsCluster.shutdown(); + /** + * Cleans up the meta client and closes the cached write client. + */ + protected void cleanupClients() { + metaClient = null; + if (null != client) { + client.close(); + client = null; + } } - // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the - // same JVM - FileSystem.closeAll(); - } - - /** - * Initializes executor service with a fixed thread pool. - * - * @param threadNum specify the capacity of the fixed thread pool - */ - protected void initExecutorServiceWithFixedThreadPool(int threadNum) { - executorService = Executors.newFixedThreadPool(threadNum); - } - - /** - * Cleanups the executor service. - */ - protected void cleanupExecutorService() { - if (this.executorService != null) { - this.executorService.shutdownNow(); - this.executorService = null; + + /** + * Initializes a test data generator which is used to generate test data. + */ + protected void initTestDataGenerator() { + dataGen = new HoodieTestDataGenerator(); } - } - private void initFileSystemWithConfiguration(Configuration configuration) { - if (basePath == null) { - throw new IllegalStateException("The base path has not been initialized."); + /** + * Cleans up the test data generator.
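+ * Drops the generator reference so record state does not leak across test cases.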
+ */ + protected void cleanupTestDataGenerator() { + dataGen = null; } - fs = FSUtils.getFs(basePath, configuration); - if (fs instanceof LocalFileSystem) { - LocalFileSystem lfs = (LocalFileSystem) fs; - // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream - // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open - // So, for the tests, we enforce checksum verification to circumvent the problem - lfs.setVerifyChecksum(true); + /** + * Initializes a distributed file system and base directory. + */ + protected void initDFS() throws IOException { + FileSystem.closeAll(); + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + + // Create a temp folder as the base path + dfs = dfsCluster.getFileSystem(); + dfsBasePath = dfs.getWorkingDirectory().toString(); + dfs.mkdirs(new Path(dfsBasePath)); } - } + /** + * Cleans up the distributed file system. + */ + protected void cleanupDFS() throws IOException { + if (hdfsTestService != null) { + hdfsTestService.stop(); + dfsCluster.shutdown(); + hdfsTestService = null; + dfsCluster = null; + dfs = null; + } + // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the + // same JVM + FileSystem.closeAll(); + } + + /** + * Initializes executor service with a fixed thread pool. + * + * @param threadNum specify the capacity of the fixed thread pool + */ + protected void initExecutorServiceWithFixedThreadPool(int threadNum) { + executorService = Executors.newFixedThreadPool(threadNum); + } + + /** + * Cleans up the executor service. + */ + protected void cleanupExecutorService() { + if (this.executorService != null) { + this.executorService.shutdownNow(); + this.executorService = null; + } + } + + private void initFileSystemWithConfiguration(Configuration configuration) { + if (basePath == null) { + throw new IllegalStateException("The base path has not been initialized."); + } + + fs = FSUtils.getFs(basePath, configuration); + if (fs instanceof LocalFileSystem) { + LocalFileSystem lfs = (LocalFileSystem) fs; + // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream + // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open + // So, for the tests, we enforce checksum verification to circumvent the problem + lfs.setVerifyChecksum(true); + } + } + + public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) { + return getHoodieWriteClient(cfg, false); + } + + public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) { + return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null)); + } + + public HoodieReadClient getHoodieReadClient(String basePath) { + return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc())); + } + + public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit, + HoodieIndex index) { + if (null != client) { + client.close(); + client = null; + } + client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); + return client; + } } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java index 05638e2..6ddb578 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java +++
b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java @@ -60,7 +60,7 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness { @After public void tearDown() throws Exception { cleanupSparkContexts(); - cleanupMetaClient(); + cleanupClients(); if (utility != null) { utility.shutdownMiniCluster(); } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java index 2893947..43f2fd1 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java @@ -86,6 +86,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { @AfterClass public static void clean() throws Exception { if (utility != null) { + utility.deleteTable(tableName); utility.shutdownMiniCluster(); } } @@ -115,11 +116,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { public void tearDown() throws Exception { cleanupSparkContexts(); cleanupTestDataGenerator(); - cleanupMetaClient(); - } - - private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { - return new HoodieWriteClient(jsc, config); + cleanupClients(); } @Test @@ -132,7 +129,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { // Load to memory HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); - try (HoodieWriteClient writeClient = getWriteClient(config);) { + try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); @@ -172,7 +169,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { // Load to memory HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getHoodieWriteClient(config); writeClient.startCommitWithTime(newCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); @@ -206,7 +203,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { // Load to memory HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); - HoodieWriteClient writeClient = getWriteClient(config); + HoodieWriteClient writeClient = getHoodieWriteClient(config); String newCommitTime = writeClient.startCommit(); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200); @@ -256,7 +253,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { // only for test, set the hbaseConnection to mocked object index.setHbaseConnection(hbaseConnection); - HoodieWriteClient writeClient = getWriteClient(config); + HoodieWriteClient writeClient = getHoodieWriteClient(config); // start a commit and generate test data String newCommitTime = writeClient.startCommit(); @@ -281,7 +278,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { public void testTotalPutsBatching() throws Exception { HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); - HoodieWriteClient writeClient = getWriteClient(config); + HoodieWriteClient writeClient = getHoodieWriteClient(config); // start a commit and generate test data String newCommitTime = writeClient.startCommit(); diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index 91435f8..b97fefc 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -45,7 +45,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness { @After public void tearDown() { cleanupSparkContexts(); - cleanupMetaClient(); + cleanupClients(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index d29cfa4..105b0e8 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -107,7 +107,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { public void tearDown() throws Exception { cleanupSparkContexts(); cleanupFileSystem(); - cleanupMetaClient(); + cleanupClients(); } private HoodieWriteConfig makeConfig() { diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index ddf2775..55d4526 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -80,7 +80,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { @After public void tearDown() { cleanupSparkContexts(); - cleanupMetaClient(); + cleanupClients(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 664f4b5..7fd02bc 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -68,11 +68,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { cleanupFileSystem(); cleanupTestDataGenerator(); cleanupSparkContexts(); - cleanupMetaClient(); - } - - private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { - return new HoodieWriteClient(jsc, config); + cleanupClients(); + cleanupFileSystem(); } @Test @@ -83,9 +80,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { // Build a write config with bulkinsertparallelism set HoodieWriteConfig cfg = getConfigBuilder().build(); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); - /** * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times * each. 
id1 (21 records), id2 (21 records), id3, id4 @@ -224,7 +220,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { public void testHoodieMergeHandleWriteStatMetrics() throws Exception { // insert 100 records HoodieWriteConfig config = getConfigBuilder().build(); - try (HoodieWriteClient writeClient = getWriteClient(config);) { + try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index ec64080..6887531 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.HoodieClientTestUtils; @@ -85,7 +86,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { @After public void tearDown() throws Exception { cleanupSparkContexts(); - cleanupMetaClient(); + cleanupClients(); cleanupFileSystem(); cleanupTestDataGenerator(); } @@ -129,6 +130,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { // Prepare the AvroParquetIO HoodieWriteConfig config = makeHoodieClientConfig(); String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); + HoodieWriteClient writeClient = getHoodieWriteClient(config); + writeClient.startCommitWithTime(firstCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); String partitionPath = "/2016/01/31"; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java index 740caf2..fdc968d 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java @@ -96,16 +96,13 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { cleanupDFS(); cleanupSparkContexts(); cleanupTestDataGenerator(); - } - - private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { - return new HoodieWriteClient(jsc, config); + cleanupClients(); } @Test public void testSimpleInsertAndUpdate() throws Exception { HoodieWriteConfig cfg = getConfig(true); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { /** * Write 1 (only inserts) @@ -190,7 +187,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { @Test public void testMetadataAggregateFromWriteStatus() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build(); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { String newCommitTime = "001"; List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200); @@ -213,7 +210,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { @Test public void testSimpleInsertUpdateAndDelete() throws Exception { HoodieWriteConfig cfg = getConfig(true); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { /** * Write 
1 (only inserts, written as parquet file) @@ -298,7 +295,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE); HoodieWriteConfig cfg = getConfig(true); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { /** * Write 1 (only inserts) @@ -351,7 +348,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { public void testRollbackWithDeltaAndCompactionCommit() throws Exception { HoodieWriteConfig cfg = getConfig(false); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { // Test delta commit rollback /** @@ -394,7 +391,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { */ final String commitTime1 = "002"; // WriteClient with custom config (disable small file handling) - try (HoodieWriteClient secondClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) { + try (HoodieWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) { secondClient.startCommitWithTime(commitTime1); List<HoodieRecord> copyOfRecords = new ArrayList<>(records); @@ -424,7 +421,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { * Write 3 (inserts + updates - testing successful delta commit) */ final String commitTime2 = "002"; - try (HoodieWriteClient thirdClient = getWriteClient(cfg);) { + try (HoodieWriteClient thirdClient = getHoodieWriteClient(cfg);) { thirdClient.startCommitWithTime(commitTime2); List<HoodieRecord> copyOfRecords = new ArrayList<>(records); @@ -500,7 +497,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { HoodieWriteConfig cfg = getConfig(false); - try (final HoodieWriteClient client = getWriteClient(cfg);) { + try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) { /** * Write 1 (only inserts) */ @@ -541,7 +538,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { */ newCommitTime = "002"; // WriteClient with custom config (disable small file handling) - HoodieWriteClient nClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff()); + HoodieWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff()); nClient.startCommitWithTime(newCommitTime); List<HoodieRecord> copyOfRecords = new ArrayList<>(records); @@ -664,7 +661,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { @Test public void testUpsertPartitioner() throws Exception { HoodieWriteConfig cfg = getConfig(true); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { /** * Write 1 (only inserts, written as parquet file) @@ -743,7 +740,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { public void testLogFileCountsAfterCompaction() throws Exception { // insert 100 records HoodieWriteConfig config = getConfig(true); - try (HoodieWriteClient writeClient = getWriteClient(config);) { + try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -816,7 +813,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // insert 100 records // Setting IndexType to be InMemory to simulate Global Index nature 
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build(); - try (HoodieWriteClient writeClient = getWriteClient(config);) { + try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -853,7 +850,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // insert 100 records // Setting IndexType to be InMemory to simulate Global Index nature HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build(); - try (HoodieWriteClient writeClient = getWriteClient(config);) { + try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -927,7 +924,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // insert 100 records // Setting IndexType to be InMemory to simulate Global Index nature HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build(); - try (HoodieWriteClient writeClient = getWriteClient(config);) { + try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -979,10 +976,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { public void testRollingStatsInMetadata() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); - // Create a commit without rolling stats in metadata to test backwards compatibility HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); String commitActionType = table.getMetaClient().getCommitActionType(); @@ -1080,7 +1076,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { public void testRollingStatsWithSmallFileHandling() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); - try (HoodieWriteClient client = getWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); Map<String, Long> fileIdToInsertsMap = new HashMap<>(); Map<String, Long> fileIdToUpsertsMap = new HashMap<>(); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java index 8fa55ec..09d62a7 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java @@ -78,10 +78,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { cleanupFileSystem(); cleanupTestDataGenerator(); cleanupSparkContexts(); - } - - private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { - return new HoodieWriteClient(jsc, config); + cleanupClients(); } private HoodieWriteConfig getConfig() { @@ -114,8 +111,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { HoodieWriteConfig config = getConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = 
HoodieTable.getHoodieTable(metaClient, config, jsc); - try (HoodieWriteClient writeClient = getWriteClient(config);) { - + try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = writeClient.startCommit(); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1); @@ -132,8 +128,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { public void testWriteStatusContentsAfterCompaction() throws Exception { // insert 100 records HoodieWriteConfig config = getConfig(); - try (HoodieWriteClient writeClient = getWriteClient(config);) { - String newCommitTime = "100"; + try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) { + String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); diff --git a/pom.xml b/pom.xml index f1990e1..6370c37 100644 --- a/pom.xml +++ b/pom.xml @@ -242,6 +242,7 @@ <version>${maven-surefire-plugin.version}</version> <configuration> <skip>${skipUTs}</skip> + <argLine>-Xmx4g</argLine> <systemPropertyVariables> <log4j.configuration> ${surefire-log4j.file}