This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e1f9653c0 [HUDI-5070] Move flaky cleaner tests to separate class (#7251)
0e1f9653c0 is described below

commit 0e1f9653c0e73287527ae62f75ba6e679cf0c1da
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Mon Nov 21 10:41:55 2022 +0800

    [HUDI-5070] Move flaky cleaner tests to separate class (#7251)
---
 .../java/org/apache/hudi/table/TestCleaner.java    | 279 ++++-----------------
 .../clean/TestCleanerInsertAndCleanByCommits.java  |  41 ++-
 .../clean/TestCleanerInsertAndCleanByVersions.java | 238 ++++++++++++++++++
 3 files changed, 310 insertions(+), 248 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index c1dae9afa4..7577ba8c83 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -25,7 +25,6 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieClusteringStrategy;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSliceInfo;
@@ -39,12 +38,9 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BootstrapFileMapping;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFileGroup;
-import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -61,13 +57,11 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigra
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -100,7 +94,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
@@ -110,7 +103,6 @@ import java.util.stream.Stream;
 import scala.Tuple3;
 
 import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
-import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
 import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -211,14 +203,6 @@ public class TestCleaner extends HoodieClientTestBase {
     return Pair.of(newCommitTime, statuses);
   }
 
-  /**
-   * Test Clean-By-Versions using insert/upsert API.
-   */
-  @Test
-  public void testInsertAndCleanByVersions() throws Exception {
-    testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
-  }
-
   /**
    * Test Clean-Failed-Writes when Cleaning policy is by VERSIONS using insert/upsert API.
    */
@@ -228,32 +212,63 @@ public class TestCleaner extends HoodieClientTestBase {
   }
 
   /**
-   * Test Clean-By-Versions using prepped versions of insert/upsert API.
+   * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
+   *
+   * @param insertFn     Insert API to be tested
+   * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
+   *                     record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
+   * @throws Exception in case of errors
    */
-  @Test
-  public void testInsertPreppedAndCleanByVersions() throws Exception {
-    testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
-        true);
-  }
+  private void testInsertAndCleanFailedWritesByVersions(
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
+      throws Exception {
+    int maxVersions = 3; // keep upto 3 versions for each file
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withAutoCommit(false)
+        .withHeartbeatIntervalInMs(3000)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
+        .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
 
-  /**
-   * Test Clean-By-Versions using bulk-insert/upsert API.
-   */
-  @Test
-  public void testBulkInsertAndCleanByVersions() throws Exception {
-    testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
-  }
+      final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
+          generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
 
-  /**
-   * Test Clean-By-Versions using prepped versions of bulk-insert/upsert API.
-   */
-  @Test
-  public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
-    testInsertAndCleanByVersions(
-        (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
-        SparkRDDWriteClient::upsertPreppedRecords, true);
-  }
+      Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
 
+      client.commit(result.getLeft(), result.getRight());
+
+      HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
+
+      assertTrue(table.getCompletedCleanTimeline().empty());
+
+      insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
+
+      insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
+
+      Pair<String, JavaRDD<WriteStatus>> ret =
+          insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
+
+      // Await till enough time passes such that the last failed commits heartbeats are expired
+      await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
+          .isHeartbeatExpired(ret.getLeft()));
+
+      List<HoodieCleanStat> cleanStats = runCleaner(cfg);
+      assertEquals(0, cleanStats.size(), "Must not clean any files");
+      HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
+      assertTrue(timeline.getTimelineOfActions(
+          CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
+      Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
+          CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
+      HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
+          timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
+      // Rollback of one of the failed writes should have deleted 3 files
+      assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
+    }
+  }
 
   /**
    * Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false.
@@ -329,133 +344,6 @@ public class TestCleaner extends HoodieClientTestBase {
     }
   }
 
-  /**
-   * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
-   *
-   * @param insertFn Insert API to be tested
-   * @param upsertFn Upsert API to be tested
-   * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *        record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
-   * @throws Exception in case of errors
-   */
-  private void testInsertAndCleanByVersions(
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
-      throws Exception {
-    int maxVersions = 2; // keep upto 2 versions for each file
-    HoodieWriteConfig cfg = getConfigBuilder()
-        .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
-            .retainFileVersions(maxVersions).build())
-        .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
-        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
-
-      final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
-          generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
-
-      final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
-          generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
-
-      insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
-
-      Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        TableFileSystemView fsView = table.getFileSystemView();
-        Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> {
-          fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs));
-          return true;
-        }));
-        if (added.isPresent()) {
-          // Select only one file-group for compaction
-          break;
-        }
-      }
-
-      // Create workload with selected file-slices
-      List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
-          .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList());
-      HoodieCompactionPlan compactionPlan =
-          CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty());
-      List<String> instantTimes = makeIncrementalCommitTimes(9, 1, 10);
-      String compactionTime = instantTimes.get(0);
-      table.getActiveTimeline().saveToCompactionRequested(
-          new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime),
-          TimelineMetadataUtils.serializeCompactionPlan(compactionPlan));
-
-      instantTimes = instantTimes.subList(1, instantTimes.size());
-      // Keep doing some writes and clean inline. Make sure we have expected number of files
-      // remaining.
-      for (String newInstantTime : instantTimes) {
-        try {
-          client.startCommitWithTime(newInstantTime);
-          List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, 100);
-
-          List<WriteStatus> statuses = upsertFn.apply(client, jsc.parallelize(records, 1), newInstantTime).collect();
-          // Verify there are no errors
-          assertNoWriteErrors(statuses);
-
-          metaClient = HoodieTableMetaClient.reload(metaClient);
-          table = HoodieSparkTable.create(getConfig(), context, metaClient);
-          HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
-
-          TableFileSystemView fsView = table.getFileSystemView();
-          // Need to ensure the following
-          for (String partitionPath : dataGen.getPartitionPaths()) {
-            // compute all the versions of all files, from time 0
-            HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>();
-            for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) {
-              HoodieCommitMetadata commitMetadata =
-                  HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class);
-
-              for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) {
-                if (!fileIdToVersions.containsKey(wstat.getFileId())) {
-                  fileIdToVersions.put(wstat.getFileId(), new TreeSet<>());
-                }
-                fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName()));
-              }
-            }
-
-            List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-
-            for (HoodieFileGroup fileGroup : fileGroups) {
-              if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
-                // Ensure latest file-slice selected for compaction is retained
-                Option<HoodieBaseFile> dataFileForCompactionPresent =
-                    Option.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> {
-                      return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime()
-                          .equals(df.getCommitTime());
-                    }).findAny());
-                assertTrue(dataFileForCompactionPresent.isPresent(),
-                    "Data File selected for compaction is retained");
-              } else {
-                // file has no more than max versions
-                String fileId = fileGroup.getFileGroupId().getFileId();
-                List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());
-
-                assertTrue(dataFiles.size() <= maxVersions,
-                    "fileId " + fileId + " has more than " + maxVersions + " versions");
-
-                // Each file, has the latest N versions (i.e cleaning gets rid of older versions)
-                List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
-                for (int i = 0; i < dataFiles.size(); i++) {
-                  assertEquals((dataFiles.get(i)).getCommitTime(),
-                      commitedVersions.get(commitedVersions.size() - 1 - i),
-                      "File " + fileId + " does not have latest versions on commits" + commitedVersions);
-                }
-              }
-            }
-          }
-        } catch (IOException ioe) {
-          throw new RuntimeException(ioe);
-        }
-      }
-    }
-  }
-
   /**
    * Test Clean-By-Commits using insert/upsert API.
    */
