This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bab713130 [core] support batch size for expire partition (#5766)
1bab713130 is described below

commit 1bab71313059f378f6f89f09a7525296c669f0b5
Author: demonyangyue <[email protected]>
AuthorDate: Thu Jun 19 20:52:20 2025 +0800

    [core] support batch size for expire partition (#5766)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++++
 .../main/java/org/apache/paimon/CoreOptions.java   | 14 ++++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |  3 +-
 .../apache/paimon/operation/PartitionExpire.java   | 39 ++++++++++++++++------
 .../paimon/operation/PartitionExpireTest.java      | 33 ++++++++++++++++++
 .../flink/action/ExpirePartitionsActionITCase.java |  4 ++-
 .../procedure/ExpirePartitionsProcedureITCase.java | 38 +++++++++++++++++++++
 .../procedure/ExpirePartitionsProcedureTest.scala  |  1 +
 8 files changed, 126 insertions(+), 12 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 18b1a61c4c..24425bf93c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -713,6 +713,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>Boolean</td>
             <td>Whether mark the done status to indicate that the data is ready when end input.</td>
         </tr>
+        <tr>
+            <td><h5>partition.expiration-batch-size</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The batch size of partition expiration. By default, all partitions to be expired will be expired together, which may cause a risk of out-of-memory. Use this parameter to divide partition expiration process and mitigate memory pressure.</td>
+        </tr>
         <tr>
             <td><h5>partition.expiration-check-interval</h5></td>
             <td style="word-wrap: break-word;">1 h</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index aff7887303..d2c9e70cae 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -907,6 +907,15 @@ public class CoreOptions implements Serializable {
                     .defaultValue(100)
                     .withDescription("The default deleted num of partition expiration.");
 
+    public static final ConfigOption<Integer> PARTITION_EXPIRATION_BATCH_SIZE =
+            key("partition.expiration-batch-size")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The batch size of partition expiration. "
+                                    + "By default, all partitions to be expired will be expired together, which may cause a risk of out-of-memory. "
+                                    + "Use this parameter to divide partition expiration process and mitigate memory pressure.");
+
     public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
             key("partition.timestamp-formatter")
                     .stringType()
@@ -2461,6 +2470,11 @@ public class CoreOptions implements Serializable {
         return options.get(PARTITION_EXPIRATION_MAX_NUM);
     }
 
+    public int partitionExpireBatchSize() {
+        return options.getOptional(PARTITION_EXPIRATION_BATCH_SIZE)
+                .orElse(options.get(PARTITION_EXPIRATION_MAX_NUM));
+    }
+
     public PartitionExpireStrategy partitionExpireStrategy() {
         return options.get(PARTITION_EXPIRATION_STRATEGY);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5cdaf1456f..556d8a7d7b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -452,7 +452,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 newCommit(commitUser, table),
                 partitionHandler,
                 options.endInputCheckPartitionExpire(),
-                options.partitionExpireMaxNum());
+                options.partitionExpireMaxNum(),
+                options.partitionExpireBatchSize());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 4ac71db7aa..7819063f2e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -26,6 +26,8 @@ import org.apache.paimon.partition.PartitionExpireStrategy;
 import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
 import org.apache.paimon.table.PartitionHandler;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +57,7 @@ public class PartitionExpire {
     private final PartitionExpireStrategy strategy;
     private final boolean endInputCheckPartitionExpire;
     private int maxExpireNum;
+    private int expireBatchSize;
 
     public PartitionExpire(
             Duration expirationTime,
@@ -64,7 +67,8 @@ public class PartitionExpire {
             FileStoreCommit commit,
             @Nullable PartitionHandler partitionHandler,
             boolean endInputCheckPartitionExpire,
-            int maxExpireNum) {
+            int maxExpireNum,
+            int expireBatchSize) {
         this.expirationTime = expirationTime;
         this.checkInterval = checkInterval;
         this.strategy = strategy;
@@ -74,6 +78,7 @@ public class PartitionExpire {
         this.lastCheck = LocalDateTime.now();
         this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
         this.maxExpireNum = maxExpireNum;
+        this.expireBatchSize = expireBatchSize;
     }
 
     public PartitionExpire(
@@ -83,7 +88,8 @@ public class PartitionExpire {
             FileStoreScan scan,
             FileStoreCommit commit,
             @Nullable PartitionHandler partitionHandler,
-            int maxExpireNum) {
+            int maxExpireNum,
+            int expireBatchSize) {
         this(
                 expirationTime,
                 checkInterval,
@@ -92,7 +98,8 @@ public class PartitionExpire {
                 commit,
                 partitionHandler,
                 false,
-                maxExpireNum);
+                maxExpireNum,
+                expireBatchSize);
     }
 
     public List<Map<String, String>> expire(long commitIdentifier) {
@@ -148,19 +155,31 @@ public class PartitionExpire {
             // convert partition value to partition string, and limit the partition num
             expired = convertToPartitionString(expiredPartValues);
             LOG.info("Expire Partitions: {}", expired);
-            if (partitionHandler != null) {
-                try {
-                    partitionHandler.dropPartitions(expired);
-                } catch (Catalog.TableNotExistException e) {
-                    throw new RuntimeException(e);
-                }
+            if (expireBatchSize > 0 && expireBatchSize < expired.size()) {
+                Lists.partition(expired, expireBatchSize)
+                        .forEach(
+                                expiredBatchPartitions ->
+                                        doBatchExpire(expiredBatchPartitions, commitIdentifier));
             } else {
-                commit.dropPartitions(expired, commitIdentifier);
+                doBatchExpire(expired, commitIdentifier);
             }
         }
         return expired;
     }
 
+    private void doBatchExpire(
+            List<Map<String, String>> expiredBatchPartitions, long commitIdentifier) {
+        if (partitionHandler != null) {
+            try {
+                partitionHandler.dropPartitions(expiredBatchPartitions);
+            } catch (Catalog.TableNotExistException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            commit.dropPartitions(expiredBatchPartitions, commitIdentifier);
+        }
+    }
+
     private List<Map<String, String>> convertToPartitionString(
             List<List<String>> expiredPartValues) {
         return expiredPartValues.stream()
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 901a93737b..2b45ad4352 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -47,6 +47,8 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Streams;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -184,6 +186,37 @@ public class PartitionExpireTest {
         assertThat(read()).containsExactlyInAnyOrder("abcd:12");
     }
 
+    @Test
+    public void testBatchExpire() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
+                        singletonList("f0"),
+                        emptyList(),
+                        Collections.emptyMap(),
+                        ""));
+        newTable();
+        table =
+                table.copy(
+                        Collections.singletonMap(
+                                CoreOptions.PARTITION_EXPIRATION_BATCH_SIZE.key(), "1"));
+        write("20230101", "11");
+        write("20230101", "12");
+        write("20230103", "31");
+        write("20230103", "32");
+        write("20230105", "51");
+        PartitionExpire expire = newExpire();
+        expire.setLastCheck(date(1));
+        Assertions.assertDoesNotThrow(() -> expire.expire(date(8), Long.MAX_VALUE));
+
+        long overwriteSnapshotCnt =
+                Streams.stream(table.snapshotManager().snapshots())
+                        .filter(snapshot -> snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE)
+                        .count();
+        assertThat(overwriteSnapshotCnt).isEqualTo(3L);
+    }
+
     @Test
     public void test() throws Exception {
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
index 70a5de6355..fd9140cd32 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
@@ -168,7 +168,9 @@ public class ExpirePartitionsActionITCase extends ActionITCaseBase {
                         "--timestamp_pattern",
                         "$dt",
                         "--table_conf",
-                        "partition.expiration-max-num=1")
+                        "partition.expiration-max-num=1",
+                        "--table_conf",
+                        "partition.expiration-batch-size=1")
                 .run();
 
         plan = table.newReadBuilder().newScan().plan();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index 2e5eff6ced..60456faf9f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -453,6 +453,43 @@ public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
     }
 
+    @Test
+    public void testExpirePartitionsWithBatchSize() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + " 'bucket' = '1',"
+                        + " 'partition.expiration-batch-size'='1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+
+        sql("INSERT INTO T VALUES ('a', '2024-06-01')");
+        sql("INSERT INTO T VALUES ('b', '2024-06-02')");
+        sql("INSERT INTO T VALUES ('c', '2024-06-03')");
+        // This partition never expires.
+        sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
+        Function<InternalRow, String> consumerReadResult =
+                (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder(
+                        "a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");
+
+        assertThat(
+                        callExpirePartitions(
+                                "CALL sys.expire_partitions("
+                                        + "`table` => 'default.T'"
+                                        + ", expiration_time => '1 d'"
+                                        + ", timestamp_formatter => 'yyyy-MM-dd')"))
+                .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02", "dt=2024-06-03");
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder("Never-expire:9999-09-09");
+    }
+
     @Test
     public void testExpirePartitionsLoadTablePropsFirst() throws Exception {
         sql(
@@ -531,6 +568,7 @@ public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
                                         + "`table` => 'default.T'"
                                         + ", options => 
'partition.expiration-time = 1d,"
                                         + " partition.expiration-max-num = 2, "
+                                        + " partition.expiration-batch-size = 1, "
                                         + " partition.timestamp-formatter = 
yyyy-MM-dd')"))
                 .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index be7dc26241..586f2e6c2d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -744,6 +744,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
                 "CALL paimon.sys.expire_partitions(table => 'test.T', " +
                   "options => 'partition.expiration-time = 1d," +
                   " partition.expiration-max-num = 2," +
+                  " partition.expiration-batch-size = 2," +
                   " partition.timestamp-formatter = yyyy-MM-dd')"),
               Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
             )
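
For reference, a minimal Flink SQL sketch of how the new option might be used, modeled on the tests added in this commit; the table name, data, and the batch size of 1 are illustrative only, not part of the change itself:

    -- Create a partitioned table that drops expired partitions in small batches.
    -- 'partition.expiration-batch-size' is the option introduced by this commit;
    -- the table name and the value '1' are only for illustration.
    CREATE TABLE T (
        k STRING,
        dt STRING,
        PRIMARY KEY (k, dt) NOT ENFORCED
    ) PARTITIONED BY (dt) WITH (
        'bucket' = '1',
        'partition.expiration-batch-size' = '1'
    );

    -- Expire partitions older than one day. With a batch size of 1, each expired
    -- partition is dropped in its own commit rather than all in a single drop.
    CALL sys.expire_partitions(
        `table` => 'default.T',
        expiration_time => '1 d',
        timestamp_formatter => 'yyyy-MM-dd');

With the batch size set, expiration works through the expired partitions batch by batch (the new PartitionExpireTest asserts one OVERWRITE snapshot per batch), which is what mitigates the out-of-memory risk described in the option's documentation.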
