This is an automated email from the ASF dual-hosted git repository.

nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c45288dadc4 test(clean): Cover tests executing pending clean before scheduling a new one (#19051)
8c45288dadc4 is described below

commit 8c45288dadc4e48e7de89191e7c529b1d6379cd3
Author: Surya Prasanna <[email protected]>
AuthorDate: Wed Jun 24 08:13:58 2026 -0700

    test(clean): Cover tests executing pending clean before scheduling a new one (#19051)
    
    Enhance cleaner tests
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../fs/TestHoodieSerializableFileStatus.java       |  8 +--
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 57 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
index 61857cb9837c..7759ee5e2fb2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.common.fs;
 
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.hadoop.fs.HoodieSerializableFileStatus;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
@@ -45,7 +43,6 @@ import java.util.List;
 @TestInstance(Lifecycle.PER_CLASS)
 public class TestHoodieSerializableFileStatus extends HoodieSparkClientTestHarness {
 
-  HoodieEngineContext engineContext;
   List<Path> testPaths;
 
   @BeforeAll
@@ -55,7 +52,6 @@ public class TestHoodieSerializableFileStatus extends HoodieSparkClientTestHarne
     for (int i = 0; i < 5; i++) {
       testPaths.add(new Path("s3://table-bucket/"));
     }
-    engineContext = new HoodieSparkEngineContext(jsc);
   }
 
   @AfterAll
@@ -67,7 +63,7 @@ public class TestHoodieSerializableFileStatus extends HoodieSparkClientTestHarne
   public void testNonSerializableFileStatus() {
     Exception e = Assertions.assertThrows(SparkException.class,
         () -> {
-          List<FileStatus> statuses = engineContext.flatMap(testPaths, path -> {
+          List<FileStatus> statuses = context.flatMap(testPaths, path -> {
             FileSystem fileSystem = new NonSerializableFileSystem();
             return Arrays.stream(fileSystem.listStatus(path));
           }, 5);
@@ -78,7 +74,7 @@ public class TestHoodieSerializableFileStatus extends HoodieSparkClientTestHarne
 
   @Test
   public void testHoodieFileStatusSerialization() {
-    Assertions.assertDoesNotThrow(() -> engineContext.flatMap(testPaths, path -> {
+    Assertions.assertDoesNotThrow(() -> context.flatMap(testPaths, path -> {
       FileSystem fileSystem = new NonSerializableFileSystem();
       return Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
     }, 5));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 4246ab72fc3f..6f0c6b0d5d63 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -133,6 +134,7 @@ import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PA
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLUSTERING_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
@@ -2152,6 +2154,61 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         "Schema should be rolled over into new commit even with lookback=1");
   }
 
+  @Test
+  public void testExecutingPendingCleanInstantsBeforeSchedulingNewCleanInstant() throws Exception {
+    Properties props = new Properties();
+    props.setProperty("hoodie.clean.automatic", "false");
+    HoodieWriteConfig cfg = getConfigBuilder().withProperties(props).build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // Bulk insert
+    String firstCommit = WriteClientTestUtils.createNewInstantTime();
+    insertFirstBatch(cfg, client, firstCommit, "000", 100, SparkRDDWriteClient::bulkInsert,
+        false, false, 100, INSTANT_GENERATOR);
+
+    // First upsert
+    String secondCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, secondCommit, firstCommit, Option.empty(), "000", 100,
+        SparkRDDWriteClient::upsert, false, true, 100, 100, 2, INSTANT_GENERATOR);
+
+    // Second upsert
+    String thirdCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, thirdCommit, secondCommit, Option.empty(), "000", 100,
+        SparkRDDWriteClient::upsert, false, true, 100, 100, 3, INSTANT_GENERATOR);
+
+    // Schedule (but do not yet execute) a clean from a separate, clean-enabled client.
+    Properties cleanProps = new Properties();
+    cleanProps.setProperty("hoodie.clean.automatic", "true");
+    cleanProps.setProperty("hoodie.clean.commits.retained", "1");
+    cleanProps.setProperty("hoodie.clean.multiple.enabled", "false");
+    HoodieWriteConfig cleanCfg = getConfigBuilder().withProperties(cleanProps).build();
+    try (SparkRDDWriteClient cleanClient = new SparkRDDWriteClient(context, cleanCfg)) {
+      cleanClient.scheduleTableService(Option.empty(), TableServiceType.CLEAN);
+      // The scheduled clean must be the latest instant and still pending (REQUESTED state).
+      Option<HoodieInstant> firstCleanInstant = metaClient.reloadActiveTimeline().lastInstant();
+      assertTrue(firstCleanInstant.isPresent());
+      assertEquals(CLEAN_ACTION, firstCleanInstant.get().getAction());
+      assertTrue(firstCleanInstant.get().isRequested());
+
+      // Third upsert
+      String fourthCommit = WriteClientTestUtils.createNewInstantTime();
+      updateBatch(cfg, client, fourthCommit, thirdCommit, Option.empty(), "000", 100,
+          SparkRDDWriteClient::upsert, false, true, 100, 100, 4, INSTANT_GENERATOR);
+
+      // clean() must finish the pending clean rather than schedule a second one.
+      cleanClient.clean();
+      // Expect 4 commits + 1 clean, none pending, with the clean reusing the scheduled instant time.
+      HoodieTimeline timeline = metaClient.reloadActiveTimeline();
+      assertEquals(5, timeline.countInstants());
+      assertEquals(0, timeline.filterInflightsAndRequested().countInstants());
+      HoodieTimeline cleanTimeline = timeline.filter(instant -> instant.getAction().equals(CLEAN_ACTION));
+      assertEquals(1, cleanTimeline.countInstants());
+      HoodieInstant cleanInstant = cleanTimeline.getInstants().get(0);
+      assertTrue(cleanInstant.isCompleted());
+      assertEquals(firstCleanInstant.get().requestedTime(), cleanInstant.requestedTime());
+    }
+  }
+
   /**
    * Disabling row writer here as clustering tests will throw the error below 
if it is used.
    * java.util.concurrent.CompletionException: java.lang.ClassNotFoundException

Reply via email to