This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 0e1f9653c0 [HUDI-5070] Move flaky cleaner tests to separate class (#7251) 0e1f9653c0 is described below commit 0e1f9653c0e73287527ae62f75ba6e679cf0c1da Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com> AuthorDate: Mon Nov 21 10:41:55 2022 +0800 [HUDI-5070] Move flaky cleaner tests to separate class (#7251) --- .../java/org/apache/hudi/table/TestCleaner.java | 279 ++++----------------- .../clean/TestCleanerInsertAndCleanByCommits.java | 41 ++- .../clean/TestCleanerInsertAndCleanByVersions.java | 238 ++++++++++++++++++ 3 files changed, 310 insertions(+), 248 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index c1dae9afa4..7577ba8c83 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -25,7 +25,6 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieClusteringStrategy; -import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSliceInfo; @@ -39,12 +38,9 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BootstrapFileMapping; import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; -import org.apache.hudi.common.model.HoodieFileGroup; -import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; @@ -61,13 +57,11 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigra import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; -import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -100,7 +94,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -110,7 +103,6 @@ import java.util.stream.Stream; import scala.Tuple3; import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename; -import 
static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes; import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; @@ -211,14 +203,6 @@ public class TestCleaner extends HoodieClientTestBase { return Pair.of(newCommitTime, statuses); } - /** - * Test Clean-By-Versions using insert/upsert API. - */ - @Test - public void testInsertAndCleanByVersions() throws Exception { - testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false); - } - /** * Test Clean-Failed-Writes when Cleaning policy is by VERSIONS using insert/upsert API. */ @@ -228,32 +212,63 @@ public class TestCleaner extends HoodieClientTestBase { } /** - * Test Clean-By-Versions using prepped versions of insert/upsert API. + * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective. + * + * @param insertFn Insert API to be tested + * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during + * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) + * @throws Exception in case of errors */ - @Test - public void testInsertPreppedAndCleanByVersions() throws Exception { - testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, - true); - } + private void testInsertAndCleanFailedWritesByVersions( + Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI) + throws Exception { + int maxVersions = 3; // keep upto 3 versions for each file + HoodieWriteConfig cfg = getConfigBuilder() + .withAutoCommit(false) + .withHeartbeatIntervalInMs(3000) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build()) + .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .build(); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { - /** - * Test Clean-By-Versions using bulk-insert/upsert API. - */ - @Test - public void testBulkInsertAndCleanByVersions() throws Exception { - testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false); - } + final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = + generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); - /** - * Test Clean-By-Versions using prepped versions of bulk-insert/upsert API. 
- */ - @Test - public void testBulkInsertPreppedAndCleanByVersions() throws Exception { - testInsertAndCleanByVersions( - (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()), - SparkRDDWriteClient::upsertPreppedRecords, true); - } + Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn); + client.commit(result.getLeft(), result.getRight()); + + HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); + + assertTrue(table.getCompletedCleanTimeline().empty()); + + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); + + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); + + Pair<String, JavaRDD<WriteStatus>> ret = + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); + + // Await till enough time passes such that the last failed commits heartbeats are expired + await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient() + .isHeartbeatExpired(ret.getLeft())); + + List<HoodieCleanStat> cleanStats = runCleaner(cfg); + assertEquals(0, cleanStats.size(), "Must not clean any files"); + HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); + assertTrue(timeline.getTimelineOfActions( + CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3); + Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions( + CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant(); + HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata( + timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class); + // Rollback of one of the failed writes should have deleted 3 files + assertEquals(3, rollbackMetadata.getTotalFilesDeleted()); + } + } /** * Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false. @@ -329,133 +344,6 @@ public class TestCleaner extends HoodieClientTestBase { } } - /** - * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective. - * - * @param insertFn Insert API to be tested - * @param upsertFn Upsert API to be tested - * @param isPreppedAPI Flag to indicate if a prepped-version is used. 
If true, a wrapper function will be used during - * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) - * @throws Exception in case of errors - */ - private void testInsertAndCleanByVersions( - Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, - Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI) - throws Exception { - int maxVersions = 2; // keep upto 2 versions for each file - HoodieWriteConfig cfg = getConfigBuilder() - .withCleanConfig(HoodieCleanConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) - .retainFileVersions(maxVersions).build()) - .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) - .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) - .build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { - - final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = - generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); - - final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = - generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates); - - insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn); - - Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>(); - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient); - for (String partitionPath : dataGen.getPartitionPaths()) { - TableFileSystemView fsView = table.getFileSystemView(); - Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> { - fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs)); - return true; - })); - if (added.isPresent()) { - // Select only one file-group for compaction - break; - } - } - - // Create workload with selected file-slices - List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream() - .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList()); - HoodieCompactionPlan compactionPlan = - CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty()); - List<String> instantTimes = makeIncrementalCommitTimes(9, 1, 10); - String compactionTime = instantTimes.get(0); - table.getActiveTimeline().saveToCompactionRequested( - new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime), - TimelineMetadataUtils.serializeCompactionPlan(compactionPlan)); - - instantTimes = instantTimes.subList(1, instantTimes.size()); - // Keep doing some writes and clean inline. Make sure we have expected number of files - // remaining. 
- for (String newInstantTime : instantTimes) { - try { - client.startCommitWithTime(newInstantTime); - List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, 100); - - List<WriteStatus> statuses = upsertFn.apply(client, jsc.parallelize(records, 1), newInstantTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieSparkTable.create(getConfig(), context, metaClient); - HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline(); - - TableFileSystemView fsView = table.getFileSystemView(); - // Need to ensure the following - for (String partitionPath : dataGen.getPartitionPaths()) { - // compute all the versions of all files, from time 0 - HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>(); - for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) { - HoodieCommitMetadata commitMetadata = - HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class); - - for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) { - if (!fileIdToVersions.containsKey(wstat.getFileId())) { - fileIdToVersions.put(wstat.getFileId(), new TreeSet<>()); - } - fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName())); - } - } - - List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList()); - - for (HoodieFileGroup fileGroup : fileGroups) { - if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) { - // Ensure latest file-slice selected for compaction is retained - Option<HoodieBaseFile> dataFileForCompactionPresent = - Option.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> { - return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime() - .equals(df.getCommitTime()); - }).findAny()); - assertTrue(dataFileForCompactionPresent.isPresent(), - "Data File selected for compaction is retained"); - } else { - // file has no more than max versions - String fileId = fileGroup.getFileGroupId().getFileId(); - List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList()); - - assertTrue(dataFiles.size() <= maxVersions, - "fileId " + fileId + " has more than " + maxVersions + " versions"); - - // Each file, has the latest N versions (i.e cleaning gets rid of older versions) - List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId)); - for (int i = 0; i < dataFiles.size(); i++) { - assertEquals((dataFiles.get(i)).getCommitTime(), - commitedVersions.get(commitedVersions.size() - 1 - i), - "File " + fileId + " does not have latest versions on commits" + commitedVersions); - } - } - } - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - } - } - /** * Test Clean-By-Commits using insert/upsert API. 
*/ @@ -676,7 +564,7 @@ public class TestCleaner extends HoodieClientTestBase { assertTrue(timeline.getTimelineOfActions( CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--instantClean, "%09d"))); } - + @Test public void testCleanWithReplaceCommits() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) @@ -1200,65 +1088,6 @@ public class TestCleaner extends HoodieClientTestBase { assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); } - /** - * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective. - * - * @param insertFn Insert API to be tested - * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during - * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) - * @throws Exception in case of errors - */ - private void testInsertAndCleanFailedWritesByVersions( - Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI) - throws Exception { - int maxVersions = 3; // keep upto 3 versions for each file - HoodieWriteConfig cfg = getConfigBuilder() - .withAutoCommit(false) - .withHeartbeatIntervalInMs(3000) - .withCleanConfig(HoodieCleanConfig.newBuilder() - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build()) - .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) - .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) - .build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { - - final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = - generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); - - Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn); - - client.commit(result.getLeft(), result.getRight()); - - HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); - - assertTrue(table.getCompletedCleanTimeline().empty()); - - insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); - - insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); - - Pair<String, JavaRDD<WriteStatus>> ret = - insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); - - // Await till enough time passes such that the last failed commits heartbeats are expired - await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient() - .isHeartbeatExpired(ret.getLeft())); - - List<HoodieCleanStat> cleanStats = runCleaner(cfg); - assertEquals(0, cleanStats.size(), "Must not clean any files"); - HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); - assertTrue(timeline.getTimelineOfActions( - CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3); - Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions( - CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant(); - HoodieRollbackMetadata 
rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata( - timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class); - // Rollback of one of the failed writes should have deleted 3 files - assertEquals(3, rollbackMetadata.getTotalFilesDeleted()); - } - } - /** * Common test method for validating pending compactions. * diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java index 7f5cd5cd99..816a937187 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java @@ -46,7 +46,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -105,10 +104,10 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes /** * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective. * - * @param insertFn Insert API to be tested - * @param upsertFn Upsert API to be tested + * @param insertFn Insert API to be tested + * @param upsertFn Upsert API to be tested * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during - * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) + * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) * @throws Exception in case of errors */ private void testInsertAndCleanByCommits( @@ -127,23 +126,21 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()) .build(); - final SparkRDDWriteClient client = getHoodieWriteClient(cfg); - - final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime()); - final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI - ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts) - : dataGen::generateInserts; - final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI - ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates) - : dataGen::generateUniqueUpdates; - - HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE); - insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn); - - // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. 
- for (int i = 0; i < 8; i++) { - String newCommitTime = makeNewCommitTime(); - try { + try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime()); + final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI + ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts) + : dataGen::generateInserts; + final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI + ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates) + : dataGen::generateUniqueUpdates; + + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE); + insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn); + + // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. + for (int i = 0; i < 8; i++) { + String newCommitTime = makeNewCommitTime(); client.startCommitWithTime(newCommitTime); List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, BATCH_SIZE); @@ -186,8 +183,6 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes "Only contain acceptable versions of file should be present"); } } - } catch (IOException ioe) { - throw new RuntimeException(ioe); } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java new file mode 100644 index 0000000000..e9c74936f3 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.action.clean; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieClientTestBase.Function2; +import org.apache.hudi.testutils.HoodieClientTestBase.Function3; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; + +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes; +import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.apache.hudi.testutils.HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestCleanerInsertAndCleanByVersions extends SparkClientFunctionalTestHarness { + + private static final int BATCH_SIZE = 100; + private static final int PARALLELISM = 2; + + /** + * Test Clean-By-Versions using insert/upsert API. + */ + @Test + public void testInsertAndCleanByVersions() throws Exception { + testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false); + } + + /** + * Test Clean-By-Versions using prepped versions of insert/upsert API. + */ + @Test + public void testInsertPreppedAndCleanByVersions() throws Exception { + testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, + true); + } + + /** + * Test Clean-By-Versions using bulk-insert/upsert API. + */ + @Test + public void testBulkInsertAndCleanByVersions() throws Exception { + testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false); + } + + /** + * Test Clean-By-Versions using prepped versions of bulk-insert/upsert API. 
+ */ + @Test + public void testBulkInsertPreppedAndCleanByVersions() throws Exception { + testInsertAndCleanByVersions( + (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()), + SparkRDDWriteClient::upsertPreppedRecords, true); + } + + /** + * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective. + * + * @param insertFn Insert API to be tested + * @param upsertFn Upsert API to be tested + * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during + * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) + * @throws Exception in case of errors + */ + private void testInsertAndCleanByVersions( + Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, + Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI) + throws Exception { + int maxVersions = 2; // keep upto 2 versions for each file + HoodieWriteConfig cfg = getConfigBuilder(true) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) + .retainFileVersions(maxVersions).build()) + .withParallelism(PARALLELISM, PARALLELISM) + .withBulkInsertParallelism(PARALLELISM) + .withFinalizeWriteParallelism(PARALLELISM) + .withDeleteParallelism(PARALLELISM) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .build(); + try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime()); + final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI + ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts) + : dataGen::generateInserts; + final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI + ? 
wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates) + : dataGen::generateUniqueUpdates; + + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE); + insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn); + + Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>(); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient); + for (String partitionPath : dataGen.getPartitionPaths()) { + TableFileSystemView fsView = table.getFileSystemView(); + Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> { + fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs)); + return true; + })); + if (added.isPresent()) { + // Select only one file-group for compaction + break; + } + } + + // Create workload with selected file-slices + List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream() + .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList()); + HoodieCompactionPlan compactionPlan = + CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty()); + List<String> instantTimes = makeIncrementalCommitTimes(9, 1, 10); + String compactionTime = instantTimes.get(0); + table.getActiveTimeline().saveToCompactionRequested( + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime), + TimelineMetadataUtils.serializeCompactionPlan(compactionPlan)); + + instantTimes = instantTimes.subList(1, instantTimes.size()); + // Keep doing some writes and clean inline. Make sure we have expected number of files + // remaining. 
+ for (String newInstantTime : instantTimes) { + client.startCommitWithTime(newInstantTime); + List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, BATCH_SIZE); + + List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newInstantTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieSparkTable.create(cfg, context(), metaClient); + HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline(); + + TableFileSystemView fsView = table.getFileSystemView(); + // Need to ensure the following + for (String partitionPath : dataGen.getPartitionPaths()) { + // compute all the versions of all files, from time 0 + HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>(); + for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) { + HoodieCommitMetadata commitMetadata = + HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class); + + for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) { + if (!fileIdToVersions.containsKey(wstat.getFileId())) { + fileIdToVersions.put(wstat.getFileId(), new TreeSet<>()); + } + fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName())); + } + } + + List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList()); + + for (HoodieFileGroup fileGroup : fileGroups) { + if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) { + // Ensure latest file-slice selected for compaction is retained + Option<HoodieBaseFile> dataFileForCompactionPresent = + Option.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> { + return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime() + .equals(df.getCommitTime()); + }).findAny()); + assertTrue(dataFileForCompactionPresent.isPresent(), + "Data File selected for compaction is retained"); + } else { + // file has no more than max versions + String fileId = fileGroup.getFileGroupId().getFileId(); + List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList()); + + assertTrue(dataFiles.size() <= maxVersions, + "fileId " + fileId + " has more than " + maxVersions + " versions"); + + // Each file, has the latest N versions (i.e cleaning gets rid of older versions) + List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId)); + for (int i = 0; i < dataFiles.size(); i++) { + assertEquals((dataFiles.get(i)).getCommitTime(), + commitedVersions.get(commitedVersions.size() - 1 - i), + "File " + fileId + " does not have latest versions on commits" + commitedVersions); + } + } + } + } + } + } + } +}