This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b91e39199c [core] make ignoreEmptyCommit configurable (#5941)
b91e39199c is described below
commit b91e39199c51d14ad128e62ec222ad9158797cc1
Author: Akwang <[email protected]>
AuthorDate: Wed Jul 23 18:59:38 2025 +0800
[core] make ignoreEmptyCommit configurable (#5941)
---
.../shortcodes/generated/core_configuration.html | 6 ++++
.../main/java/org/apache/paimon/CoreOptions.java | 6 ++++
.../paimon/table/sink/BatchWriteBuilderImpl.java | 6 +++-
.../flink/sink/CombinedTableCompactorSink.java | 5 ++-
.../org/apache/paimon/spark/SparkWriteITCase.java | 38 ++++++++++++++++++++++
5 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 33a8337931..592986935a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -212,6 +212,12 @@ under the License.
<td>String</td>
<td>Specifies the commit user prefix.</td>
</tr>
+ <tr>
+ <td><h5>snapshot.ignore-empty-commit</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Boolean</td>
+ <td>Whether ignore empty commit.</td>
+ </tr>
<tr>
<td><h5>compaction.delete-ratio-threshold</h5></td>
<td style="word-wrap: break-word;">0.2</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 3eddb381fd..64ea11a835 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1912,6 +1912,12 @@ public class CoreOptions implements Serializable {
+ "in 'sink.clustering.by-columns'.
'order' is used for 1 column, 'zorder' for less than 5 columns, "
+ "and 'hilbert' for 5 or more columns.");
+ public static final ConfigOption<Boolean> SNAPSHOT_IGNORE_EMPTY_COMMIT =
+ key("snapshot.ignore-empty-commit")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription("Whether ignore empty commit.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index beeb1decbe..445f48bafc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
@@ -73,7 +74,10 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
@Override
public BatchTableCommit newCommit() {
InnerTableCommit commit =
table.newCommit(commitUser).withOverwrite(staticPartition);
- commit.ignoreEmptyCommit(true);
+ commit.ignoreEmptyCommit(
+ Options.fromMap(table.options())
+ .getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT)
+ .orElse(true));
return commit;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 7f3366d1b8..4d2b9ab9fc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -199,7 +199,10 @@ public class CombinedTableCompactorSink implements Serializable {
dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
- return context -> new StoreMultiCommitter(catalogLoader, context, true, dynamicOptions);
+ boolean ignoreEmptyCommit =
+ options.getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT).orElse(true);
+ return context ->
+ new StoreMultiCommitter(catalogLoader, context, ignoreEmptyCommit, dynamicOptions);
}
protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index b0737e208f..237e1ac135 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -42,6 +43,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -487,6 +489,42 @@ public class SparkWriteITCase {
spark.conf().unset("spark.paimon.file.suffix.include.compression");
}
+ @Test
+ public void testIgnoreEmptyCommitConfigurable() {
+ spark.sql(
+ "CREATE TABLE T (id INT, name STRING) "
+ + "TBLPROPERTIES ("
+ + "'bucket-key'='id', "
+ + "'bucket' = '1', "
+ + "'file.format' = 'avro')");
+
+ FileStoreTable table = getTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ spark.sql("insert into T values (1, 'aa')");
+ Assertions.assertEquals(1, snapshotManager.latestSnapshotId());
+
+ spark.sql("delete from T where id = 1");
+ Assertions.assertEquals(2, snapshotManager.latestSnapshotId());
+ Assertions.assertEquals(
+ -1, Objects.requireNonNull(snapshotManager.latestSnapshot()).deltaRecordCount());
+
+ // in batch write, snapshot.ignore-empty-commit defaults to true
+ spark.sql("delete from T where id = 1");
+ Assertions.assertEquals(2, snapshotManager.latestSnapshotId());
+ Assertions.assertEquals(
+ -1, Objects.requireNonNull(snapshotManager.latestSnapshot()).deltaRecordCount());
+
+ // set to false to allow committing an empty snapshot
+ spark.conf().set("spark.paimon.snapshot.ignore-empty-commit", "false");
+ spark.sql("delete from T where id = 1");
+ Assertions.assertEquals(3, snapshotManager.latestSnapshotId());
+ Assertions.assertEquals(
+ 0, Objects.requireNonNull(snapshotManager.latestSnapshot()).deltaRecordCount());
+
+ spark.conf().unset("spark.paimon.snapshot.ignore-empty-commit");
+ }
+
protected static FileStoreTable getTable(String tableName) {
return FileStoreTableFactory.create(
LocalFileIO.create(),