This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 318678804b [core] Expire partitions: add default delete num (#4652)
318678804b is described below
commit 318678804b6d1b12b520c77aa50894200676b616
Author: askwang <[email protected]>
AuthorDate: Sat Dec 7 00:17:59 2024 +0800
[core] Expire partitions: add default delete num (#4652)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 10 ++++
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../apache/paimon/operation/PartitionExpire.java | 27 ++++++---
.../flink/procedure/ExpirePartitionsProcedure.java | 5 +-
.../flink/action/ExpirePartitionsAction.java | 3 +-
.../flink/procedure/ExpirePartitionsProcedure.java | 5 +-
.../procedure/ExpirePartitionsProcedureITCase.java | 37 ++++++++++++
.../spark/procedure/ExpirePartitionsProcedure.java | 5 +-
.../procedure/ExpirePartitionsProcedureTest.scala | 65 ++++++++++++++++++++++
10 files changed, 150 insertions(+), 16 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6fb2c72650..7d6bacccb0 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -593,6 +593,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Duration</td>
<td>The check interval of partition expiration.</td>
</tr>
+ <tr>
+ <td><h5>partition.expiration-max-num</h5></td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>The default deleted num of partition expiration.</td>
+ </tr>
<tr>
<td><h5>partition.expiration-strategy</h5></td>
<td style="word-wrap: break-word;">values-time</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 765d5a1e32..8aebf2f289 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -809,6 +809,12 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofHours(1))
.withDescription("The check interval of partition
expiration.");
+ public static final ConfigOption<Integer> PARTITION_EXPIRATION_MAX_NUM =
+ key("partition.expiration-max-num")
+ .intType()
+ .defaultValue(100)
+ .withDescription("The default deleted num of partition
expiration.");
+
public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
key("partition.timestamp-formatter")
.stringType()
@@ -2126,6 +2132,10 @@ public class CoreOptions implements Serializable {
return options.get(PARTITION_EXPIRATION_CHECK_INTERVAL);
}
+ public int partitionExpireMaxNum() {
+ return options.get(PARTITION_EXPIRATION_MAX_NUM);
+ }
+
public PartitionExpireStrategy partitionExpireStrategy() {
return options.get(PARTITION_EXPIRATION_STRATEGY);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 1a538ad89e..54f554aa46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -309,7 +309,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
newScan(),
newCommit(commitUser),
metastoreClient,
- options.endInputCheckPartitionExpire());
+ options.endInputCheckPartitionExpire(),
+ options.partitionExpireMaxNum());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 62a9b79647..d432a37dfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -54,7 +54,7 @@ public class PartitionExpire {
private LocalDateTime lastCheck;
private final PartitionExpireStrategy strategy;
private final boolean endInputCheckPartitionExpire;
- private int maxExpires;
+ private int maxExpireNum;
public PartitionExpire(
Duration expirationTime,
@@ -63,7 +63,8 @@ public class PartitionExpire {
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient,
- boolean endInputCheckPartitionExpire) {
+ boolean endInputCheckPartitionExpire,
+ int maxExpireNum) {
this.expirationTime = expirationTime;
this.checkInterval = checkInterval;
this.strategy = strategy;
@@ -72,7 +73,7 @@ public class PartitionExpire {
this.metastoreClient = metastoreClient;
this.lastCheck = LocalDateTime.now();
this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
- this.maxExpires = Integer.MAX_VALUE;
+ this.maxExpireNum = maxExpireNum;
}
public PartitionExpire(
@@ -81,8 +82,17 @@ public class PartitionExpire {
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
- @Nullable MetastoreClient metastoreClient) {
- this(expirationTime, checkInterval, strategy, scan, commit,
metastoreClient, false);
+ @Nullable MetastoreClient metastoreClient,
+ int maxExpireNum) {
+ this(
+ expirationTime,
+ checkInterval,
+ strategy,
+ scan,
+ commit,
+ metastoreClient,
+ false,
+ maxExpireNum);
}
public PartitionExpire withLock(Lock lock) {
@@ -90,8 +100,8 @@ public class PartitionExpire {
return this;
}
- public PartitionExpire withMaxExpires(int maxExpires) {
- this.maxExpires = maxExpires;
+ public PartitionExpire withMaxExpireNum(int maxExpireNum) {
+ this.maxExpireNum = maxExpireNum;
return this;
}
@@ -145,6 +155,7 @@ public class PartitionExpire {
List<Map<String, String>> expired = new ArrayList<>();
if (!expiredPartValues.isEmpty()) {
+ // convert partition value to partition string, and limit the
partition num
expired = convertToPartitionString(expiredPartValues);
LOG.info("Expire Partitions: {}", expired);
if (metastoreClient != null) {
@@ -175,7 +186,7 @@ public class PartitionExpire {
.sorted()
.map(s -> s.split(DELIMITER))
.map(strategy::toPartitionString)
- .limit(Math.min(expiredPartValues.size(), maxExpires))
+ .limit(Math.min(expiredPartValues.size(), maxExpireNum))
.collect(Collectors.toList());
}
}
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index c0e5a65c49..1c0d73cfbe 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -93,9 +93,10 @@ public class ExpirePartitionsProcedure extends ProcedureBase
{
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
- .orElse(null));
+ .orElse(null),
+ fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
- partitionExpire.withMaxExpires(maxExpires);
+ partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired =
partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 9528bc137d..0fa17e1a8d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -72,7 +72,8 @@ public class ExpirePartitionsAction extends TableActionBase {
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
- .orElse(null));
+ .orElse(null),
+ fileStore.options().partitionExpireMaxNum());
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index ee6075a927..ce282c6800 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -97,9 +97,10 @@ public class ExpirePartitionsProcedure extends ProcedureBase
{
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
- .orElse(null));
+ .orElse(null),
+ fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
- partitionExpire.withMaxExpires(maxExpires);
+ partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired =
partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index 2d1fb6dde7..a40968e067 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -415,6 +415,43 @@ public class ExpirePartitionsProcedureITCase extends
CatalogITCaseBase {
.containsExactly("No expired partitions.");
}
+ @Test
+ public void testExpirePartitionsWithDefaultNum() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + " 'bucket' = '1',"
+ + " 'partition.expiration-max-num'='2'"
+ + ")");
+ FileStoreTable table = paimonTable("T");
+
+ sql("INSERT INTO T VALUES ('a', '2024-06-01')");
+ sql("INSERT INTO T VALUES ('b', '2024-06-02')");
+ sql("INSERT INTO T VALUES ('c', '2024-06-03')");
+ // This partition never expires.
+ sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
+ Function<InternalRow, String> consumerReadResult =
+ (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder(
+ "a:2024-06-01", "b:2024-06-02", "c:2024-06-03",
"Never-expire:9999-09-09");
+
+ assertThat(
+ callExpirePartitions(
+ "CALL sys.expire_partitions("
+ + "`table` => 'default.T'"
+ + ", expiration_time => '1 d'"
+ + ", timestamp_formatter =>
'yyyy-MM-dd')"))
+ .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder("c:2024-06-03",
"Never-expire:9999-09-09");
+ }
+
/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index 7b388227e5..e3a53d2bd2 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -107,9 +107,10 @@ public class ExpirePartitionsProcedure extends
BaseProcedure {
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
- .orElse(null));
+ .orElse(null),
+
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
- partitionExpire.withMaxExpires(maxExpires);
+ partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired =
partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index 4561e532f5..9f0d23dc93 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -551,4 +551,69 @@ class ExpirePartitionsProcedureTest extends
PaimonSparkTestBase with StreamTest
}
}
}
+
+ test("Paimon Procedure: expire partitions with default num") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ spark.sql(
+ s"""
+ |CREATE TABLE T (k STRING, pt STRING)
+ |TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1',
'partition.expiration-max-num'='2')
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(String, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("k", "pt")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T")
+
+ try {
+ // snapshot-1
+ inputData.addData(("a", "2024-06-01"))
+ stream.processAllAvailable()
+
+ // snapshot-2
+ inputData.addData(("b", "2024-06-02"))
+ stream.processAllAvailable()
+
+ // snapshot-3
+ inputData.addData(("c", "2024-06-03"))
+ stream.processAllAvailable()
+
+ // This partition never expires.
+ inputData.addData(("Never-expire", "9999-09-09"))
+ stream.processAllAvailable()
+
+ checkAnswer(
+ query(),
+ Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c",
"2024-06-03") :: Row(
+ "Never-expire",
+ "9999-09-09") :: Nil)
+ // call expire_partitions.
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.expire_partitions(table => 'test.T',
expiration_time => '1 d'" +
+ ", timestamp_formatter => 'yyyy-MM-dd')"),
+ Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
+ )
+
+ checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire",
"9999-09-09") :: Nil)
+
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
}