@@ -676,7 +564,7 @@ public class TestCleaner extends HoodieClientTestBase {
     assertTrue(timeline.getTimelineOfActions(
             CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--instantClean, "%09d")));
   }
-  
+
   @Test
   public void testCleanWithReplaceCommits() throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
@@ -1200,65 +1088,6 @@ public class TestCleaner extends HoodieClientTestBase {
     assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
   }
 
-  /**
-   * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
-   *
-   * @param insertFn     Insert API to be tested
-   * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *                     record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
-   * @throws Exception in case of errors
-   */
-  private void testInsertAndCleanFailedWritesByVersions(
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
-      throws Exception {
-    int maxVersions = 3; // keep upto 3 versions for each file
-    HoodieWriteConfig cfg = getConfigBuilder()
-        .withAutoCommit(false)
-        .withHeartbeatIntervalInMs(3000)
-        .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
-        .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
-        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
-
-      final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
-          generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
-
-      Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
-
-      client.commit(result.getLeft(), result.getRight());
-
-      HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
-
-      assertTrue(table.getCompletedCleanTimeline().empty());
-
-      insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
-
-      insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
-
-      Pair<String, JavaRDD<WriteStatus>> ret =
-          insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
-
-      // Await till enough time passes such that the last failed commits heartbeats are expired
-      await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
-          .isHeartbeatExpired(ret.getLeft()));
-
-      List<HoodieCleanStat> cleanStats = runCleaner(cfg);
-      assertEquals(0, cleanStats.size(), "Must not clean any files");
-      HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
-      assertTrue(timeline.getTimelineOfActions(
-          CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
-      Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
-          CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
-      HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
-          timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
-      // Rollback of one of the failed writes should have deleted 3 files
-      assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
-    }
-  }
-
   /**
    * Common test method for validating pending compactions.
    *
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
index 7f5cd5cd99..816a937187 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
@@ -46,7 +46,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -105,10 +104,10 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes
   /**
    * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
    *
-   * @param insertFn Insert API to be tested
-   * @param upsertFn Upsert API to be tested
+   * @param insertFn     Insert API to be tested
+   * @param upsertFn     Upsert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *        record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
+   *                     record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
    * @throws Exception in case of errors
    */
   private void testInsertAndCleanByCommits(
@@ -127,23 +126,21 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
         .build();
-    final SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-
-    final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
-    final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
-        ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts)
-        : dataGen::generateInserts;
-    final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
-        ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates)
-        : dataGen::generateUniqueUpdates;
-
-    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
-    insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
-
-    // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
-    for (int i = 0; i < 8; i++) {
-      String newCommitTime = makeNewCommitTime();
-      try {
+    try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
+      final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
+          ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts)
+          : dataGen::generateInserts;
+      final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
+          ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates)
+          : dataGen::generateUniqueUpdates;
+
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+      insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
+
+      // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
+      for (int i = 0; i < 8; i++) {
+        String newCommitTime = makeNewCommitTime();
         client.startCommitWithTime(newCommitTime);
         List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, BATCH_SIZE);
 
