This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b3ed9dc01 [core] Unify the order of procedure loading properties (#4657)
0b3ed9dc01 is described below

commit 0b3ed9dc01594a2b5e8d97a5ce15c7b392edf308
Author: askwang <[email protected]>
AuthorDate: Mon Apr 7 12:09:29 2025 +0800

    [core] Unify the order of procedure loading properties (#4657)
---
 .../org/apache/paimon/utils/ProcedureUtils.java    |  98 +++++++++++++++
 .../apache/paimon/operation/PartitionExpire.java   |   5 -
 .../flink/procedure/ExpirePartitionsProcedure.java |  46 +++----
 .../paimon/flink/action/ExpireSnapshotsAction.java |   8 +-
 .../flink/action/ExpireSnapshotsActionFactory.java |   5 +-
 .../flink/procedure/CompactManifestProcedure.java  |  26 ++--
 .../flink/procedure/ExpirePartitionsProcedure.java |  52 ++++----
 .../flink/procedure/ExpireSnapshotsProcedure.java  |  44 +++----
 .../UnawareBucketNewFilesCompactionITCase.java     |   3 +-
 .../procedure/ExpirePartitionsProcedureITCase.java |  86 +++++++++++++
 .../procedure/ExpireSnapshotsProcedureITCase.java  |  36 +++++-
 .../spark/procedure/CompactManifestProcedure.java  |  13 +-
 .../paimon/spark/procedure/CompactProcedure.java   |  10 +-
 .../spark/procedure/ExpirePartitionsProcedure.java |  43 ++++---
 .../spark/procedure/ExpireSnapshotsProcedure.java  |  38 +++---
 .../CreateTagFromTimestampProcedureTest.scala      |   3 +-
 .../procedure/ExpirePartitionsProcedureTest.scala  | 140 +++++++++++++++++++++
 .../procedure/ExpireSnapshotsProcedureTest.scala   | 119 +++++++++++++++++-
 .../procedure/RemoveOrphanFilesProcedureTest.scala |   2 +-
 .../paimon/spark/sql/AnalyzeTableTestBase.scala    |   2 +-
 20 files changed, 631 insertions(+), 148 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java
new file mode 100644
index 0000000000..d13cb3c6df
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.ExpireConfig;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+
+/** Utils for procedure. */
+public class ProcedureUtils {
+
+    public static Map<String, String> fillInPartitionOptions(
+            String expireStrategy,
+            String timestampFormatter,
+            String timestampPattern,
+            String expirationTime,
+            Integer maxExpires,
+            String options) {
+
+        HashMap<String, String> dynamicOptions = new HashMap<>();
+        putAllOptions(dynamicOptions, options);
+        putIfNotEmpty(
+                dynamicOptions, CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
+        putIfNotEmpty(
+                dynamicOptions,
+                CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(),
+                timestampFormatter);
+        putIfNotEmpty(
+                dynamicOptions, CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
+        putIfNotEmpty(dynamicOptions, CoreOptions.PARTITION_EXPIRATION_TIME.key(), expirationTime);
+        // Set check interval to 0 for dedicated partition expiration.
+        putIfNotEmpty(dynamicOptions, CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "0");
+        putIfNotEmpty(
+                dynamicOptions,
+                CoreOptions.PARTITION_EXPIRATION_MAX_NUM.key(),
+                maxExpires == null ? null : String.valueOf(maxExpires));
+        return dynamicOptions;
+    }
+
+    public static void putAllOptions(HashMap<String, String> dynamicOptions, String options) {
+        if (!StringUtils.isNullOrWhitespaceOnly(options)) {
+            dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
+        }
+    }
+
+    public static void putIfNotEmpty(
+            HashMap<String, String> dynamicOptions, String key, String value) {
+        if (!StringUtils.isNullOrWhitespaceOnly(value)) {
+            dynamicOptions.put(key, value);
+        }
+    }
+
+    public static ExpireConfig.Builder fillInSnapshotOptions(
+            CoreOptions tableOptions,
+            Integer retainMax,
+            Integer retainMin,
+            String olderThanStr,
+            Integer maxDeletes) {
+
+        ExpireConfig.Builder builder = ExpireConfig.builder();
+        builder.snapshotRetainMax(
+                        Optional.ofNullable(retainMax).orElse(tableOptions.snapshotNumRetainMax()))
+                .snapshotRetainMin(
+                        Optional.ofNullable(retainMin).orElse(tableOptions.snapshotNumRetainMin()))
+                .snapshotMaxDeletes(
+                        Optional.ofNullable(maxDeletes).orElse(tableOptions.snapshotExpireLimit()))
+                .snapshotTimeRetain(tableOptions.snapshotTimeRetain());
+        if (!StringUtils.isNullOrWhitespaceOnly(olderThanStr)) {
+            long olderThanMills =
+                    DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
+                            .getMillisecond();
+            builder.snapshotTimeRetain(
+                    Duration.ofMillis(System.currentTimeMillis() - olderThanMills));
+        }
+        return builder;
+    }
+}
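
Usage sketch (condensed from the ExpirePartitionsProcedure changes below; tableId, expireStrategy and the other names stand for the procedure arguments): values parsed from the comma-separated 'options' string are put into the map first, non-empty explicit arguments then overwrite them, and the resulting map overrides the table's own properties once the table is copied with it.

    Map<String, String> dynamicOptions =
            ProcedureUtils.fillInPartitionOptions(
                    expireStrategy,        // explicit arguments overwrite entries from 'options'
                    timestampFormatter,
                    timestampPattern,
                    expirationTime,
                    maxExpires,
                    options);              // comma-separated key=value pairs, parsed first
    FileStoreTable fileStoreTable = (FileStoreTable) table(tableId).copy(dynamicOptions);
    PartitionExpire partitionExpire = fileStoreTable.store().newPartitionExpire("", fileStoreTable);
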
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 082572d5c4..4ac71db7aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -95,11 +95,6 @@ public class PartitionExpire {
                 maxExpireNum);
     }
 
-    public PartitionExpire withMaxExpireNum(int maxExpireNum) {
-        this.maxExpireNum = maxExpireNum;
-        return this;
-    }
-
     public List<Map<String, String>> expire(long commitIdentifier) {
         return expire(LocalDateTime.now(), commitIdentifier);
     }
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 2b0e0d1c68..3ad96fd593 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -18,22 +18,19 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProcedureUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import java.time.Duration;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
-
 /** A procedure to expire partitions. */
 public class ExpirePartitionsProcedure extends ProcedureBase {
 
@@ -59,6 +56,7 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
                 timestampFormatter,
                 timestampPattern,
                 expireStrategy,
+                null,
                 null);
     }
 
@@ -69,26 +67,28 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
             String timestampFormatter,
             String timestampPattern,
             String expireStrategy,
-            Integer maxExpires)
+            Integer maxExpires,
+            String options)
             throws Catalog.TableNotExistException {
-        FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
+
+        Map<String, String> dynamicOptions =
+                ProcedureUtils.fillInPartitionOptions(
+                        expireStrategy,
+                        timestampFormatter,
+                        timestampPattern,
+                        expirationTime,
+                        maxExpires,
+                        options);
+
+        Table table = table(tableId).copy(dynamicOptions);
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileStore fileStore = fileStoreTable.store();
-        Map<String, String> map = new HashMap<>();
-        map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
-        map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
-        map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
 
-        PartitionExpire partitionExpire =
-                fileStore.newPartitionExpire(
-                        "",
-                        fileStoreTable,
-                        TimeUtils.parseDuration(expirationTime),
-                        Duration.ofMillis(0L),
-                        createPartitionExpireStrategy(
-                                CoreOptions.fromMap(map), fileStore.partitionType()));
-        if (maxExpires != null) {
-            partitionExpire.withMaxExpireNum(maxExpires);
-        }
+        PartitionExpire partitionExpire = fileStore.newPartitionExpire("", fileStoreTable);
+        Preconditions.checkNotNull(
+                partitionExpire,
+                "Both the partition expiration time and partition field can not be null.");
+
         List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
         return expired == null || expired.isEmpty()
                 ? new String[] {"No expired partitions."}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java
index 5ae5c486d7..6822f6a7a9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java
@@ -33,6 +33,7 @@ public class ExpireSnapshotsAction extends ActionBase {
     private final Integer retainMin;
     private final String olderThan;
     private final Integer maxDeletes;
+    private final String options;
 
     public ExpireSnapshotsAction(
             String database,
@@ -41,7 +42,8 @@ public class ExpireSnapshotsAction extends ActionBase {
             Integer retainMax,
             Integer retainMin,
             String olderThan,
-            Integer maxDeletes) {
+            Integer maxDeletes,
+            String options) {
         super(catalogConfig);
         this.database = database;
         this.table = table;
@@ -49,6 +51,7 @@ public class ExpireSnapshotsAction extends ActionBase {
         this.retainMin = retainMin;
         this.olderThan = olderThan;
         this.maxDeletes = maxDeletes;
+        this.options = options;
     }
 
     public void run() throws Exception {
@@ -60,6 +63,7 @@ public class ExpireSnapshotsAction extends ActionBase {
                 retainMax,
                 retainMin,
                 olderThan,
-                maxDeletes);
+                maxDeletes,
+                options);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java
index f169c9f6f1..91628c2e37 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java
@@ -31,6 +31,7 @@ public class ExpireSnapshotsActionFactory implements ActionFactory {
     private static final String RETAIN_MIN = "retain_min";
     private static final String OLDER_THAN = "older_than";
     private static final String MAX_DELETES = "max_deletes";
+    private static final String OPTIONS = "options";
 
     @Override
     public String identifier() {
@@ -46,6 +47,7 @@ public class ExpireSnapshotsActionFactory implements ActionFactory {
         String olderThan = params.has(OLDER_THAN) ? params.get(OLDER_THAN) : null;
         Integer maxDeletes =
                 params.has(MAX_DELETES) ? Integer.parseInt(params.get(MAX_DELETES)) : null;
+        String options = params.has(OPTIONS) ? params.get(OPTIONS) : null;
 
         ExpireSnapshotsAction action =
                 new ExpireSnapshotsAction(
@@ -55,7 +57,8 @@ public class ExpireSnapshotsActionFactory implements ActionFactory {
                         retainMax,
                         retainMin,
                         olderThan,
-                        maxDeletes);
+                        maxDeletes,
+                        options);
 
         return Optional.of(action);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
index c6b0a170b0..488aee544e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
@@ -19,15 +19,16 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.utils.ProcedureUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import java.util.Collections;
+import java.util.HashMap;
 
 /** Compact manifest file to reduce deleted manifest entries. */
 public class CompactManifestProcedure extends ProcedureBase {
@@ -39,14 +40,21 @@ public class CompactManifestProcedure extends ProcedureBase {
         return "compact_manifest";
     }
 
-    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
-    public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
+            })
+    public String[] call(ProcedureContext procedureContext, String tableId, String options)
+            throws Exception {
 
-        Table table =
-                table(tableId)
-                        .copy(
-                                Collections.singletonMap(
-                                        CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER));
+        FileStoreTable table = (FileStoreTable) table(tableId);
+        HashMap<String, String> dynamicOptions = new HashMap<>();
+        ProcedureUtils.putIfNotEmpty(
+                dynamicOptions, CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER);
+        ProcedureUtils.putAllOptions(dynamicOptions, options);
+
+        table = table.copy(dynamicOptions);
 
         try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
             commit.compactManifests();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 582b4711fd..8a6528bd02 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -18,12 +18,13 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProcedureUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
@@ -31,13 +32,9 @@ import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 import org.apache.flink.types.Row;
 
-import java.time.Duration;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
-
 /** A procedure to expire partitions. */
 public class ExpirePartitionsProcedure extends ProcedureBase {
     @Override
@@ -48,7 +45,10 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
     @ProcedureHint(
             argument = {
                 @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
-                @ArgumentHint(name = "expiration_time", type = @DataTypeHint(value = "STRING")),
+                @ArgumentHint(
+                        name = "expiration_time",
+                        type = @DataTypeHint(value = "STRING"),
+                        isOptional = true),
                 @ArgumentHint(
                         name = "timestamp_formatter",
                         type = @DataTypeHint("STRING"),
@@ -64,7 +64,8 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
                 @ArgumentHint(
                         name = "max_expires",
                         type = @DataTypeHint("INTEGER"),
-                        isOptional = true)
+                        isOptional = true),
+                @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
             })
     public @DataTypeHint("ROW< expired_partitions STRING>") Row[] call(
             ProcedureContext procedureContext,
@@ -73,26 +74,27 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
             String timestampFormatter,
             String timestampPattern,
             String expireStrategy,
-            Integer maxExpires)
+            Integer maxExpires,
+            String options)
             throws Catalog.TableNotExistException {
-        FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
+        Map<String, String> dynamicOptions =
+                ProcedureUtils.fillInPartitionOptions(
+                        expireStrategy,
+                        timestampFormatter,
+                        timestampPattern,
+                        expirationTime,
+                        maxExpires,
+                        options);
+        Table table = table(tableId).copy(dynamicOptions);
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileStore fileStore = fileStoreTable.store();
-        Map<String, String> map = new HashMap<>();
-        map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
-        map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
-        map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
 
-        PartitionExpire partitionExpire =
-                fileStore.newPartitionExpire(
-                        "",
-                        fileStoreTable,
-                        TimeUtils.parseDuration(expirationTime),
-                        Duration.ofMillis(0L),
-                        createPartitionExpireStrategy(
-                                CoreOptions.fromMap(map), fileStore.partitionType()));
-        if (maxExpires != null) {
-            partitionExpire.withMaxExpireNum(maxExpires);
-        }
+        PartitionExpire partitionExpire = fileStore.newPartitionExpire("", fileStoreTable);
+
+        Preconditions.checkNotNull(
+                partitionExpire,
+                "Both the partition expiration time and partition field can not be null.");
+
         List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
         return expired == null || expired.isEmpty()
                 ? new Row[] {Row.of("No expired partitions.")}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
index 9b662fc369..b7c481828d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
@@ -18,18 +18,20 @@
 
 package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.table.ExpireSnapshots;
-import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.ProcedureUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import java.time.Duration;
-import java.util.TimeZone;
+import java.util.HashMap;
 
 /** A procedure to expire snapshots. */
 public class ExpireSnapshotsProcedure extends ProcedureBase {
@@ -57,7 +59,8 @@ public class ExpireSnapshotsProcedure extends ProcedureBase {
                 @ArgumentHint(
                         name = "max_deletes",
                         type = @DataTypeHint("INTEGER"),
-                        isOptional = true)
+                        isOptional = true),
+                @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
             })
     public String[] call(
             ProcedureContext procedureContext,
@@ -65,27 +68,20 @@ public class ExpireSnapshotsProcedure extends ProcedureBase {
             Integer retainMax,
             Integer retainMin,
             String olderThanStr,
-            Integer maxDeletes)
+            Integer maxDeletes,
+            String options)
             throws Catalog.TableNotExistException {
-        ExpireSnapshots expireSnapshots = table(tableId).newExpireSnapshots();
-        ExpireConfig.Builder builder = ExpireConfig.builder();
-        if (retainMax != null) {
-            builder.snapshotRetainMax(retainMax);
-        }
-        if (retainMin != null) {
-            builder.snapshotRetainMin(retainMin);
-        }
-        if (olderThanStr != null) {
-            builder.snapshotTimeRetain(
-                    Duration.ofMillis(
-                            System.currentTimeMillis()
-                                    - DateTimeUtils.parseTimestampData(
-                                                    olderThanStr, 3, TimeZone.getDefault())
-                                            .getMillisecond()));
-        }
-        if (maxDeletes != null) {
-            builder.snapshotMaxDeletes(maxDeletes);
-        }
+        Table table = table(tableId);
+        HashMap<String, String> dynamicOptions = new HashMap<>();
+        ProcedureUtils.putAllOptions(dynamicOptions, options);
+
+        table = table.copy(dynamicOptions);
+        ExpireSnapshots expireSnapshots = table.newExpireSnapshots();
+
+        CoreOptions tableOptions = ((FileStoreTable) table).store().options();
+        ExpireConfig.Builder builder =
+                ProcedureUtils.fillInSnapshotOptions(
+                        tableOptions, retainMax, retainMin, olderThanStr, maxDeletes);
         return new String[] {expireSnapshots.config(builder.build()).expire() + ""};
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java
index 90444287d4..45d6197f85 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java
@@ -118,7 +118,8 @@ public class UnawareBucketNewFilesCompactionITCase extends AbstractTestBase {
         assertThat(actual.keySet()).hasSameElementsAs(values);
         assertThat(actual.values()).allMatch(i -> i == 3);
 
-        tEnv.executeSql("CALL sys.expire_snapshots(`table` => 'default.T', retain_max => 1)")
+        tEnv.executeSql(
+                        "CALL sys.expire_snapshots(`table` => 'default.T', retain_max => 1, retain_min => 1)")
                 .await();
         assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0"))).hasSize(1);
         assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0"))).hasSize(1);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index da091722fb..2e5eff6ced 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT Case for {@link ExpirePartitionsProcedure}. */
 public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
@@ -452,6 +453,91 @@ public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
     }
 
+    @Test
+    public void testExpirePartitionsLoadTablePropsFirst() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + " 'bucket' = '1', "
+                        + " 'write-only' = 'true', "
+                        + " 'partition.timestamp-formatter' = 'yyyy-MM-dd', "
+                        + " 'partition.expiration-max-num'='2'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+
+        sql("INSERT INTO T VALUES ('a', '2024-06-01')");
+        sql("INSERT INTO T VALUES ('b', '2024-06-02')");
+        sql("INSERT INTO T VALUES ('c', '2024-06-03')");
+        // This partition never expires.
+        sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
+        Function<InternalRow, String> consumerReadResult =
+                (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder(
+                        "a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");
+
+        // no 'partition.expiration-time' value in table property or procedure parameter.
+        assertThatThrownBy(() -> sql("CALL sys.expire_partitions(`table` => 'default.T')"))
+                .rootCause()
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining(
+                        "Both the partition expiration time and partition field can not be null.");
+
+        // 'partition.timestamp-formatter' value using table property.
+        // 'partition.expiration-time' value using procedure parameter.
+        assertThat(
+                        callExpirePartitions(
+                                "CALL sys.expire_partitions("
+                                        + "`table` => 'default.T'"
+                                        + ", expiration_time => '1 d')"))
+                .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
+    }
+
+    @Test
+    public void testExpirePartitionsUseOptionsParam() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+
+        sql("INSERT INTO T VALUES ('a', '2024-06-01')");
+        sql("INSERT INTO T VALUES ('b', '2024-06-02')");
+        sql("INSERT INTO T VALUES ('c', '2024-06-03')");
+        // This partition never expires.
+        sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
+        Function<InternalRow, String> consumerReadResult =
+                (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder(
+                        "a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");
+
+        // set conf in options.
+        assertThat(
+                        callExpirePartitions(
+                                "CALL sys.expire_partitions("
+                                        + "`table` => 'default.T'"
+                                        + ", options => 'partition.expiration-time = 1d,"
+                                        + " partition.expiration-max-num = 2, "
+                                        + " partition.timestamp-formatter = yyyy-MM-dd')"))
+                .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
+    }
+
     /** Return a list of expired partitions. */
     public List<String> callExpirePartitions(String callSql) {
         return sql(callSql).stream()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
index 77b7409f41..4c530130a2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
@@ -40,7 +40,8 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
     public void testExpireSnapshotsProcedure() throws Exception {
         sql(
                 "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
-                        + " WITH ( 'num-sorted-run.compaction-trigger' = '9999' )");
+                        + " WITH ( 'num-sorted-run.compaction-trigger' = '9999',"
+                        + "'write-only' = 'true', 'snapshot.num-retained.min' = '1')");
         FileStoreTable table = paimonTable("word_count");
         SnapshotManager snapshotManager = table.snapshotManager();
 
@@ -81,7 +82,9 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
     public void testExpireSnapshotsAction() throws Exception {
         sql(
                 "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
-                        + " WITH ( 'num-sorted-run.compaction-trigger' = '9999' )");
+                        + " WITH ( 'num-sorted-run.compaction-trigger' = '9999',"
+                        + "'write-only' = 'true', 'snapshot.num-retained.min' = '1')");
+
         FileStoreTable table = paimonTable("word_count");
         StreamExecutionEnvironment env =
                 streamExecutionEnvironmentBuilder().streamingMode().build();
@@ -162,6 +165,35 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
         checkSnapshots(snapshotManager, 6, 6);
     }
 
+    @Test
+    public void testLoadTablePropsFirstAndOptions() throws Exception {
+        sql(
+                "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
+                        + " WITH ( 'num-sorted-run.compaction-trigger' = '9999',"
+                        + "'write-only' = 'true', 'snapshot.num-retained.min' = '1', 'snapshot.num-retained.max' = '5')");
+        FileStoreTable table = paimonTable("word_count");
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        // initially prepare 6 snapshots, expected snapshots (1, 2, 3, 4, 5, 6)
+        for (int i = 0; i < 6; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + String.valueOf(i) + "', " + i + ")");
+        }
+        checkSnapshots(snapshotManager, 1, 6);
+
+        // snapshot.num-retained.max is 5, expected snapshots (2, 3, 4, 5, 6)
+        sql("CALL sys.expire_snapshots(`table` => 'default.word_count')");
+        checkSnapshots(snapshotManager, 2, 6);
+
+        // older_than => timestamp of snapshot 6, snapshot.expire.limit => 1, expected snapshots (3,
+        // 4, 5, 6)
+        Timestamp ts6 = new Timestamp(snapshotManager.latestSnapshot().timeMillis());
+        sql(
+                "CALL sys.expire_snapshots(`table` => 'default.word_count', older_than => '"
+                        + ts6.toString()
+                        + "', options => 'snapshot.expire.limit=1')");
+        checkSnapshots(snapshotManager, 3, 6);
+    }
+
     protected void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
         assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
         assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
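
Condensed from the new ITCase above (assuming a Flink TableEnvironment named tEnv; the older_than literal is only a placeholder): when an argument is omitted the procedure now falls back to the table's own snapshot retention properties, and per-call settings can be supplied through the new 'options' argument.

    // Limits come from the table properties ('snapshot.num-retained.max' = '5' above).
    tEnv.executeSql("CALL sys.expire_snapshots(`table` => 'default.word_count')");
    // Per-call override through the comma-separated 'options' string.
    tEnv.executeSql(
            "CALL sys.expire_snapshots(`table` => 'default.word_count',"
                    + " older_than => '2025-04-07 00:00:00', options => 'snapshot.expire.limit=1')");
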
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
index 12a3a286f4..5a6837f6c1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.utils.ProcedureUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -29,6 +30,8 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.HashMap;
+
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /**
@@ -41,7 +44,10 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
 public class CompactManifestProcedure extends BaseProcedure {
 
     private static final ProcedureParameter[] PARAMETERS =
-            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.optional("options", StringType)
+            };
 
     private static final StructType OUTPUT_TYPE =
             new StructType(
@@ -67,7 +73,12 @@ public class CompactManifestProcedure extends BaseProcedure {
     public InternalRow[] call(InternalRow args) {
 
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        String options = args.isNullAt(1) ? null : args.getString(1);
+
         Table table = loadSparkTable(tableIdent).getTable();
+        HashMap<String, String> dynamicOptions = new HashMap<>();
+        ProcedureUtils.putAllOptions(dynamicOptions, options);
+        table = table.copy(dynamicOptions);
 
         try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
             commit.compactManifests();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 689e2cabd3..50cc5ad986 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -48,6 +48,7 @@ import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ParameterUtils;
+import org.apache.paimon.utils.ProcedureUtils;
 import org.apache.paimon.utils.SerializationUtils;
 import org.apache.paimon.utils.StringUtils;
 import org.apache.paimon.utils.TimeUtils;
@@ -197,11 +198,10 @@ public class CompactProcedure extends BaseProcedure {
                                 table.partitionKeys());
                     }
 
-                    Map<String, String> dynamicOptions = new HashMap<>();
-                    dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
-                    if (!StringUtils.isNullOrWhitespaceOnly(options)) {
-                        
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
-                    }
+                    HashMap<String, String> dynamicOptions = new HashMap<>();
+                    ProcedureUtils.putIfNotEmpty(
+                            dynamicOptions, CoreOptions.WRITE_ONLY.key(), "false");
+                    ProcedureUtils.putAllOptions(dynamicOptions, options);
                     table = table.copy(dynamicOptions);
                     InternalRow internalRow =
                             newInternalRow(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index dd9388b67a..036b3b3c8b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -18,11 +18,11 @@
 
 package org.apache.paimon.spark.procedure;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProcedureUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -32,12 +32,9 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import java.time.Duration;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
 import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
@@ -47,11 +44,12 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
     private static final ProcedureParameter[] PARAMETERS =
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
-                ProcedureParameter.required("expiration_time", StringType),
+                ProcedureParameter.optional("expiration_time", StringType),
                 ProcedureParameter.optional("timestamp_formatter", StringType),
                 ProcedureParameter.optional("timestamp_pattern", StringType),
                 ProcedureParameter.optional("expire_strategy", StringType),
-                ProcedureParameter.optional("max_expires", IntegerType)
+                ProcedureParameter.optional("max_expires", IntegerType),
+                ProcedureParameter.optional("options", StringType)
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -77,32 +75,33 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
     @Override
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-        String expirationTime = args.getString(1);
+        String expirationTime = args.isNullAt(1) ? null : args.getString(1);
         String timestampFormatter = args.isNullAt(2) ? null : args.getString(2);
         String timestampPattern = args.isNullAt(3) ? null : args.getString(3);
         String expireStrategy = args.isNullAt(4) ? null : args.getString(4);
         Integer maxExpires = args.isNullAt(5) ? null : args.getInt(5);
+        String options = args.isNullAt(6) ? null : args.getString(6);
+
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
+                    Map<String, String> dynamicOptions =
+                            ProcedureUtils.fillInPartitionOptions(
+                                    expireStrategy,
+                                    timestampFormatter,
+                                    timestampPattern,
+                                    expirationTime,
+                                    maxExpires,
+                                    options);
+                    table = table.copy(dynamicOptions);
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
                     FileStore fileStore = fileStoreTable.store();
-                    Map<String, String> map = new HashMap<>();
-                    map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
-                    map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
-                    map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
 
                     PartitionExpire partitionExpire =
-                            fileStore.newPartitionExpire(
-                                    "",
-                                    fileStoreTable,
-                                    TimeUtils.parseDuration(expirationTime),
-                                    Duration.ofMillis(0L),
-                                    createPartitionExpireStrategy(
-                                            CoreOptions.fromMap(map), fileStore.partitionType()));
-                    if (maxExpires != null) {
-                        partitionExpire.withMaxExpireNum(maxExpires);
-                    }
+                            fileStore.newPartitionExpire("", fileStoreTable);
+                    Preconditions.checkNotNull(
+                            partitionExpire,
+                            "Both the partition expiration time and partition field can not be null.");
                     List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
                     return expired == null || expired.isEmpty()
                             ? new InternalRow[] {
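
The Spark procedure exposes the same 'options' argument; a representative call, condensed from the new ExpirePartitionsProcedureTest below (assuming a SparkSession named spark):

    spark.sql(
            "CALL paimon.sys.expire_partitions(table => 'test.T',"
                    + " options => 'partition.expiration-time = 1d,"
                    + " partition.expiration-max-num = 2,"
                    + " partition.timestamp-formatter = yyyy-MM-dd')");
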
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
index f24f18067a..c54a3dbdaf 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
@@ -18,10 +18,11 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.table.ExpireSnapshots;
-import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ProcedureUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -30,8 +31,7 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
-import java.time.Duration;
-import java.util.TimeZone;
+import java.util.HashMap;
 
 import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -45,7 +45,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
                 ProcedureParameter.optional("retain_max", IntegerType),
                 ProcedureParameter.optional("retain_min", IntegerType),
                 ProcedureParameter.optional("older_than", StringType),
-                ProcedureParameter.optional("max_deletes", IntegerType)
+                ProcedureParameter.optional("max_deletes", IntegerType),
+                ProcedureParameter.optional("options", StringType)
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -76,29 +77,20 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
         Integer retainMin = args.isNullAt(2) ? null : args.getInt(2);
         String olderThanStr = args.isNullAt(3) ? null : args.getString(3);
         Integer maxDeletes = args.isNullAt(4) ? null : args.getInt(4);
+        String options = args.isNullAt(5) ? null : args.getString(5);
 
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
+                    HashMap<String, String> dynamicOptions = new HashMap<>();
+                    ProcedureUtils.putAllOptions(dynamicOptions, options);
+                    table = table.copy(dynamicOptions);
                     ExpireSnapshots expireSnapshots = table.newExpireSnapshots();
-                    ExpireConfig.Builder builder = ExpireConfig.builder();
-                    if (retainMax != null) {
-                        builder.snapshotRetainMax(retainMax);
-                    }
-                    if (retainMin != null) {
-                        builder.snapshotRetainMin(retainMin);
-                    }
-                    if (!StringUtils.isNullOrWhitespaceOnly(olderThanStr)) {
-                        long olderThanMills =
-                                DateTimeUtils.parseTimestampData(
-                                                olderThanStr, 3, TimeZone.getDefault())
-                                        .getMillisecond();
-                        builder.snapshotTimeRetain(
-                                Duration.ofMillis(System.currentTimeMillis() - olderThanMills));
-                    }
-                    if (maxDeletes != null) {
-                        builder.snapshotMaxDeletes(maxDeletes);
-                    }
+
+                    CoreOptions tableOptions = ((FileStoreTable) table).store().options();
+                    ExpireConfig.Builder builder =
+                            ProcedureUtils.fillInSnapshotOptions(
+                                    tableOptions, retainMax, retainMin, olderThanStr, maxDeletes);
                     int deleted = expireSnapshots.config(builder.build()).expire();
                     return new InternalRow[] {newInternalRow(deleted)};
                 });
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
index 301b769288..e9b00298e4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
@@ -148,7 +148,8 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea
 
             // make snapshot 1 expire.
             checkAnswer(
-              spark.sql("CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 1)"),
+              spark.sql(
+                "CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 1, retain_min => 1)"),
               Row(1) :: Nil)
 
             // create tag from timestamp that earlier than the expired snapshot 1.
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index 9f0d23dc93..be7dc26241 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.StreamTest
+import org.assertj.core.api.Assertions.assertThatThrownBy
 
 /** IT Case for [[ExpirePartitionsProcedure]]. */
 class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest {
@@ -616,4 +617,143 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
       }
     }
   }
+
+  test("Paimon Procedure: expire partitions load table property first") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (k STRING, pt STRING)
+                       |TBLPROPERTIES (
+                       |  'primary-key' = 'k,pt',
+                       |  'bucket' = '1',
+                       |  'write-only' = 'true',
+                       |  'partition.timestamp-formatter' = 'yyyy-MM-dd',
+                       |  'partition.expiration-max-num'='2')
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(String, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("k", "pt")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T")
+
+          try {
+            // snapshot-1
+            inputData.addData(("a", "2024-06-01"))
+            stream.processAllAvailable()
+
+            // snapshot-2
+            inputData.addData(("b", "2024-06-02"))
+            stream.processAllAvailable()
+
+            // snapshot-3
+            inputData.addData(("c", "2024-06-03"))
+            stream.processAllAvailable()
+
+            // This partition never expires.
+            inputData.addData(("Never-expire", "9999-09-09"))
+            stream.processAllAvailable()
+
+            checkAnswer(
+              query(),
+              Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c", "2024-06-03") :: Row(
+                "Never-expire",
+                "9999-09-09") :: Nil)
+
+            // 'partition.timestamp-formatter' value using table property.
+            // 'partition.expiration-time' value using procedure parameter.
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d')"),
+              Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
+            )
+
+            checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire", "9999-09-09") :: Nil)
+
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Procedure: expire partitions add options parameter") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (k STRING, pt STRING)
+                       |TBLPROPERTIES (
+                       |  'primary-key' = 'k,pt',
+                       |  'bucket' = '1')
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(String, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("k", "pt")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T")
+
+          try {
+            // snapshot-1
+            inputData.addData(("a", "2024-06-01"))
+            stream.processAllAvailable()
+
+            // snapshot-2
+            inputData.addData(("b", "2024-06-02"))
+            stream.processAllAvailable()
+
+            // snapshot-3
+            inputData.addData(("c", "2024-06-03"))
+            stream.processAllAvailable()
+
+            // This partition never expires.
+            inputData.addData(("Never-expire", "9999-09-09"))
+            stream.processAllAvailable()
+
+            checkAnswer(
+              query(),
+              Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c", "2024-06-03") :: Row(
+                "Never-expire",
+                "9999-09-09") :: Nil)
+
+            // set conf in options.
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_partitions(table => 'test.T', " +
+                  "options => 'partition.expiration-time = 1d," +
+                  " partition.expiration-max-num = 2," +
+                  " partition.timestamp-formatter = yyyy-MM-dd')"),
+              Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
+            )
+
+            checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire", "9999-09-09") :: Nil)
+
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
index da7be42310..b39aa5d058 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -39,7 +39,8 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
           // define a change-log table and test `forEachBatch` api
           spark.sql(s"""
                        |CREATE TABLE T (a INT, b STRING)
-                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3',
+                       |'write-only' = 'true', 'snapshot.num-retained.min' = '1')
                        |""".stripMargin)
           val location = loadTable("T").location().toString
 
@@ -144,7 +145,8 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
   test("Paimon Procedure: test parameter order_than with string type") {
     sql(
       "CREATE TABLE T (a INT, b STRING) " +
-        "TBLPROPERTIES ( 'num-sorted-run.compaction-trigger' = '999' )")
+        "TBLPROPERTIES ( 'num-sorted-run.compaction-trigger' = '999'," +
+        "'write-only' = 'true', 'snapshot.num-retained.min' = '1')")
     val table = loadTable("T")
     val snapshotManager = table.snapshotManager
 
@@ -160,6 +162,119 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
     checkSnapshots(snapshotManager, 3, 5)
   }
 
+  test("Paimon Procedure: expire snapshots load table property first") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3',
+                       |'snapshot.num-retained.max' = '2',
+                       |'snapshot.num-retained.min' = '1',
+                       |'write-only' = 'true')
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            // snapshot-1
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            // snapshot-2
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // snapshot-3
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+            // expire
+            checkAnswer(
+              spark.sql("CALL paimon.sys.expire_snapshots(table => 'test.T')"),
+              Row(1) :: Nil)
+
+            checkAnswer(
+              spark.sql("SELECT snapshot_id FROM paimon.test.`T$snapshots`"),
+              Row(2L) :: Row(3L) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Procedure: expire snapshots add options parameter") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'write-only' = 'true')
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            // snapshot-1
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            // snapshot-2
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // snapshot-3
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_snapshots(table => 'test.T', options => 'snapshot.num-retained.max=2, snapshot.num-retained.min=1')"),
+              Row(1L) :: Nil)
+
+            checkAnswer(
+              spark.sql("SELECT snapshot_id FROM paimon.test.`T$snapshots`"),
+              Row(2L) :: Row(3L) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
   def checkSnapshots(sm: SnapshotManager, earliest: Int, latest: Int): Unit = {
     assertThat(sm.snapshotCount).isEqualTo(latest - earliest + 1)
     assertThat(sm.earliestSnapshotId).isEqualTo(earliest)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index a205e69fd7..af82549738 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -278,7 +278,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.writeFile(orphanFile2, "b", true)
 
     checkAnswer(
-      spark.sql("CALL paimon.sys.expire_snapshots(table => 'T', retain_max => 1)"),
+      spark.sql("CALL paimon.sys.expire_snapshots(table => 'T', retain_max => 1, retain_min => 1)"),
       Row(2) :: Nil)
 
     val older_than1 = new java.sql.Timestamp(
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
index 4f8ccae22d..8170c58364 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
@@ -379,7 +379,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
     Assertions.assertEquals(2, statsFileCount(tableLocation, fileIO))
 
     // test expire statistic
-    spark.sql("CALL sys.expire_snapshots(table => 'test.T', retain_max => 1)")
+    spark.sql("CALL sys.expire_snapshots(table => 'test.T', retain_max => 1, retain_min => 1)")
     Assertions.assertEquals(1, statsFileCount(tableLocation, fileIO))
 
     val orphanStats = new Path(tableLocation, "statistics/stats-orphan-0")
