This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 318678804b [core] Expire partitions add default delete num (#4652)
318678804b is described below

commit 318678804b6d1b12b520c77aa50894200676b616
Author: askwang <[email protected]>
AuthorDate: Sat Dec 7 00:17:59 2024 +0800

    [core] Expire partitions add default delete num (#4652)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 ++++
 .../java/org/apache/paimon/AbstractFileStore.java  |  3 +-
 .../apache/paimon/operation/PartitionExpire.java   | 27 ++++++---
 .../flink/procedure/ExpirePartitionsProcedure.java |  5 +-
 .../flink/action/ExpirePartitionsAction.java       |  3 +-
 .../flink/procedure/ExpirePartitionsProcedure.java |  5 +-
 .../procedure/ExpirePartitionsProcedureITCase.java | 37 ++++++++++++
 .../spark/procedure/ExpirePartitionsProcedure.java |  5 +-
 .../procedure/ExpirePartitionsProcedureTest.scala  | 65 ++++++++++++++++++++++
 10 files changed, 150 insertions(+), 16 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 6fb2c72650..7d6bacccb0 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -593,6 +593,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>Duration</td>
             <td>The check interval of partition expiration.</td>
         </tr>
+        <tr>
+            <td><h5>partition.expiration-max-num</h5></td>
+            <td style="word-wrap: break-word;">100</td>
+            <td>Integer</td>
+            <td>The default deleted num of partition expiration.</td>
+        </tr>
         <tr>
             <td><h5>partition.expiration-strategy</h5></td>
             <td style="word-wrap: break-word;">values-time</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 765d5a1e32..8aebf2f289 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -809,6 +809,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(Duration.ofHours(1))
                     .withDescription("The check interval of partition 
expiration.");
 
+    public static final ConfigOption<Integer> PARTITION_EXPIRATION_MAX_NUM =
+            key("partition.expiration-max-num")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("The default deleted num of partition expiration.");
+
     public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
             key("partition.timestamp-formatter")
                     .stringType()
@@ -2126,6 +2132,10 @@ public class CoreOptions implements Serializable {
         return options.get(PARTITION_EXPIRATION_CHECK_INTERVAL);
     }
 
+    public int partitionExpireMaxNum() {
+        return options.get(PARTITION_EXPIRATION_MAX_NUM);
+    }
+
     public PartitionExpireStrategy partitionExpireStrategy() {
         return options.get(PARTITION_EXPIRATION_STRATEGY);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 1a538ad89e..54f554aa46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -309,7 +309,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 newScan(),
                 newCommit(commitUser),
                 metastoreClient,
-                options.endInputCheckPartitionExpire());
+                options.endInputCheckPartitionExpire(),
+                options.partitionExpireMaxNum());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 62a9b79647..d432a37dfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -54,7 +54,7 @@ public class PartitionExpire {
     private LocalDateTime lastCheck;
     private final PartitionExpireStrategy strategy;
     private final boolean endInputCheckPartitionExpire;
-    private int maxExpires;
+    private int maxExpireNum;
 
     public PartitionExpire(
             Duration expirationTime,
@@ -63,7 +63,8 @@ public class PartitionExpire {
             FileStoreScan scan,
             FileStoreCommit commit,
             @Nullable MetastoreClient metastoreClient,
-            boolean endInputCheckPartitionExpire) {
+            boolean endInputCheckPartitionExpire,
+            int maxExpireNum) {
         this.expirationTime = expirationTime;
         this.checkInterval = checkInterval;
         this.strategy = strategy;
@@ -72,7 +73,7 @@ public class PartitionExpire {
         this.metastoreClient = metastoreClient;
         this.lastCheck = LocalDateTime.now();
         this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
-        this.maxExpires = Integer.MAX_VALUE;
+        this.maxExpireNum = maxExpireNum;
     }
 
     public PartitionExpire(
@@ -81,8 +82,17 @@ public class PartitionExpire {
             PartitionExpireStrategy strategy,
             FileStoreScan scan,
             FileStoreCommit commit,
-            @Nullable MetastoreClient metastoreClient) {
-        this(expirationTime, checkInterval, strategy, scan, commit, metastoreClient, false);
+            @Nullable MetastoreClient metastoreClient,
+            int maxExpireNum) {
+        this(
+                expirationTime,
+                checkInterval,
+                strategy,
+                scan,
+                commit,
+                metastoreClient,
+                false,
+                maxExpireNum);
     }
 
     public PartitionExpire withLock(Lock lock) {
@@ -90,8 +100,8 @@ public class PartitionExpire {
         return this;
     }
 
-    public PartitionExpire withMaxExpires(int maxExpires) {
-        this.maxExpires = maxExpires;
+    public PartitionExpire withMaxExpireNum(int maxExpireNum) {
+        this.maxExpireNum = maxExpireNum;
         return this;
     }
 
@@ -145,6 +155,7 @@ public class PartitionExpire {
 
         List<Map<String, String>> expired = new ArrayList<>();
         if (!expiredPartValues.isEmpty()) {
+            // convert partition value to partition string, and limit the partition num
             expired = convertToPartitionString(expiredPartValues);
             LOG.info("Expire Partitions: {}", expired);
             if (metastoreClient != null) {
@@ -175,7 +186,7 @@ public class PartitionExpire {
                 .sorted()
                 .map(s -> s.split(DELIMITER))
                 .map(strategy::toPartitionString)
-                .limit(Math.min(expiredPartValues.size(), maxExpires))
+                .limit(Math.min(expiredPartValues.size(), maxExpireNum))
                 .collect(Collectors.toList());
     }
 }
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index c0e5a65c49..1c0d73cfbe 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -93,9 +93,10 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
                                                 .catalogEnvironment()
                                                 .metastoreClientFactory())
                                 .map(MetastoreClient.Factory::create)
-                                .orElse(null));
+                                .orElse(null),
+                        fileStore.options().partitionExpireMaxNum());
         if (maxExpires != null) {
-            partitionExpire.withMaxExpires(maxExpires);
+            partitionExpire.withMaxExpireNum(maxExpires);
         }
         List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
         return expired == null || expired.isEmpty()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 9528bc137d..0fa17e1a8d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -72,7 +72,8 @@ public class ExpirePartitionsAction extends TableActionBase {
                                                 .catalogEnvironment()
                                                 .metastoreClientFactory())
                                 .map(MetastoreClient.Factory::create)
-                                .orElse(null));
+                                .orElse(null),
+                        fileStore.options().partitionExpireMaxNum());
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index ee6075a927..ce282c6800 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -97,9 +97,10 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
                                                 .catalogEnvironment()
                                                 .metastoreClientFactory())
                                 .map(MetastoreClient.Factory::create)
-                                .orElse(null));
+                                .orElse(null),
+                        fileStore.options().partitionExpireMaxNum());
         if (maxExpires != null) {
-            partitionExpire.withMaxExpires(maxExpires);
+            partitionExpire.withMaxExpireNum(maxExpires);
         }
         List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
         return expired == null || expired.isEmpty()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index 2d1fb6dde7..a40968e067 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -415,6 +415,43 @@ public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
                 .containsExactly("No expired partitions.");
     }
 
+    @Test
+    public void testExpirePartitionsWithDefaultNum() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + " 'bucket' = '1',"
+                        + " 'partition.expiration-max-num'='2'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+
+        sql("INSERT INTO T VALUES ('a', '2024-06-01')");
+        sql("INSERT INTO T VALUES ('b', '2024-06-02')");
+        sql("INSERT INTO T VALUES ('c', '2024-06-03')");
+        // This partition never expires.
+        sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
+        Function<InternalRow, String> consumerReadResult =
+                (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder(
+                        "a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");
+
+        assertThat(
+                        callExpirePartitions(
+                                "CALL sys.expire_partitions("
+                                        + "`table` => 'default.T'"
+                                        + ", expiration_time => '1 d'"
+                                        + ", timestamp_formatter => 'yyyy-MM-dd')"))
+                .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
+    }
+
     /** Return a list of expired partitions. */
     public List<String> callExpirePartitions(String callSql) {
         return sql(callSql).stream()
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index 7b388227e5..e3a53d2bd2 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -107,9 +107,10 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
                                                             .catalogEnvironment()
                                                             .metastoreClientFactory())
                                             .map(MetastoreClient.Factory::create)
-                                            .orElse(null));
+                                            .orElse(null),
+                                    fileStore.options().partitionExpireMaxNum());
                     if (maxExpires != null) {
-                        partitionExpire.withMaxExpires(maxExpires);
+                        partitionExpire.withMaxExpireNum(maxExpires);
                     }
                     List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
                     return expired == null || expired.isEmpty()
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index 4561e532f5..9f0d23dc93 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -551,4 +551,69 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
       }
     }
   }
+
+  test("Paimon Procedure: expire partitions with default num") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(
+            s"""
+               |CREATE TABLE T (k STRING, pt STRING)
+               |TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1', 'partition.expiration-max-num'='2')
+               |PARTITIONED BY (pt)
+               |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(String, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("k", "pt")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T")
+
+          try {
+            // snapshot-1
+            inputData.addData(("a", "2024-06-01"))
+            stream.processAllAvailable()
+
+            // snapshot-2
+            inputData.addData(("b", "2024-06-02"))
+            stream.processAllAvailable()
+
+            // snapshot-3
+            inputData.addData(("c", "2024-06-03"))
+            stream.processAllAvailable()
+
+            // This partition never expires.
+            inputData.addData(("Never-expire", "9999-09-09"))
+            stream.processAllAvailable()
+
+            checkAnswer(
+              query(),
+              Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c", "2024-06-03") :: Row(
+                "Never-expire",
+                "9999-09-09") :: Nil)
+            // call expire_partitions.
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
+                  ", timestamp_formatter => 'yyyy-MM-dd')"),
+              Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
+            )
+
+            checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire", "9999-09-09") :: Nil)
+
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }
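
For reference, a minimal Flink SQL sketch of how the new 'partition.expiration-max-num' option interacts with the existing expire_partitions procedure, distilled from the ITCase added in this commit; the table name, database, and values are illustrative only, and without the table property the cap defaults to 100 partitions per expiration run:

    -- Cap the number of partitions dropped per expiration run at 2
    -- (illustrative value; the option's default is 100).
    CREATE TABLE T (
        k STRING,
        dt STRING,
        PRIMARY KEY (k, dt) NOT ENFORCED
    ) PARTITIONED BY (dt) WITH (
        'bucket' = '1',
        'partition.expiration-max-num' = '2'
    );

    -- Even if more partitions are older than one day, at most 2 are expired per call.
    CALL sys.expire_partitions(
        `table` => 'default.T',
        expiration_time => '1 d',
        timestamp_formatter => 'yyyy-MM-dd');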
