This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 59d38e3f86 [core][flink] introduce changelog expire procedure and 
action (#5319)
59d38e3f86 is described below

commit 59d38e3f86aaa49d73854b64ded29a54af2a10ee
Author: LsomeYeah <[email protected]>
AuthorDate: Fri Mar 21 13:08:56 2025 +0800

    [core][flink] introduce changelog expire procedure and action (#5319)
---
 docs/content/flink/procedures.md                   |  39 +++
 .../apache/paimon/table/ExpireChangelogImpl.java   |  79 ++++++
 .../org/apache/paimon/utils/ChangelogManager.java  |   8 +
 .../org/apache/paimon/utils/HintFileUtils.java     |   5 +
 .../flink/procedure/ExpireChangelogsProcedure.java |  88 ++++++
 .../flink/action/ExpireChangelogsAction.java       |  68 +++++
 .../action/ExpireChangelogsActionFactory.java      |  81 ++++++
 .../flink/procedure/ExpireChangelogsProcedure.java | 101 +++++++
 .../services/org.apache.paimon.factories.Factory   |   2 +
 .../procedure/ExpireChangelogsProcedureITCase.java | 297 +++++++++++++++++++++
 10 files changed, 768 insertions(+)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index a5a24def4e..068eb439f9 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -601,6 +601,45 @@ All available procedures are listed below.
          CALL sys.expire_snapshots(`table` => 'default.T', older_than => 
'2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
       </td>
    </tr>
+   <tr>
+      <td>expire_changelogs</td>
+      <td>
+         -- Use named argument<br/>
+         CALL [catalog.]sys.expire_changelogs(<br/>
+            `table` => 'identifier', <br/>
+            retain_max => 'retain_max', <br/>
+            retain_min => 'retain_min', <br/>
+            older_than => 'older_than', <br/>
+            max_deletes => 'max_deletes', <br/>
+            delete_all => 'delete_all') <br/><br/>
+         -- Use indexed argument<br/>
+         -- for Flink 1.18<br/>
+         CALL [catalog.]sys.expire_changelogs(table, retain_max, retain_min, 
older_than, max_deletes)<br/><br/>
+         CALL [catalog.]sys.expire_changelogs(table, delete_all)<br/><br/>
+         -- for Flink 1.19 and later<br/>
+         CALL [catalog.]sys.expire_changelogs(table, retain_max, retain_min, 
older_than, max_deletes, delete_all)<br/><br/>
+      </td>
+      <td>
+         To expire changelogs. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>retain_max: the maximum number of completed changelogs to 
retain.</li>
+            <li>retain_min: the minimum number of completed changelogs to 
retain.</li>
+            <li>older_than: timestamp before which changelogs will be removed.</li>
+            <li>max_deletes: the maximum number of changelogs that can be 
deleted at once.</li>
+            <li>delete_all: whether to delete all separated changelogs.</li>
+      </td>
+      <td>
+         -- for Flink 1.18<br/><br/>
+         CALL sys.expire_changelogs('default.T', 4, 2, '2024-01-01 12:00:00', 
2)<br/><br/>
+         CALL sys.expire_changelogs('default.T', true)<br/><br/>
+         -- for Flink 1.19 and later<br/><br/>
+         CALL sys.expire_changelogs(`table` => 'default.T', retain_max => 
2)<br/><br/>
+         CALL sys.expire_changelogs(`table` => 'default.T', older_than => 
'2024-01-01 12:00:00')<br/><br/>
+         CALL sys.expire_changelogs(`table` => 'default.T', older_than => 
'2024-01-01 12:00:00', retain_min => 10)<br/><br/>
+         CALL sys.expire_changelogs(`table` => 'default.T', older_than => 
'2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
+         CALL sys.expire_changelogs(`table` => 'default.T', delete_all => 
true)<br/><br/>
+      </td>
+   </tr>
 <tr>
       <td>expire_partitions</td>
       <td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index bf4f8665ad..cf11a2ba14 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -32,6 +32,7 @@ import org.apache.paimon.utils.TagManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
@@ -173,6 +174,84 @@ public class ExpireChangelogImpl implements 
ExpireSnapshots {
         return (int) (endExclusiveId - earliestId);
     }
 
+    /** expire all separated changelogs, only used by 
ExpireChangelogsProcedure. */
+    public void expireAll() {
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            // no snapshot, nothing to expire
+            return;
+        }
+
+        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        if (earliestSnapshotId == null) {
+            return;
+        }
+
+        Long latestChangelogId = changelogManager.latestLongLivedChangelogId();
+        if (latestChangelogId == null) {
+            return;
+        }
+        Long earliestChangelogId = 
changelogManager.earliestLongLivedChangelogId();
+        if (earliestChangelogId == null) {
+            return;
+        }
+
+        LOG.info(
+                "Read earliest and latest changelog for expire all. 
earliestChangelogId is {}, latestChangelogId is {}",
+                earliestChangelogId,
+                latestChangelogId);
+
+        List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
+
+        // files used by the earliest snapshot id should be skipped
+        Preconditions.checkArgument(
+                latestChangelogId < earliestSnapshotId,
+                "latest changelog id should be less than earliest snapshot id."
+                        + "please check your table!");
+        List<Snapshot> skippingSnapshots =
+                findSkippingTags(taggedSnapshots, earliestChangelogId, 
earliestSnapshotId);
+        skippingSnapshots.add(snapshotManager.snapshot(earliestSnapshotId));
+
+        Set<String> manifestSkippSet = 
changelogDeletion.manifestSkippingSet(skippingSnapshots);
+        for (long id = earliestChangelogId; id <= latestChangelogId; id++) {
+
+            LOG.info("Ready to delete changelog files from changelog #" + id);
+
+            Changelog changelog;
+            try {
+                changelog = changelogManager.tryGetChangelog(id);
+            } catch (FileNotFoundException e) {
+                LOG.info("fail to get changelog #" + id);
+                continue;
+            }
+            Predicate<ExpireFileEntry> skipper;
+            try {
+                skipper = 
changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
+            } catch (Exception e) {
+                LOG.info(
+                        String.format(
+                                "Skip cleaning data files of changelog '%s' 
due to failed to build skipping set.",
+                                id),
+                        e);
+                continue;
+            }
+
+            changelogDeletion.cleanUnusedDataFiles(changelog, skipper);
+            changelogDeletion.cleanUnusedManifests(changelog, 
manifestSkippSet);
+            
changelogManager.fileIO().deleteQuietly(changelogManager.longLivedChangelogPath(id));
+        }
+
+        // try delete changelog hint file
+        try {
+            changelogManager.deleteEarliestHint();
+            changelogManager.deleteLatestHint();
+        } catch (Exception e) {
+            LOG.error("delete changelog hint file error.", e);
+        }
+
+        changelogDeletion.cleanEmptyDirectories();
+    }
+
     private void writeEarliestHintFile(long earliest) {
         try {
             changelogManager.commitLongLivedChangelogEarliestHint(earliest);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java
index 230ef3231d..7d60649443 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java
@@ -170,6 +170,14 @@ public class ChangelogManager implements Serializable {
         return changelogs;
     }
 
+    public void deleteLatestHint() throws IOException {
+        HintFileUtils.deleteLatestHint(fileIO, changelogDirectory());
+    }
+
+    public void deleteEarliestHint() throws IOException {
+        HintFileUtils.deleteEarliestHint(fileIO, changelogDirectory());
+    }
+
     private static void collectSnapshots(Consumer<Path> pathConsumer, 
List<Path> paths)
             throws IOException {
         ExecutorService executor =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java
index fb1a8ec308..7a8eaf6f6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java
@@ -103,6 +103,11 @@ public class HintFileUtils {
         fileIO.delete(hintFile, false);
     }
 
+    /** Deletes the {@code EARLIEST} hint file located directly under {@code dir}. */
+    public static void deleteEarliestHint(FileIO fileIO, Path dir) throws IOException {
+        fileIO.delete(new Path(dir, EARLIEST), false);
+    }
+
     public static void commitHint(FileIO fileIO, long id, String fileName, 
Path dir)
             throws IOException {
         Path hintFile = new Path(dir, fileName);
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
new file mode 100644
index 0000000000..f68ac44a5e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.ExpireConfig;
+import org.apache.paimon.table.ExpireChangelogImpl;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.time.Duration;
+import java.util.TimeZone;
+
+/**
+ * A procedure to expire changelogs (Flink 1.18 variant using indexed arguments).
+ *
+ * <p>Supported calls: {@code (table, retain_max, retain_min, older_than)}, {@code (table,
+ * retain_max, retain_min, older_than, max_deletes)} and {@code (table, delete_all)}.
+ */
+public class ExpireChangelogsProcedure extends ProcedureBase {
+
+    @Override
+    public String identifier() {
+        return "expire_changelogs";
+    }
+
+    /** Same as the six-argument variant with an unlimited ({@code Integer.MAX_VALUE}) delete count. */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            int retainMax,
+            int retainMin,
+            String olderThan)
+            throws Catalog.TableNotExistException {
+        return call(procedureContext, tableId, retainMax, retainMin, olderThan, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Expires changelogs of the given table with the provided settings.
+     *
+     * @param olderThan timestamp string (parsed in the default time zone); when null, no time
+     *     retention is set
+     * @return a single-element array holding the value returned by {@code expire()}
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            int retainMax,
+            int retainMin,
+            String olderThan,
+            int maxDeletes)
+            throws Catalog.TableNotExistException {
+        ExpireChangelogImpl expireChangelogs =
+                (ExpireChangelogImpl) table(tableId).newExpireChangelog();
+        ExpireConfig.Builder builder = ExpireConfig.builder();
+
+        builder.changelogRetainMax(retainMax);
+        builder.changelogRetainMin(retainMin);
+
+        // a NULL older_than would otherwise fail inside parseTimestampData
+        if (olderThan != null) {
+            builder.changelogTimeRetain(
+                    Duration.ofMillis(
+                            System.currentTimeMillis()
+                                    - DateTimeUtils.parseTimestampData(
+                                                    olderThan, 3, TimeZone.getDefault())
+                                            .getMillisecond()));
+        }
+
+        builder.changelogMaxDeletes(maxDeletes);
+
+        return new String[] {String.valueOf(expireChangelogs.config(builder.build()).expire())};
+    }
+
+    /**
+     * Expires all separated changelogs of the given table when {@code deleteAll} is true.
+     *
+     * @return a single-element array with a status message
+     */
+    public String[] call(ProcedureContext procedureContext, String tableId, boolean deleteAll)
+            throws Catalog.TableNotExistException {
+        ExpireChangelogImpl expireChangelogs =
+                (ExpireChangelogImpl) table(tableId).newExpireChangelog();
+
+        if (deleteAll) {
+            expireChangelogs.expireAll();
+            return new String[] {"Delete all separated changelogs success."};
+        }
+
+        return new String[] {"deleteAll is false, do nothing."};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsAction.java
new file mode 100644
index 0000000000..9625b24202
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.ExpireChangelogsProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Expire changelogs action for Flink. */
+public class ExpireChangelogsAction extends ActionBase {
+
+    private final String database;
+    private final String table;
+    private final Integer retainMax;
+    private final Integer retainMin;
+    private final String olderThan;
+    private final Integer maxDeletes;
+    private final Boolean deleteAll;
+
+    /**
+     * Creates the action. The expiration arguments may be null (meaning "not specified") and are
+     * forwarded unchanged to {@link ExpireChangelogsProcedure}.
+     */
+    public ExpireChangelogsAction(
+            String database,
+            String table,
+            Map<String, String> catalogConfig,
+            Integer retainMax,
+            Integer retainMin,
+            String olderThan,
+            Integer maxDeletes,
+            Boolean deleteAll) {
+        super(catalogConfig);
+        this.database = database;
+        this.table = table;
+        this.retainMax = retainMax;
+        this.retainMin = retainMin;
+        this.olderThan = olderThan;
+        this.maxDeletes = maxDeletes;
+        this.deleteAll = deleteAll;
+    }
+
+    /** Runs {@link ExpireChangelogsProcedure} on {@code database.table} with this action's catalog. */
+    @Override
+    public void run() throws Exception {
+        ExpireChangelogsProcedure expireChangelogsProcedure = new ExpireChangelogsProcedure();
+        expireChangelogsProcedure.withCatalog(catalog);
+        expireChangelogsProcedure.call(
+                new DefaultProcedureContext(env),
+                database + "." + table,
+                retainMax,
+                retainMin,
+                olderThan,
+                maxDeletes,
+                deleteAll);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsActionFactory.java
new file mode 100644
index 0000000000..04f85d6b74
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsActionFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link ExpireChangelogsAction}. */
+public class ExpireChangelogsActionFactory implements ActionFactory {
+    public static final String IDENTIFIER = "expire_changelogs";
+
+    private static final String RETAIN_MAX = "retain_max";
+    private static final String RETAIN_MIN = "retain_min";
+    private static final String OLDER_THAN = "older_than";
+    private static final String MAX_DELETES = "max_deletes";
+    private static final String DELETE_ALL = "delete_all";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        Integer retainMax =
+                params.has(RETAIN_MAX) ? 
Integer.parseInt(params.get(RETAIN_MAX)) : null;
+        Integer retainMin =
+                params.has(RETAIN_MIN) ? 
Integer.parseInt(params.get(RETAIN_MIN)) : null;
+        String olderThan = params.has(OLDER_THAN) ? params.get(OLDER_THAN) : 
null;
+        Integer maxDeletes =
+                params.has(MAX_DELETES) ? 
Integer.parseInt(params.get(MAX_DELETES)) : null;
+
+        Boolean deleteAll = params.has(DELETE_ALL) && 
Boolean.parseBoolean(params.get(DELETE_ALL));
+
+        ExpireChangelogsAction action =
+                new ExpireChangelogsAction(
+                        params.getRequired(DATABASE),
+                        params.getRequired(TABLE),
+                        catalogConfigMap(params),
+                        retainMax,
+                        retainMin,
+                        olderThan,
+                        maxDeletes,
+                        deleteAll);
+
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"expire_changelogs\" expire the target 
changelogs.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  expire_changelogs \\\n"
+                        + "--warehouse <warehouse_path> \\\n"
+                        + "--database <database> \\\n"
+                        + "--table <table> \\\n"
+                        + "--retain_max <max> \\\n"
+                        + "--retain_min <min> \\\n"
+                        + "--older_than <older_than> \\\n"
+                        + "--max_delete <max_delete> \\\n"
+                        + "--delete_all <delete_all>");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
new file mode 100644
index 0000000000..38491567d5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.ExpireConfig;
+import org.apache.paimon.table.ExpireChangelogImpl;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.time.Duration;
+import java.util.TimeZone;
+
+/** A procedure to expire changelogs. */
+public class ExpireChangelogsProcedure extends ProcedureBase {
+
+    @Override
+    public String identifier() {
+        return "expire_changelogs";
+    }
+
+    /**
+     * Expires changelogs of the given table.
+     *
+     * <p>When {@code delete_all} is true, all separated changelogs are removed and the other
+     * optional arguments are ignored. Otherwise each non-null argument is applied to a fresh
+     * {@link ExpireConfig} builder; null arguments are left unset.
+     *
+     * @return a single-element array: a success message for {@code delete_all}, otherwise the
+     *     value returned by {@code expire()}
+     */
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "retain_max",
+                        type = @DataTypeHint("INTEGER"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "retain_min",
+                        type = @DataTypeHint("INTEGER"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "older_than",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "max_deletes",
+                        type = @DataTypeHint("INTEGER"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "delete_all",
+                        type = @DataTypeHint("BOOLEAN"),
+                        isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            Integer retainMax,
+            Integer retainMin,
+            String olderThanStr,
+            Integer maxDeletes,
+            Boolean deleteAll)
+            throws Catalog.TableNotExistException {
+        ExpireChangelogImpl expireChangelogs =
+                (ExpireChangelogImpl) table(tableId).newExpireChangelog();
+
+        // delete_all ignores every other argument, so handle it before parsing them; otherwise an
+        // unrelated invalid older_than would make delete_all fail
+        if (Boolean.TRUE.equals(deleteAll)) {
+            expireChangelogs.expireAll();
+            return new String[] {"Delete all separated changelogs success."};
+        }
+
+        ExpireConfig.Builder builder = ExpireConfig.builder();
+        if (retainMax != null) {
+            builder.changelogRetainMax(retainMax);
+        }
+        if (retainMin != null) {
+            builder.changelogRetainMin(retainMin);
+        }
+        if (olderThanStr != null) {
+            builder.changelogTimeRetain(
+                    Duration.ofMillis(
+                            System.currentTimeMillis()
+                                    - DateTimeUtils.parseTimestampData(
+                                                    olderThanStr, 3, TimeZone.getDefault())
+                                            .getMillisecond()));
+        }
+        if (maxDeletes != null) {
+            builder.changelogMaxDeletes(maxDeletes);
+        }
+        return new String[] {String.valueOf(expireChangelogs.config(builder.build()).expire())};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index f099e699d0..184706854c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -42,6 +42,7 @@ org.apache.paimon.flink.action.RenameTagActionFactory
 org.apache.paimon.flink.action.RepairActionFactory
 org.apache.paimon.flink.action.RewriteFileIndexActionFactory
 org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
+org.apache.paimon.flink.action.ExpireChangelogsActionFactory
 org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
 org.apache.paimon.flink.action.ClearConsumerActionFactory
 org.apache.paimon.flink.action.RescaleActionFactory
@@ -69,6 +70,7 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
 org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
 org.apache.paimon.flink.procedure.QueryServiceProcedure
 org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
+org.apache.paimon.flink.procedure.ExpireChangelogsProcedure
 org.apache.paimon.flink.procedure.ExpirePartitionsProcedure
 org.apache.paimon.flink.procedure.PurgeFilesProcedure
 org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedureITCase.java
new file mode 100644
index 0000000000..489d752454
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedureITCase.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.ExpireChangelogsAction;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link ExpireChangelogsProcedure}. */
+public class ExpireChangelogsProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testExpireChangelogsProcedure() throws Exception {
+        sql(
+                "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT 
ENFORCED, cnt INT)"
+                        + " WITH ( 'num-sorted-run.compaction-trigger' = 
'9999', 'changelog-producer' = 'input', "
+                        + "'snapshot.num-retained.min' = '4', 
'snapshot.num-retained.max' = '4', "
+                        + "'changelog.num-retained.min' = '10', 
'changelog.num-retained.max' = '10' )");
+        FileStoreTable table = paimonTable("word_count");
+        SnapshotManager snapshotManager = table.snapshotManager();
+        ChangelogManager changelogManager = table.changelogManager();
+
+        // 
------------------------------------------------------------------------
+        //  Basic Function Tests
+        // 
------------------------------------------------------------------------
+
+        // initially prepare 10 snapshots
+        for (int i = 0; i < 10; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+        }
+        // expected snapshots (7, 8, 9, 10)
+        checkSnapshots(snapshotManager, 7, 10);
+        // expected changelogs (1, 2, 3, 4, 5, 6)
+        checkChangelogs(changelogManager, 1, 6);
+
+        // retain_max => 8, expected changelogs (3, 4, 5, 6)
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', 
retain_max => 8)");
+        checkChangelogs(changelogManager, 3, 6);
+
+        // older_than => timestamp of snapshot 10, max_deletes => 1, expected 
snapshots (3, 4, 5, 6)
+        Timestamp ts7 = new 
Timestamp(snapshotManager.latestSnapshot().timeMillis());
+        sql(
+                "CALL sys.expire_changelogs(`table` => 'default.word_count', 
older_than => '"
+                        + ts7
+                        + "', max_deletes => 1)");
+        checkChangelogs(changelogManager, 4, 6);
+
+        // older_than => timestamp of snapshot 7, retain_min => 6, expected 
snapshots (5, 6)
+        sql(
+                "CALL sys.expire_changelogs(`table` => 'default.word_count', 
older_than => '"
+                        + ts7
+                        + "', retain_min => 6)");
+        checkChangelogs(changelogManager, 5, 6);
+
+        // older_than => timestamp of snapshot 7, expected snapshots (6)
+        sql(
+                "CALL sys.expire_changelogs(`table`  => 'default.word_count', 
older_than => '"
+                        + ts7
+                        + "')");
+        checkChangelogs(changelogManager, 6, 6);
+
+        // prepare 2 more snapshots
+        for (int i = 10; i < 12; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+        }
+        // expected snapshots (9, 10, 11, 12)
+        checkSnapshots(snapshotManager, 9, 12);
+        // expected changelogs (6, 7, 8)
+        checkChangelogs(changelogManager, 6, 8);
+
+        // retain_max => 4, same as snapshot.num-retained.max
+        // expected changelogs (8), retain 1 latest changelog at least
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', 
retain_max => 4)");
+        checkChangelogs(changelogManager, 8, 8);
+        checkBatchRead(12);
+
+        // 
------------------------------------------------------------------------
+        //  Expire All Changelogs Tests
+        // 
------------------------------------------------------------------------
+
+        // delete_all => true, delete all separated changelogs
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', 
delete_all => true)");
+        // expected all changelogs and hints deleted
+        checkAllDeleted(changelogManager);
+        checkStreamRead(9, 4);
+        checkBatchRead(12);
+
+        // prepare 8 more snapshots
+        for (int i = 12; i < 15; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+        }
+        // expected snapshots [12, 15]
+        checkSnapshots(snapshotManager, 12, 15);
+        // changelog EARLIEST hint files have not been created
+        assertThat(
+                        changelogManager
+                                .fileIO()
+                                .exists(
+                                        new Path(
+                                                
changelogManager.changelogDirectory(), "EARLIEST")))
+                .isFalse();
+        // expected changelogs [9, 11]
+        checkChangelogs(changelogManager, 9, 11);
+
+        // test can expire changelogs even if EARLIEST hint not exists
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', 
delete_all => true)");
+        checkAllDeleted(changelogManager);
+
+        // prepare 10 more snapshots
+        for (int i = 15; i < 25; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+        }
+        checkSnapshots(snapshotManager, 22, 25);
+        checkChangelogs(changelogManager, 16, 21);
+
+        // make changelog and snapshot lifecycle not decoupled
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = 
'4')");
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = 
'4')");
+        // prepare 5 more snapshots
+        for (int i = 25; i < 30; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+        }
+        checkSnapshots(snapshotManager, 27, 30);
+        // expected: the previous separated changelogs will not be expired in 
committer
+        checkChangelogs(changelogManager, 16, 21);
+        checkBatchRead(30);
+
+        // make changelog and snapshot lifecycle decoupled again
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = 
'50')");
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = 
'50')");
+        // prepare 5 more snapshots
+        for (int i = 30; i < 35; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+        }
+        checkSnapshots(snapshotManager, 32, 35);
+        // actual changelogs: [16, 21] + [27, 31]
+        checkChangelogs(changelogManager, 16, 31);
+        
assertThat(changelogManager.safelyGetAllChangelogs().size()).isEqualTo(11);
+        // make changelog and snapshot lifecycle not decoupled again
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = 
'4')");
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = 
'4')");
+        // prepare 5 more snapshots
+        for (int i = 35; i < 40; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+        }
+        // expected snapshot: [37, 38, 39, 40]
+        // expected changelogs: [16, 17, 18, 19, 20, 21, 27, 28, 29, 30, 31]
+        checkSnapshots(snapshotManager, 37, 40);
+        checkChangelogs(changelogManager, 16, 31);
+
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', 
delete_all => true)");
+        checkAllDeleted(changelogManager);
+        checkStreamRead(37, 4);
+        checkBatchRead(40);
+    }
+
+    @Test
+    public void testExpireChangelogsAction() throws Exception {
+        sql(
+                "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT 
ENFORCED, cnt INT)"
+                        + " WITH ( 'num-sorted-run.compaction-trigger' = 
'9999', 'changelog-producer' = 'input', "
+                        + "'snapshot.num-retained.min' = '4', 
'snapshot.num-retained.max' = '4', "
+                        + "'changelog.num-retained.min' = '10', 
'changelog.num-retained.max' = '10' )");
+        FileStoreTable table = paimonTable("word_count");
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        ChangelogManager changelogManager = table.changelogManager();
+
+        // initially prepare 10 snapshots
+        for (int i = 0; i < 10; ++i) {
+            sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+        }
+        // expected snapshots (7, 8, 9, 10)
+        checkSnapshots(snapshotManager, 7, 10);
+        // expected changelogs (1, 2, 3, 4, 5, 6)
+        checkChangelogs(changelogManager, 1, 6);
+
+        Timestamp ts5 = new 
Timestamp(changelogManager.changelog(5).timeMillis());
+
+        createAction(
+                        ExpireChangelogsAction.class,
+                        "expire_changelogs",
+                        "--warehouse",
+                        path,
+                        "--database",
+                        "default",
+                        "--table",
+                        "word_count",
+                        "--retain_max",
+                        "8",
+                        "--retain_min",
+                        "4",
+                        "--older_than",
+                        ts5.toString(),
+                        "--max_deletes",
+                        "3")
+                .withStreamExecutionEnvironment(env)
+                .run();
+        checkChangelogs(changelogManager, 4, 6);
+
+        // expire all
+        createAction(
+                        ExpireChangelogsAction.class,
+                        "expire_changelogs",
+                        "--warehouse",
+                        path,
+                        "--database",
+                        "default",
+                        "--table",
+                        "word_count",
+                        "--delete_all",
+                        "true")
+                .withStreamExecutionEnvironment(env)
+                .run();
+        checkAllDeleted(changelogManager);
+    }
+
+    private void checkSnapshots(SnapshotManager sm, int earliest, int latest) 
throws IOException {
+        assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
+        assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
+        assertThat(sm.latestSnapshotId()).isEqualTo(latest);
+    }
+
+    private void checkChangelogs(ChangelogManager cm, int earliest, int 
latest) {
+        assertThat(cm.earliestLongLivedChangelogId()).isEqualTo(earliest);
+        assertThat(cm.latestLongLivedChangelogId()).isEqualTo(latest);
+    }
+
+    private void checkAllDeleted(ChangelogManager cm) throws IOException {
+        
assertThat(cm.fileIO().listStatus(cm.changelogDirectory()).length).isEqualTo(0);
+    }
+
+    private void checkBatchRead(int snapshotId) {
+        List<Row> rows =
+                sql(
+                        String.format(
+                                "select * from word_count /*+ 
OPTIONS('scan.snapshot-id' = '%s') */",
+                                snapshotId));
+        assertThat(rows.size()).isEqualTo(snapshotId);
+    }
+
+    private void checkStreamRead(int snapshotId, int expect) throws Exception {
+        BlockingIterator<Row, Row> iter =
+                streamSqlBlockIter(
+                        String.format(
+                                "select * from word_count /*+ 
OPTIONS('scan.snapshot-id' = '%s') */",
+                                snapshotId));
+        List<Row> rows = iter.collect(expect, 60, TimeUnit.SECONDS);
+        List<Row> expectedRows = new ArrayList<>();
+        for (int i = snapshotId - 1; i < snapshotId - 1 + expect; i++) {
+            expectedRows.add(Row.of(String.valueOf(i), i));
+        }
+        assertThat(rows).hasSameElementsAs(expectedRows);
+    }
+
+    private <T extends ActionBase> T createAction(Class<T> clazz, String... 
args) {
+        return ActionFactory.createAction(args)
+                .filter(clazz::isInstance)
+                .map(clazz::cast)
+                .orElseThrow(() -> new RuntimeException("Failed to create 
action"));
+    }
+}


Reply via email to