This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b91e39199c [core] make ignoreEmptyCommit configurable (#5941)
b91e39199c is described below

commit b91e39199c51d14ad128e62ec222ad9158797cc1
Author: Akwang <[email protected]>
AuthorDate: Wed Jul 23 18:59:38 2025 +0800

    [core] make ignoreEmptyCommit configurable (#5941)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++++
 .../main/java/org/apache/paimon/CoreOptions.java   |  6 ++++
 .../paimon/table/sink/BatchWriteBuilderImpl.java   |  6 +++-
 .../flink/sink/CombinedTableCompactorSink.java     |  5 ++-
 .../org/apache/paimon/spark/SparkWriteITCase.java  | 38 ++++++++++++++++++++++
 5 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 33a8337931..592986935a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -212,6 +212,12 @@ under the License.
             <td>String</td>
             <td>Specifies the commit user prefix.</td>
         </tr>
+        <tr>
+            <td><h5>snapshot.ignore-empty-commit</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Boolean</td>
+            <td>Whether ignore empty commit.</td>
+        </tr>
         <tr>
             <td><h5>compaction.delete-ratio-threshold</h5></td>
             <td style="word-wrap: break-word;">0.2</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 3eddb381fd..64ea11a835 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1912,6 +1912,12 @@ public class CoreOptions implements Serializable {
                                     + "in 'sink.clustering.by-columns'. 
'order' is used for 1 column, 'zorder' for less than 5 columns, "
                                     + "and 'hilbert' for 5 or more columns.");
 
+    public static final ConfigOption<Boolean> SNAPSHOT_IGNORE_EMPTY_COMMIT =
+            key("snapshot.ignore-empty-commit")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("Whether ignore empty commit.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index beeb1decbe..445f48bafc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.InnerTable;
 import org.apache.paimon.types.RowType;
@@ -73,7 +74,10 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
     @Override
     public BatchTableCommit newCommit() {
         InnerTableCommit commit = 
table.newCommit(commitUser).withOverwrite(staticPartition);
-        commit.ignoreEmptyCommit(true);
+        commit.ignoreEmptyCommit(
+                Options.fromMap(table.options())
+                        .getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT)
+                        .orElse(true));
         return commit;
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 7f3366d1b8..4d2b9ab9fc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -199,7 +199,10 @@ public class CombinedTableCompactorSink implements Serializable {
             dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
             dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
         }
-        return context -> new StoreMultiCommitter(catalogLoader, context, 
true, dynamicOptions);
+        boolean ignoreEmptyCommit =
+                
options.getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT).orElse(true);
+        return context ->
+                new StoreMultiCommitter(catalogLoader, context, 
ignoreEmptyCommit, dynamicOptions);
     }
 
     protected CommittableStateManager<WrappedManifestCommittable> 
createCommittableStateManager() {
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index b0737e208f..237e1ac135 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -487,6 +489,42 @@ public class SparkWriteITCase {
         spark.conf().unset("spark.paimon.file.suffix.include.compression");
     }
 
+    @Test
+    public void testIgnoreEmptyCommitConfigurable() {
+        spark.sql(
+                "CREATE TABLE T (id INT, name STRING) "
+                        + "TBLPROPERTIES ("
+                        + "'bucket-key'='id', "
+                        + "'bucket' = '1', "
+                        + "'file.format' = 'avro')");
+
+        FileStoreTable table = getTable("T");
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        spark.sql("insert into T values (1, 'aa')");
+        Assertions.assertEquals(1, snapshotManager.latestSnapshotId());
+
+        spark.sql("delete from T where id = 1");
+        Assertions.assertEquals(2, snapshotManager.latestSnapshotId());
+        Assertions.assertEquals(
+                -1, 
Objects.requireNonNull(snapshotManager.latestSnapshot()).deltaRecordCount());
+
+        // in batch write, snapshot.ignore-empty-commit defaults to true
+        spark.sql("delete from T where id = 1");
+        Assertions.assertEquals(2, snapshotManager.latestSnapshotId());
+        Assertions.assertEquals(
+                -1, 
Objects.requireNonNull(snapshotManager.latestSnapshot()).deltaRecordCount());
+
+        // set to false to allow committing an empty snapshot
+        spark.conf().set("spark.paimon.snapshot.ignore-empty-commit", "false");
+        spark.sql("delete from T where id = 1");
+        Assertions.assertEquals(3, snapshotManager.latestSnapshotId());
+        Assertions.assertEquals(
+                0, 
Objects.requireNonNull(snapshotManager.latestSnapshot()).deltaRecordCount());
+
+        spark.conf().unset("spark.paimon.snapshot.ignore-empty-commit");
+    }
+
     protected static FileStoreTable getTable(String tableName) {
         return FileStoreTableFactory.create(
                 LocalFileIO.create(),

Reply via email to