@@ -186,8 +183,6 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes
                 "Only contain acceptable versions of file should be present");
           }
         }
-      } catch (IOException ioe) {
-        throw new RuntimeException(ioe);
       }
     }
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
new file mode 100644
index 0000000000..e9c74936f3
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase.Function2;
+import org.apache.hudi.testutils.HoodieClientTestBase.Function3;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
+import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestCleanerInsertAndCleanByVersions extends SparkClientFunctionalTestHarness {
+
+  private static final int BATCH_SIZE = 100;
+  private static final int PARALLELISM = 2;
+
+  /**
+   * Test Clean-By-Versions using insert/upsert API.
+   */
+  @Test
+  public void testInsertAndCleanByVersions() throws Exception {
+    testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
+  }
+
+  /**
+   * Test Clean-By-Versions using prepped versions of insert/upsert API.
+   */
+  @Test
+  public void testInsertPreppedAndCleanByVersions() throws Exception {
+    testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
+        true);
+  }
+
+  /**
+   * Test Clean-By-Versions using bulk-insert/upsert API.
+   */
+  @Test
+  public void testBulkInsertAndCleanByVersions() throws Exception {
+    testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
+  }
+
+  /**
+   * Test Clean-By-Versions using prepped versions of bulk-insert/upsert API.
+   */
+  @Test
+  public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
+    testInsertAndCleanByVersions(
+        (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
+        SparkRDDWriteClient::upsertPreppedRecords, true);
+  }
+
+  /**
+   * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
+   *
+   * @param insertFn     Insert API to be tested
+   * @param upsertFn     Upsert API to be tested
+   * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
+   *                     record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
+   * @throws Exception in case of errors
+   */
+  private void testInsertAndCleanByVersions(
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
+      throws Exception {
+    int maxVersions = 2; // keep upto 2 versions for each file
+    HoodieWriteConfig cfg = getConfigBuilder(true)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
+            .retainFileVersions(maxVersions).build())
+        .withParallelism(PARALLELISM, PARALLELISM)
+        .withBulkInsertParallelism(PARALLELISM)
+        .withFinalizeWriteParallelism(PARALLELISM)
+        .withDeleteParallelism(PARALLELISM)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .build();
+    try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
+      final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
+          ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts)
+          : dataGen::generateInserts;
+      final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
+          ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates)
+          : dataGen::generateUniqueUpdates;
+
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+      insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
+
+      Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        TableFileSystemView fsView = table.getFileSystemView();
+        Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> {
+          fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs));
+          return true;
+        }));
+        if (added.isPresent()) {
+          // Select only one file-group for compaction
+          break;
+        }
+      }
+
+      // Create workload with selected file-slices
+      List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
+          .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList());
+      HoodieCompactionPlan compactionPlan =
+          CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty());
+      List<String> instantTimes = makeIncrementalCommitTimes(9, 1, 10);
+      String compactionTime = instantTimes.get(0);
+      table.getActiveTimeline().saveToCompactionRequested(
+          new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime),
+          TimelineMetadataUtils.serializeCompactionPlan(compactionPlan));
+
+      instantTimes = instantTimes.subList(1, instantTimes.size());
+      // Keep doing some writes and clean inline. Make sure we have expected number of files
+      // remaining.
+      for (String newInstantTime : instantTimes) {
+        client.startCommitWithTime(newInstantTime);
+        List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, BATCH_SIZE);
+
+        List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newInstantTime).collect();
+        // Verify there are no errors
+        assertNoWriteErrors(statuses);
+
+        metaClient = HoodieTableMetaClient.reload(metaClient);
+        table = HoodieSparkTable.create(cfg, context(), metaClient);
+        HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
+
+        TableFileSystemView fsView = table.getFileSystemView();
+        // Need to ensure the following
+        for (String partitionPath : dataGen.getPartitionPaths()) {
+          // compute all the versions of all files, from time 0
+          HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>();
+          for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) {
+            HoodieCommitMetadata commitMetadata =
+                HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class);
+
+            for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) {
+              if (!fileIdToVersions.containsKey(wstat.getFileId())) {
+                fileIdToVersions.put(wstat.getFileId(), new TreeSet<>());
+              }
+              fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName()));
+            }
+          }
+
+          List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+
+          for (HoodieFileGroup fileGroup : fileGroups) {
+            if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
+              // Ensure latest file-slice selected for compaction is retained
+              Option<HoodieBaseFile> dataFileForCompactionPresent =
+                  Option.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> {
+                    return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime()
+                        .equals(df.getCommitTime());
+                  }).findAny());
+              assertTrue(dataFileForCompactionPresent.isPresent(),
+                  "Data File selected for compaction is retained");
+            } else {
+              // file has no more than max versions
+              String fileId = fileGroup.getFileGroupId().getFileId();
+              List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());
+
+              assertTrue(dataFiles.size() <= maxVersions,
+                  "fileId " + fileId + " has more than " + maxVersions + " versions");
+
+              // Each file, has the latest N versions (i.e cleaning gets rid of older versions)
+              List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
+              for (int i = 0; i < dataFiles.size(); i++) {
+                assertEquals((dataFiles.get(i)).getCommitTime(),
+                    commitedVersions.get(commitedVersions.size() - 1 - i),
+                    "File " + fileId + " does not have latest versions on commits" + commitedVersions);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}

