This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 71f6a39f79 [core] Introduce expireForEmptyCommit to InnerTableCommit (#6013)
71f6a39f79 is described below
commit 71f6a39f79e087db54cb6de49a39f796fc1898be
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 1 18:41:34 2025 +0800
[core] Introduce expireForEmptyCommit to InnerTableCommit (#6013)
---
.../apache/paimon/operation/FileStoreCommit.java | 4 +-
.../paimon/operation/FileStoreCommitImpl.java | 6 +-
.../apache/paimon/table/sink/InnerTableCommit.java | 2 +
.../apache/paimon/table/sink/TableCommitImpl.java | 73 +++++++++++++++-------
.../apache/paimon/table/SimpleTableTestBase.java | 2 +-
.../apache/paimon/table/sink/TableCommitTest.java | 59 ++++++++++++++++-
6 files changed, 116 insertions(+), 30 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 1ab4ebe968..4156ce0a83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -40,7 +40,7 @@ public interface FileStoreCommit extends AutoCloseable {
List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables);
/** Commit from manifest committable with checkAppendFiles. */
- void commit(ManifestCommittable committable, boolean checkAppendFiles);
+ int commit(ManifestCommittable committable, boolean checkAppendFiles);
/**
* Overwrite from manifest committable and partition.
@@ -50,7 +50,7 @@ public interface FileStoreCommit extends AutoCloseable {
* note that this partition does not necessarily equal to the
partitions of the newly added
* key-values. This is just the partition to be cleaned up.
*/
- void overwrite(
+ int overwrite(
Map<String, String> partition,
ManifestCommittable committable,
Map<String, String> properties);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 54888abe72..c82776a916 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -277,7 +277,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
@Override
- public void commit(ManifestCommittable committable, boolean
checkAppendFiles) {
+ public int commit(ManifestCommittable committable, boolean
checkAppendFiles) {
LOG.info(
"Ready to commit to table {}, number of commit messages: {}",
tableName,
@@ -399,6 +399,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
attempts);
}
}
+ return generatedSnapshot;
}
private void reportCommit(
@@ -422,7 +423,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
@Override
- public void overwrite(
+ public int overwrite(
Map<String, String> partition,
ManifestCommittable committable,
Map<String, String> properties) {
@@ -551,6 +552,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
attempts);
}
}
+ return generatedSnapshot;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 1544375569..df6241086a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -44,6 +44,8 @@ public interface InnerTableCommit extends StreamTableCommit,
BatchTableCommit {
*/
InnerTableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
+ InnerTableCommit expireForEmptyCommit(boolean expireForEmptyCommit);
+
@Override
InnerTableCommit withMetricRegistry(MetricRegistry registry);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e262c5f6bf..40c69289a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -30,7 +30,9 @@ import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.PathFactory;
@@ -80,14 +82,15 @@ public class TableCommitImpl implements InnerTableCommit {
@Nullable private final Duration consumerExpireTime;
private final ConsumerManager consumerManager;
- private final ExecutorService expireMainExecutor;
- private final AtomicReference<Throwable> expireError;
+ private final ExecutorService maintainExecutor;
+ private final AtomicReference<Throwable> maintainError;
private final String tableName;
@Nullable private Map<String, String> overwritePartition = null;
private boolean batchCommitted = false;
private final boolean forceCreatingSnapshot;
+ private boolean expireForEmptyCommit = true;
public TableCommitImpl(
FileStoreCommit commit,
@@ -111,13 +114,13 @@ public class TableCommitImpl implements InnerTableCommit {
this.consumerExpireTime = consumerExpireTime;
this.consumerManager = consumerManager;
- this.expireMainExecutor =
+ this.maintainExecutor =
expireExecutionMode == ExpireExecutionMode.SYNC
? MoreExecutors.newDirectExecutorService()
: Executors.newSingleThreadExecutor(
new ExecutorThreadFactory(
Thread.currentThread().getName() +
"expire-main-thread"));
- this.expireError = new AtomicReference<>(null);
+ this.maintainError = new AtomicReference<>(null);
this.tableName = tableName;
this.forceCreatingSnapshot = forceCreatingSnapshot;
@@ -147,6 +150,12 @@ public class TableCommitImpl implements InnerTableCommit {
return this;
}
+ @Override
+ public TableCommitImpl expireForEmptyCommit(boolean expireForEmptyCommit) {
+ this.expireForEmptyCommit = expireForEmptyCommit;
+ return this;
+ }
+
@Override
public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
commit.withMetrics(new CommitMetrics(registry, tableName));
@@ -213,11 +222,15 @@ public class TableCommitImpl implements InnerTableCommit {
public void commitMultiple(List<ManifestCommittable> committables, boolean
checkAppendFiles) {
if (overwritePartition == null) {
+ int newSnapshots = 0;
for (ManifestCommittable committable : committables) {
- commit.commit(committable, checkAppendFiles);
+ newSnapshots += commit.commit(committable, checkAppendFiles);
}
if (!committables.isEmpty()) {
- expire(committables.get(committables.size() - 1).identifier(),
expireMainExecutor);
+ maintain(
+ committables.get(committables.size() - 1).identifier(),
+ maintainExecutor,
+ newSnapshots > 0 || expireForEmptyCommit);
}
} else {
ManifestCommittable committable;
@@ -233,8 +246,12 @@ public class TableCommitImpl implements InnerTableCommit {
// TODO maybe it can be produced by CommitterOperator
committable = new ManifestCommittable(Long.MAX_VALUE);
}
- commit.overwrite(overwritePartition, committable,
Collections.emptyMap());
- expire(committable.identifier(), expireMainExecutor);
+ int newSnapshots =
+ commit.overwrite(overwritePartition, committable,
Collections.emptyMap());
+ maintain(
+ committable.identifier(),
+ maintainExecutor,
+ newSnapshots > 0 || expireForEmptyCommit);
}
}
@@ -315,36 +332,46 @@ public class TableCommitImpl implements InnerTableCommit {
}
}
- private void expire(long partitionExpireIdentifier, ExecutorService
executor) {
- if (expireError.get() != null) {
- throw new RuntimeException(expireError.get());
+ private void maintain(long identifier, ExecutorService executor, boolean
doExpire) {
+ if (maintainError.get() != null) {
+ throw new RuntimeException(maintainError.get());
}
executor.execute(
() -> {
try {
- expire(partitionExpireIdentifier);
+ maintain(identifier, doExpire);
} catch (Throwable t) {
- LOG.error("Executing expire encountered an error.", t);
- expireError.compareAndSet(null, t);
+ LOG.error("Executing maintain encountered an error.",
t);
+ maintainError.compareAndSet(null, t);
}
});
}
- private void expire(long partitionExpireIdentifier) {
+ private void maintain(long identifier, boolean doExpire) {
// expire consumer first to avoid preventing snapshot expiration
- if (consumerExpireTime != null) {
+ if (doExpire && consumerExpireTime != null) {
consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
}
- expireSnapshots();
+ if (doExpire && expireSnapshots != null) {
+ expireSnapshots.run();
+ }
- if (partitionExpire != null) {
- partitionExpire.expire(partitionExpireIdentifier);
+ if (doExpire && partitionExpire != null) {
+ partitionExpire.expire(identifier);
}
if (tagAutoManager != null) {
- tagAutoManager.run();
+ TagAutoCreation tagAutoCreation =
tagAutoManager.getTagAutoCreation();
+ if (tagAutoCreation != null) {
+ tagAutoCreation.run();
+ }
+
+ TagTimeExpire tagTimeExpire = tagAutoManager.getTagTimeExpire();
+ if (doExpire && tagTimeExpire != null) {
+ tagTimeExpire.expire();
+ }
}
}
@@ -357,7 +384,7 @@ public class TableCommitImpl implements InnerTableCommit {
@Override
public void close() throws Exception {
commit.close();
- expireMainExecutor.shutdownNow();
+ maintainExecutor.shutdownNow();
}
@Override
@@ -366,7 +393,7 @@ public class TableCommitImpl implements InnerTableCommit {
}
@VisibleForTesting
- public ExecutorService getExpireMainExecutor() {
- return expireMainExecutor;
+ public ExecutorService getMaintainExecutor() {
+ return maintainExecutor;
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index cd11a2ddfd..457c1dcb16 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -1345,7 +1345,7 @@ public abstract class SimpleTableTestBase {
options.put(SNAPSHOT_EXPIRE_LIMIT.key(), "2");
TableCommitImpl commit = table.copy(options).newCommit(commitUser);
- ExecutorService executor = commit.getExpireMainExecutor();
+ ExecutorService executor = commit.getMaintainExecutor();
CountDownLatch before = new CountDownLatch(1);
CountDownLatch after = new CountDownLatch(1);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index f2dee4743f..8c81ec7c26 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -41,6 +41,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -58,6 +59,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
+import static java.util.Collections.singletonMap;
import static
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -251,10 +253,10 @@ public class TableCommitTest {
}
// commit 0, fine, it will be filtered
- commit.filterAndCommit(Collections.singletonMap(0L, messages0));
+ commit.filterAndCommit(singletonMap(0L, messages0));
// commit 1, exception now.
- assertThatThrownBy(() ->
commit.filterAndCommit(Collections.singletonMap(1L, messages1)))
+ assertThatThrownBy(() -> commit.filterAndCommit(singletonMap(1L,
messages1)))
.hasMessageContaining(
"Cannot recover from this checkpoint because some
files in the"
+ " snapshot that need to be resubmitted have
been deleted");
@@ -381,4 +383,57 @@ public class TableCommitTest {
.hasMessageContaining(
"Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
}
+
+ @Test
+ public void testExpireForEmptyCommit() throws Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2);
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+ SnapshotManager snapshotManager = table.snapshotManager();
+ String user1 = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(user1);
+ TableCommitImpl commit = table.copy(singletonMap("write-only",
"true")).newCommit(user1);
+
+ for (int i = 0; i < 5; i++) {
+ write.write(GenericRow.of(i, (long) i));
+ commit.commit(i, write.prepareCommit(true, i));
+ }
+ assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(1);
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
+
+ // expire for empty commit: false
+ commit =
table.newCommit(user1).ignoreEmptyCommit(true).expireForEmptyCommit(false);
+ commit.commit(7, write.prepareCommit(true, 7));
+ assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(1);
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
+
+ // expire for empty commit: default true
+ commit = table.newCommit(user1).ignoreEmptyCommit(true);
+ commit.commit(7, write.prepareCommit(true, 7));
+ assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(5);
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
+ }
}