This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 414bc1d571 [core] Async snapshot expire fallback to sync mode in batch commit (#6593)
414bc1d571 is described below

commit 414bc1d571d6e39c4533c4dd6fb3fe0dee66cbb8
Author: WenjunMin <[email protected]>
AuthorDate: Wed Nov 12 23:35:30 2025 +0800

    [core] Async snapshot expire fallback to sync mode in batch commit (#6593)
---
 .../apache/paimon/table/sink/TableCommitImpl.java  | 22 +++++++-----
 .../paimon/partition/PartitionExpireTableTest.java |  8 ++---
 .../apache/paimon/table/SimpleTableTestBase.java   | 42 ++++++++++++++++++++++
 3 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 4f3bb5b71b..3e778c6bcc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -350,15 +350,19 @@ public class TableCommitImpl implements InnerTableCommit {
             throw new RuntimeException(maintainError.get());
         }
 
-        executor.execute(
-                () -> {
-                    try {
-                        maintain(identifier, doExpire);
-                    } catch (Throwable t) {
-                        LOG.error("Executing maintain encountered an error.", t);
-                        maintainError.compareAndSet(null, t);
-                    }
-                });
+        if (batchCommitted) {
+            maintain(identifier, doExpire);
+        } else {
+            executor.execute(
+                    () -> {
+                        try {
+                            maintain(identifier, doExpire);
+                        } catch (Throwable t) {
+                            LOG.error("Executing maintain encountered an error.", t);
+                            maintainError.compareAndSet(null, t);
+                        }
+                    });
+        }
     }
 
     private void maintain(long identifier, boolean doExpire) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
index c2e1356c15..b78506a825 100644
--- a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
@@ -50,14 +50,14 @@ class PartitionExpireTableTest extends TableTestBase {
         catalog.createTable(identifier(), schemaBuilder.build(), true);
 
         Table table = catalog.getTable(identifier());
+        String path = table.options().get("path");
+        PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1), 1, 1, 1, 1);
+        TABLE_EXPIRE_PARTITIONS.put(path, Collections.singletonList(expire));
         write(table, GenericRow.of(1, 1));
         write(table, GenericRow.of(2, 2));
-        assertThat(read(table)).containsExactlyInAnyOrder(GenericRow.of(1, 1), GenericRow.of(2, 2));
+        assertThat(read(table)).containsExactlyInAnyOrder(GenericRow.of(2, 2));
 
-        String path = table.options().get("path");
         try {
-            PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1), 1, 1, 1, 1);
-            TABLE_EXPIRE_PARTITIONS.put(path, Collections.singletonList(expire));
             write(table, GenericRow.of(3, 3));
             assertThat(read(table))
                     .containsExactlyInAnyOrder(GenericRow.of(3, 3), GenericRow.of(2, 2));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 457c1dcb16..81a613ac44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -46,6 +46,9 @@ import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.InnerTableCommit;
@@ -90,6 +93,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -1422,6 +1426,44 @@ public abstract class SimpleTableTestBase {
         commit.close();
     }
 
+    @Test
+    public void testBatchWriteAsyncExpireFallbackToSync() throws Exception {
+        // configure table to async expire but retain only last snapshot
+        Map<String, String> opts = new HashMap<>();
+        opts.put(SNAPSHOT_EXPIRE_EXECUTION_MODE.key(), ExpireExecutionMode.ASYNC.toString());
+        opts.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+        opts.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+        opts.put(SNAPSHOT_EXPIRE_LIMIT.key(), "100");
+
+        FileStoreTable table = createFileStoreTable(conf -> {});
+        table = table.copy(opts);
+
+        SnapshotManager sm = table.snapshotManager();
+        AtomicLong lastId = new AtomicLong(0);
+
+        // perform multiple batch commits; expiration should run synchronously after each commit
+        for (int i = 0; i < 3; i++) {
+            BatchWriteBuilder builder = table.newBatchWriteBuilder();
+            try (BatchTableWrite write = builder.newWrite();
+                    BatchTableCommit commit = builder.newCommit()) {
+                write.write(rowData(i, i * 10, i * 100L));
+                commit.commit(write.prepareCommit());
+            }
+
+            long latest = sm.latestSnapshotId();
+            assertThat(latest).isGreaterThan(0);
+
+            if (lastId.get() > 0) {
+                // since retain min/max = 1, previous snapshot must have been expired synchronously
+                assertThat(sm.snapshotExists(lastId.get()))
+                        .as("previous snapshot should be expired synchronously in batch mode")
+                        .isFalse();
+                assertThat(sm.earliestSnapshotId()).isEqualTo(latest);
+            }
+            lastId.set(latest);
+        }
+    }
+
     @Test
     @Timeout(120)
     public void testExpireWithLimit() throws Exception {

Reply via email to