This is an automated email from the ASF dual-hosted git repository.
nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8c45288dadc4 test(clean): Cover tests executing pending clean before
scheduling a new one (#19051)
8c45288dadc4 is described below
commit 8c45288dadc4e48e7de89191e7c529b1d6379cd3
Author: Surya Prasanna <[email protected]>
AuthorDate: Wed Jun 24 08:13:58 2026 -0700
test(clean): Cover tests executing pending clean before scheduling a new
one (#19051)
Enhance cleaner tests
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---------
Co-authored-by: sivabalan <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../fs/TestHoodieSerializableFileStatus.java | 8 +--
.../TestHoodieClientOnCopyOnWriteStorage.java | 57 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 6 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
index 61857cb9837c..7759ee5e2fb2 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/fs/TestHoodieSerializableFileStatus.java
@@ -18,8 +18,6 @@
package org.apache.hudi.common.fs;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.hadoop.fs.HoodieSerializableFileStatus;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
@@ -45,7 +43,6 @@ import java.util.List;
@TestInstance(Lifecycle.PER_CLASS)
public class TestHoodieSerializableFileStatus extends
HoodieSparkClientTestHarness {
- HoodieEngineContext engineContext;
List<Path> testPaths;
@BeforeAll
@@ -55,7 +52,6 @@ public class TestHoodieSerializableFileStatus extends
HoodieSparkClientTestHarne
for (int i = 0; i < 5; i++) {
testPaths.add(new Path("s3://table-bucket/"));
}
- engineContext = new HoodieSparkEngineContext(jsc);
}
@AfterAll
@@ -67,7 +63,7 @@ public class TestHoodieSerializableFileStatus extends
HoodieSparkClientTestHarne
public void testNonSerializableFileStatus() {
Exception e = Assertions.assertThrows(SparkException.class,
() -> {
- List<FileStatus> statuses = engineContext.flatMap(testPaths, path ->
{
+ List<FileStatus> statuses = context.flatMap(testPaths, path -> {
FileSystem fileSystem = new NonSerializableFileSystem();
return Arrays.stream(fileSystem.listStatus(path));
}, 5);
@@ -78,7 +74,7 @@ public class TestHoodieSerializableFileStatus extends
HoodieSparkClientTestHarne
@Test
public void testHoodieFileStatusSerialization() {
- Assertions.assertDoesNotThrow(() -> engineContext.flatMap(testPaths, path
-> {
+ Assertions.assertDoesNotThrow(() -> context.flatMap(testPaths, path -> {
FileSystem fileSystem = new NonSerializableFileSystem();
return
Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
}, 5));
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 4246ab72fc3f..6f0c6b0d5d63 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -133,6 +134,7 @@ import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PA
import static
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.CLUSTERING_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
@@ -2152,6 +2154,61 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
"Schema should be rolled over into new commit even with lookback=1");
}
+  /**
+   * Verifies that {@code clean()} completes an already-scheduled (pending) clean instant
+   * instead of scheduling an additional one.
+   *
+   * <p>Three commits are written with automatic cleaning disabled. A separate client then
+   * schedules a clean, and a fourth commit is written before {@code clean()} is invoked.
+   * The final timeline must contain exactly one clean instant: it must be completed, and its
+   * requested time must match the instant that was scheduled earlier.
+   */
+  @Test
+  public void testExecutingPendingCleanInstantsBeforeSchedulingNewCleanInstant() throws Exception {
+    Properties props = new Properties();
+    props.setProperty("hoodie.clean.automatic", "false");
+    HoodieWriteConfig cfg = getConfigBuilder().withProperties(props).build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // Bulk insert
+    String firstCommit = WriteClientTestUtils.createNewInstantTime();
+    insertFirstBatch(cfg, client, firstCommit, "000", 100, SparkRDDWriteClient::bulkInsert,
+        false, false, 100, INSTANT_GENERATOR);
+
+    // First upsert
+    String secondCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, secondCommit, firstCommit, Option.empty(), "000", 100,
+        SparkRDDWriteClient::upsert, false, true, 100, 100, 2, INSTANT_GENERATOR);
+
+    // Second upsert
+    String thirdCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, thirdCommit, secondCommit, Option.empty(), "000", 100,
+        SparkRDDWriteClient::upsert, false, true, 100, 100, 3, INSTANT_GENERATOR);
+
+    // Cleaner config: retain a single commit and disable multiple clean plans, so the
+    // later clean() call is expected to finish the pending plan (asserted below).
+    Properties cleanProps = new Properties();
+    cleanProps.setProperty("hoodie.clean.automatic", "true");
+    cleanProps.setProperty("hoodie.clean.commits.retained", "1");
+    cleanProps.setProperty("hoodie.clean.multiple.enabled", "false");
+    HoodieWriteConfig cleanCfg = getConfigBuilder().withProperties(cleanProps).build();
+
+    // This client is constructed directly rather than obtained from the test harness,
+    // so it must be closed here to avoid leaking its resources beyond this test.
+    try (SparkRDDWriteClient cleanClient = new SparkRDDWriteClient(context, cleanCfg)) {
+      // Schedule clean operation
+      cleanClient.scheduleTableService(Option.empty(), TableServiceType.CLEAN);
+
+      // Verify that the clean operation is scheduled as the latest instant.
+      Option<HoodieInstant> firstCleanInstant = metaClient.reloadActiveTimeline().lastInstant();
+      assertTrue(firstCleanInstant.isPresent());
+      assertEquals(CLEAN_ACTION, firstCleanInstant.get().getAction());
+
+      // Third upsert, written while the clean is still pending.
+      String fourthCommit = WriteClientTestUtils.createNewInstantTime();
+      updateBatch(cfg, client, fourthCommit, thirdCommit, Option.empty(), "000", 100,
+          SparkRDDWriteClient::upsert, false, true, 100, 100, 4, INSTANT_GENERATOR);
+
+      // Execute clean operation. This should complete the pending clean operation.
+      cleanClient.clean();
+
+      // Verify timeline: 4 commits + 1 clean, nothing left inflight, and the only clean
+      // instant is the one scheduled above (same requested time), now completed.
+      HoodieTimeline timeline = metaClient.reloadActiveTimeline();
+      assertEquals(5, timeline.countInstants());
+      assertEquals(0, timeline.filterInflights().countInstants());
+      HoodieTimeline cleanTimeline =
+          timeline.filter(instant -> instant.getAction().equals(CLEAN_ACTION));
+      assertEquals(1, cleanTimeline.countInstants());
+      HoodieInstant cleanInstant = cleanTimeline.getInstants().get(0);
+      assertTrue(cleanInstant.isCompleted());
+      assertEquals(firstCleanInstant.get().requestedTime(), cleanInstant.requestedTime());
+    }
+  }
+
/**
* Disabling row writer here as clustering tests will throw the error below
if it is used.
* java.util.concurrent.CompletionException: java.lang.ClassNotFoundException