This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 414bc1d571 [core] Async snapshot expire fallback to sync mode in batch
commit (#6593)
414bc1d571 is described below
commit 414bc1d571d6e39c4533c4dd6fb3fe0dee66cbb8
Author: WenjunMin <[email protected]>
AuthorDate: Wed Nov 12 23:35:30 2025 +0800
[core] Async snapshot expire fallback to sync mode in batch commit (#6593)
---
.../apache/paimon/table/sink/TableCommitImpl.java | 22 +++++++-----
.../paimon/partition/PartitionExpireTableTest.java | 8 ++---
.../apache/paimon/table/SimpleTableTestBase.java | 42 ++++++++++++++++++++++
3 files changed, 59 insertions(+), 13 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 4f3bb5b71b..3e778c6bcc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -350,15 +350,19 @@ public class TableCommitImpl implements InnerTableCommit {
throw new RuntimeException(maintainError.get());
}
- executor.execute(
- () -> {
- try {
- maintain(identifier, doExpire);
- } catch (Throwable t) {
- LOG.error("Executing maintain encountered an error.",
t);
- maintainError.compareAndSet(null, t);
- }
- });
+ if (batchCommitted) {
+ maintain(identifier, doExpire);
+ } else {
+ executor.execute(
+ () -> {
+ try {
+ maintain(identifier, doExpire);
+ } catch (Throwable t) {
+ LOG.error("Executing maintain encountered an
error.", t);
+ maintainError.compareAndSet(null, t);
+ }
+ });
+ }
}
private void maintain(long identifier, boolean doExpire) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
index c2e1356c15..b78506a825 100644
---
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
@@ -50,14 +50,14 @@ class PartitionExpireTableTest extends TableTestBase {
catalog.createTable(identifier(), schemaBuilder.build(), true);
Table table = catalog.getTable(identifier());
+ String path = table.options().get("path");
+ PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1),
1, 1, 1, 1);
+ TABLE_EXPIRE_PARTITIONS.put(path, Collections.singletonList(expire));
write(table, GenericRow.of(1, 1));
write(table, GenericRow.of(2, 2));
- assertThat(read(table)).containsExactlyInAnyOrder(GenericRow.of(1, 1),
GenericRow.of(2, 2));
+ assertThat(read(table)).containsExactlyInAnyOrder(GenericRow.of(2, 2));
- String path = table.options().get("path");
try {
- PartitionEntry expire = new
PartitionEntry(BinaryRow.singleColumn(1), 1, 1, 1, 1);
- TABLE_EXPIRE_PARTITIONS.put(path,
Collections.singletonList(expire));
write(table, GenericRow.of(3, 3));
assertThat(read(table))
.containsExactlyInAnyOrder(GenericRow.of(3, 3),
GenericRow.of(2, 2));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 457c1dcb16..81a613ac44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -46,6 +46,9 @@ import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.InnerTableCommit;
@@ -90,6 +93,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -1422,6 +1426,44 @@ public abstract class SimpleTableTestBase {
commit.close();
}
+ @Test
+ public void testBatchWriteAsyncExpireFallbackToSync() throws Exception {
+ // configure table to async expire but retain only last snapshot
+ Map<String, String> opts = new HashMap<>();
+ opts.put(SNAPSHOT_EXPIRE_EXECUTION_MODE.key(),
ExpireExecutionMode.ASYNC.toString());
+ opts.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+ opts.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+ opts.put(SNAPSHOT_EXPIRE_LIMIT.key(), "100");
+
+ FileStoreTable table = createFileStoreTable(conf -> {});
+ table = table.copy(opts);
+
+ SnapshotManager sm = table.snapshotManager();
+ AtomicLong lastId = new AtomicLong(0);
+
+ // perform multiple batch commits; expiration should run synchronously
after each commit
+ for (int i = 0; i < 3; i++) {
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = builder.newWrite();
+ BatchTableCommit commit = builder.newCommit()) {
+ write.write(rowData(i, i * 10, i * 100L));
+ commit.commit(write.prepareCommit());
+ }
+
+ long latest = sm.latestSnapshotId();
+ assertThat(latest).isGreaterThan(0);
+
+ if (lastId.get() > 0) {
+ // since retain min/max = 1, previous snapshot must have been
expired synchronously
+ assertThat(sm.snapshotExists(lastId.get()))
+ .as("previous snapshot should be expired synchronously
in batch mode")
+ .isFalse();
+ assertThat(sm.earliestSnapshotId()).isEqualTo(latest);
+ }
+ lastId.set(latest);
+ }
+ }
+
@Test
@Timeout(120)
public void testExpireWithLimit() throws Exception {