This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c2bd4dac [core] Postpone bucket writing job should not expire 
snapshots (#5391)
26c2bd4dac is described below

commit 26c2bd4dac3d1a990f3d11d4d193c026191c8052
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 2 18:37:15 2025 +0800

    [core] Postpone bucket writing job should not expire snapshots (#5391)
---
 .../generated/flink_connector_configuration.html   |  2 +-
 .../paimon/table/AbstractFileStoreTable.java       | 27 ++++++++++------
 .../paimon/table/PrimaryKeyFileStoreTable.java     | 12 +++++++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  2 +-
 .../paimon/flink/PostponeBucketTableITCase.java    | 37 ++++++++++++++++++++++
 5 files changed, 68 insertions(+), 12 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 6c273e91e1..feb8715701 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -100,7 +100,7 @@ under the License.
         </tr>
         <tr>
             <td><h5>postpone.default-bucket-num</h5></td>
-            <td style="word-wrap: break-word;">4</td>
+            <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
             <td>Bucket number for the partitions compacted for the first time 
in postpone bucket tables.</td>
         </tr>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 2e9647f9d5..9793a6a52a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -439,8 +439,24 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public TableCommitImpl newCommit(String commitUser) {
+        CoreOptions options = coreOptions();
+        return new TableCommitImpl(
+                store().newCommit(commitUser, this),
+                newExpireRunnable(),
+                options.writeOnly() ? null : 
store().newPartitionExpire(commitUser, this),
+                options.writeOnly() ? null : store().newTagCreationManager(),
+                CoreOptions.fromMap(options()).consumerExpireTime(),
+                new ConsumerManager(fileIO, path, snapshotManager().branch()),
+                options.snapshotExpireExecutionMode(),
+                name(),
+                options.forceCreatingSnapshot());
+    }
+
+    @Nullable
+    protected Runnable newExpireRunnable() {
         CoreOptions options = coreOptions();
         Runnable snapshotExpire = null;
+
         if (!options.writeOnly()) {
             boolean changelogDecoupled = options.changelogLifecycleDecoupled();
             ExpireConfig expireConfig = options.expireConfig();
@@ -455,16 +471,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                     };
         }
 
-        return new TableCommitImpl(
-                store().newCommit(commitUser, this),
-                snapshotExpire,
-                options.writeOnly() ? null : 
store().newPartitionExpire(commitUser, this),
-                options.writeOnly() ? null : store().newTagCreationManager(),
-                CoreOptions.fromMap(options()).consumerExpireTime(),
-                new ConsumerManager(fileIO, path, snapshotManager().branch()),
-                options.snapshotExpireExecutionMode(),
-                name(),
-                options.forceCreatingSnapshot());
+        return snapshotExpire;
     }
 
     private Optional<TableSchema> tryTimeTravel(Options options) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 835405c513..b563693085 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -39,6 +39,8 @@ import org.apache.paimon.table.source.MergeTreeSplitGenerator;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.types.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.function.BiConsumer;
 
@@ -173,4 +175,14 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
     public LocalTableQuery newLocalTableQuery() {
         return new LocalTableQuery(this);
     }
+
+    @Override
+    @Nullable
+    protected Runnable newExpireRunnable() {
+        if (coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
+            return null;
+        } else {
+            return super.newExpireRunnable();
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e63356add8..adf7c9624c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -461,7 +461,7 @@ public class FlinkConnectorOptions {
     public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
             key("postpone.default-bucket-num")
                     .intType()
-                    .defaultValue(4)
+                    .defaultValue(1)
                     .withDescription(
                             "Bucket number for the partitions compacted for 
the first time in postpone bucket tables.");
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index bd7c6f898f..3519f162d0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -388,6 +388,43 @@ public class PostponeBucketTableITCase extends 
AbstractTestBase {
         collect(client, it, 400 - 2);
     }
 
+    @Test
+    public void testPostponeWriteNotExpireSnapshots() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'snapshot.num-retained.min' = '3',\n"
+                        + "  'snapshot.num-retained.max' = '3'\n"
+                        + ")");
+
+        for (int i = 0; i < 5; i++) {
+            tEnv.executeSql(String.format("INSERT INTO T VALUES (%d, 0, 0)", 
i)).await();
+        }
+
+        assertThat(collect(tEnv.executeSql("SELECT COUNT(*) FROM 
`T$snapshots`")))
+                .containsExactlyInAnyOrder("+I[5]");
+    }
+
     private List<String> collect(TableResult result) throws Exception {
         List<String> ret = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {

Reply via email to