This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 26c2bd4dac [core] Postpone bucket writing job should not expire
snapshots (#5391)
26c2bd4dac is described below
commit 26c2bd4dac3d1a990f3d11d4d193c026191c8052
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 2 18:37:15 2025 +0800
[core] Postpone bucket writing job should not expire snapshots (#5391)
---
.../generated/flink_connector_configuration.html | 2 +-
.../paimon/table/AbstractFileStoreTable.java | 27 ++++++++++------
.../paimon/table/PrimaryKeyFileStoreTable.java | 12 +++++++
.../apache/paimon/flink/FlinkConnectorOptions.java | 2 +-
.../paimon/flink/PostponeBucketTableITCase.java | 37 ++++++++++++++++++++++
5 files changed, 68 insertions(+), 12 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 6c273e91e1..feb8715701 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -100,7 +100,7 @@ under the License.
</tr>
<tr>
<td><h5>postpone.default-bucket-num</h5></td>
- <td style="word-wrap: break-word;">4</td>
+ <td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Bucket number for the partitions compacted for the first time
in postpone bucket tables.</td>
</tr>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 2e9647f9d5..9793a6a52a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -439,8 +439,24 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public TableCommitImpl newCommit(String commitUser) {
+ CoreOptions options = coreOptions();
+ return new TableCommitImpl(
+ store().newCommit(commitUser, this),
+ newExpireRunnable(),
+ options.writeOnly() ? null :
store().newPartitionExpire(commitUser, this),
+ options.writeOnly() ? null : store().newTagCreationManager(),
+ CoreOptions.fromMap(options()).consumerExpireTime(),
+ new ConsumerManager(fileIO, path, snapshotManager().branch()),
+ options.snapshotExpireExecutionMode(),
+ name(),
+ options.forceCreatingSnapshot());
+ }
+
+ @Nullable
+ protected Runnable newExpireRunnable() {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
+
if (!options.writeOnly()) {
boolean changelogDecoupled = options.changelogLifecycleDecoupled();
ExpireConfig expireConfig = options.expireConfig();
@@ -455,16 +471,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
};
}
- return new TableCommitImpl(
- store().newCommit(commitUser, this),
- snapshotExpire,
- options.writeOnly() ? null :
store().newPartitionExpire(commitUser, this),
- options.writeOnly() ? null : store().newTagCreationManager(),
- CoreOptions.fromMap(options()).consumerExpireTime(),
- new ConsumerManager(fileIO, path, snapshotManager().branch()),
- options.snapshotExpireExecutionMode(),
- name(),
- options.forceCreatingSnapshot());
+ return snapshotExpire;
}
private Optional<TableSchema> tryTimeTravel(Options options) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 835405c513..b563693085 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -39,6 +39,8 @@ import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.types.RowType;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.function.BiConsumer;
@@ -173,4 +175,14 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
public LocalTableQuery newLocalTableQuery() {
return new LocalTableQuery(this);
}
+
+ @Override
+ @Nullable
+ protected Runnable newExpireRunnable() {
+ if (coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
+ return null;
+ } else {
+ return super.newExpireRunnable();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e63356add8..adf7c9624c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -461,7 +461,7 @@ public class FlinkConnectorOptions {
public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
key("postpone.default-bucket-num")
.intType()
- .defaultValue(4)
+ .defaultValue(1)
.withDescription(
"Bucket number for the partitions compacted for
the first time in postpone bucket tables.");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index bd7c6f898f..3519f162d0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -388,6 +388,43 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
collect(client, it, 400 - 2);
}
+ @Test
+ public void testPostponeWriteNotExpireSnapshots() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'bucket' = '-2',\n"
+ + " 'snapshot.num-retained.min' = '3',\n"
+ + " 'snapshot.num-retained.max' = '3'\n"
+ + ")");
+
+ for (int i = 0; i < 5; i++) {
+ tEnv.executeSql(String.format("INSERT INTO T VALUES (%d, 0, 0)",
i)).await();
+ }
+
+ assertThat(collect(tEnv.executeSql("SELECT COUNT(*) FROM
`T$snapshots`")))
+ .containsExactlyInAnyOrder("+I[5]");
+ }
+
private List<String> collect(TableResult result) throws Exception {
List<String> ret = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {