This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1bab713130 [core] support batch size for expire partition (#5766)
1bab713130 is described below
commit 1bab71313059f378f6f89f09a7525296c669f0b5
Author: demonyangyue <[email protected]>
AuthorDate: Thu Jun 19 20:52:20 2025 +0800
[core] support batch size for expire partition (#5766)
---
.../shortcodes/generated/core_configuration.html | 6 ++++
.../main/java/org/apache/paimon/CoreOptions.java | 14 ++++++++
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../apache/paimon/operation/PartitionExpire.java | 39 ++++++++++++++++------
.../paimon/operation/PartitionExpireTest.java | 33 ++++++++++++++++++
.../flink/action/ExpirePartitionsActionITCase.java | 4 ++-
.../procedure/ExpirePartitionsProcedureITCase.java | 38 +++++++++++++++++++++
.../procedure/ExpirePartitionsProcedureTest.scala | 1 +
8 files changed, 126 insertions(+), 12 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 18b1a61c4c..24425bf93c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -713,6 +713,12 @@ This config option does not affect the default filesystem metastore.</td>
<td>Boolean</td>
            <td>Whether mark the done status to indicate that the data is ready when end input.</td>
</tr>
+ <tr>
+ <td><h5>partition.expiration-batch-size</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+            <td>The batch size of partition expiration. By default, all partitions to be expired will be expired together, which may cause a risk of out-of-memory. Use this parameter to divide partition expiration process and mitigate memory pressure.</td>
+ </tr>
<tr>
<td><h5>partition.expiration-check-interval</h5></td>
<td style="word-wrap: break-word;">1 h</td>
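
For a sense of what the new option buys: with N partitions due for expiration and a batch size of b, the expiration run issues roughly ceil(N/b) smaller drop operations instead of a single large one. A minimal sketch of that arithmetic, using hypothetical numbers (1000 expired partitions, batch size 100) that do not come from the commit itself:

    public class ExpireBatchCountSketch {
        public static void main(String[] args) {
            // Hypothetical figures for illustration only.
            int expiredPartitions = 1000;  // partitions selected for expiration
            int expireBatchSize = 100;     // partition.expiration-batch-size

            // ceil(1000 / 100) = 10 drop operations instead of one drop of 1000
            // partitions, which is how batching mitigates memory pressure.
            int batches = (expiredPartitions + expireBatchSize - 1) / expireBatchSize;
            System.out.println("drop operations: " + batches); // prints 10
        }
    }
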
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index aff7887303..d2c9e70cae 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -907,6 +907,15 @@ public class CoreOptions implements Serializable {
.defaultValue(100)
                    .withDescription("The default deleted num of partition expiration.");
+ public static final ConfigOption<Integer> PARTITION_EXPIRATION_BATCH_SIZE =
+ key("partition.expiration-batch-size")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The batch size of partition expiration. "
+                                    + "By default, all partitions to be expired will be expired together, which may cause a risk of out-of-memory. "
+                                    + "Use this parameter to divide partition expiration process and mitigate memory pressure.");
+
public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
key("partition.timestamp-formatter")
.stringType()
@@ -2461,6 +2470,11 @@ public class CoreOptions implements Serializable {
return options.get(PARTITION_EXPIRATION_MAX_NUM);
}
+ public int partitionExpireBatchSize() {
+ return options.getOptional(PARTITION_EXPIRATION_BATCH_SIZE)
+ .orElse(options.get(PARTITION_EXPIRATION_MAX_NUM));
+ }
+
public PartitionExpireStrategy partitionExpireStrategy() {
return options.get(PARTITION_EXPIRATION_STRATEGY);
}
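
Note the fallback in partitionExpireBatchSize(): when partition.expiration-batch-size is unset, it returns partition.expiration-max-num (default 100), and since the expired set is already capped at that max-num, batching effectively only kicks in when a smaller batch size is configured explicitly. A tiny sketch of the resolution logic, assuming plain java.util.Optional in place of Paimon's Options class for brevity:

    import java.util.Optional;

    public class BatchSizeFallbackSketch {
        // Mirrors the accessor above: use the configured batch size if present,
        // otherwise fall back to the expiration max-num value.
        static int resolveBatchSize(Optional<Integer> configuredBatchSize, int maxExpireNum) {
            return configuredBatchSize.orElse(maxExpireNum);
        }

        public static void main(String[] args) {
            System.out.println(resolveBatchSize(Optional.empty(), 100)); // 100 (fallback)
            System.out.println(resolveBatchSize(Optional.of(10), 100));  // 10
        }
    }
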
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5cdaf1456f..556d8a7d7b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -452,7 +452,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
newCommit(commitUser, table),
partitionHandler,
options.endInputCheckPartitionExpire(),
- options.partitionExpireMaxNum());
+ options.partitionExpireMaxNum(),
+ options.partitionExpireBatchSize());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 4ac71db7aa..7819063f2e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -26,6 +26,8 @@ import org.apache.paimon.partition.PartitionExpireStrategy;
import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +57,7 @@ public class PartitionExpire {
private final PartitionExpireStrategy strategy;
private final boolean endInputCheckPartitionExpire;
private int maxExpireNum;
+ private int expireBatchSize;
public PartitionExpire(
Duration expirationTime,
@@ -64,7 +67,8 @@ public class PartitionExpire {
FileStoreCommit commit,
@Nullable PartitionHandler partitionHandler,
boolean endInputCheckPartitionExpire,
- int maxExpireNum) {
+ int maxExpireNum,
+ int expireBatchSize) {
this.expirationTime = expirationTime;
this.checkInterval = checkInterval;
this.strategy = strategy;
@@ -74,6 +78,7 @@ public class PartitionExpire {
this.lastCheck = LocalDateTime.now();
this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
this.maxExpireNum = maxExpireNum;
+ this.expireBatchSize = expireBatchSize;
}
public PartitionExpire(
@@ -83,7 +88,8 @@ public class PartitionExpire {
FileStoreScan scan,
FileStoreCommit commit,
@Nullable PartitionHandler partitionHandler,
- int maxExpireNum) {
+ int maxExpireNum,
+ int expireBatchSize) {
this(
expirationTime,
checkInterval,
@@ -92,7 +98,8 @@ public class PartitionExpire {
commit,
partitionHandler,
false,
- maxExpireNum);
+ maxExpireNum,
+ expireBatchSize);
}
public List<Map<String, String>> expire(long commitIdentifier) {
@@ -148,19 +155,31 @@ public class PartitionExpire {
            // convert partition value to partition string, and limit the partition num
expired = convertToPartitionString(expiredPartValues);
LOG.info("Expire Partitions: {}", expired);
- if (partitionHandler != null) {
- try {
- partitionHandler.dropPartitions(expired);
- } catch (Catalog.TableNotExistException e) {
- throw new RuntimeException(e);
- }
+ if (expireBatchSize > 0 && expireBatchSize < expired.size()) {
+ Lists.partition(expired, expireBatchSize)
+ .forEach(
+ expiredBatchPartitions ->
+                                        doBatchExpire(expiredBatchPartitions, commitIdentifier));
} else {
- commit.dropPartitions(expired, commitIdentifier);
+ doBatchExpire(expired, commitIdentifier);
}
}
return expired;
}
+ private void doBatchExpire(
+            List<Map<String, String>> expiredBatchPartitions, long commitIdentifier) {
+ if (partitionHandler != null) {
+ try {
+ partitionHandler.dropPartitions(expiredBatchPartitions);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ commit.dropPartitions(expiredBatchPartitions, commitIdentifier);
+ }
+ }
+
private List<Map<String, String>> convertToPartitionString(
List<List<String>> expiredPartValues) {
return expiredPartValues.stream()
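
The batching branch above dispatches one doBatchExpire call per sub-list, so each batch turns into its own dropPartitions call against the metastore handler or the commit, and, as the new test below checks via OVERWRITE snapshot counts, its own commit. A self-contained sketch of that dispatch shape, using unshaded Guava and a plain counter in place of the real drop calls (class and variable names here are illustrative only):

    import com.google.common.collect.Lists;

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class BatchExpireDispatchSketch {
        public static void main(String[] args) {
            // Mirrors the new test: 3 expired partitions, batch size 1.
            List<String> expired = Arrays.asList("dt=20230101", "dt=20230103", "dt=20230105");
            int expireBatchSize = 1;

            AtomicInteger dropCalls = new AtomicInteger();
            if (expireBatchSize > 0 && expireBatchSize < expired.size()) {
                // Same guard as above: split only when the batch size is positive
                // and smaller than the expired set, then drop batch by batch.
                Lists.partition(expired, expireBatchSize)
                        .forEach(batch -> dropCalls.incrementAndGet());
            } else {
                dropCalls.incrementAndGet();
            }

            System.out.println("drop calls: " + dropCalls.get()); // prints 3
        }
    }
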
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 901a93737b..2b45ad4352 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -47,6 +47,8 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Streams;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -184,6 +186,37 @@ public class PartitionExpireTest {
assertThat(read()).containsExactlyInAnyOrder("abcd:12");
}
+ @Test
+ public void testBatchExpire() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
+ schemaManager.createTable(
+ new Schema(
+                        RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
+ singletonList("f0"),
+ emptyList(),
+ Collections.emptyMap(),
+ ""));
+ newTable();
+ table =
+ table.copy(
+ Collections.singletonMap(
+                                CoreOptions.PARTITION_EXPIRATION_BATCH_SIZE.key(), "1"));
+ write("20230101", "11");
+ write("20230101", "12");
+ write("20230103", "31");
+ write("20230103", "32");
+ write("20230105", "51");
+ PartitionExpire expire = newExpire();
+ expire.setLastCheck(date(1));
+        Assertions.assertDoesNotThrow(() -> expire.expire(date(8), Long.MAX_VALUE));
+
+ long overwriteSnapshotCnt =
+ Streams.stream(table.snapshotManager().snapshots())
+                        .filter(snapshot -> snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE)
+ .count();
+ assertThat(overwriteSnapshotCnt).isEqualTo(3L);
+ }
+
@Test
public void test() throws Exception {
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
index 70a5de6355..fd9140cd32 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
@@ -168,7 +168,9 @@ public class ExpirePartitionsActionITCase extends ActionITCaseBase {
"--timestamp_pattern",
"$dt",
"--table_conf",
- "partition.expiration-max-num=1")
+ "partition.expiration-max-num=1",
+ "--table_conf",
+ "partition.expiration-batch-size=1")
.run();
plan = table.newReadBuilder().newScan().plan();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index 2e5eff6ced..60456faf9f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -453,6 +453,43 @@ public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
                .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
}
+ @Test
+ public void testExpirePartitionsWithBatchSize() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + " 'bucket' = '1',"
+ + " 'partition.expiration-batch-size'='1'"
+ + ")");
+ FileStoreTable table = paimonTable("T");
+
+ sql("INSERT INTO T VALUES ('a', '2024-06-01')");
+ sql("INSERT INTO T VALUES ('b', '2024-06-02')");
+ sql("INSERT INTO T VALUES ('c', '2024-06-03')");
+ // This partition never expires.
+ sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
+ Function<InternalRow, String> consumerReadResult =
+ (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder(
+                        "a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");
+
+ assertThat(
+ callExpirePartitions(
+ "CALL sys.expire_partitions("
+ + "`table` => 'default.T'"
+ + ", expiration_time => '1 d'"
+                                        + ", timestamp_formatter => 'yyyy-MM-dd')"))
+                .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02", "dt=2024-06-03");
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder("Never-expire:9999-09-09");
+ }
+
@Test
public void testExpirePartitionsLoadTablePropsFirst() throws Exception {
sql(
@@ -531,6 +568,7 @@ public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
+ "`table` => 'default.T'"
                                        + ", options => 'partition.expiration-time = 1d,"
+ " partition.expiration-max-num = 2, "
+                                        + " partition.expiration-batch-size = 1, "
                                        + " partition.timestamp-formatter = yyyy-MM-dd')"))
.containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index be7dc26241..586f2e6c2d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -744,6 +744,7 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
"CALL paimon.sys.expire_partitions(table => 'test.T', " +
"options => 'partition.expiration-time = 1d," +
" partition.expiration-max-num = 2," +
+ " partition.expiration-batch-size = 2," +
" partition.timestamp-formatter = yyyy-MM-dd')"),
Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
)