This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 59d38e3f86 [core][flink] introduce changelog expire procedure and action (#5319)
59d38e3f86 is described below
commit 59d38e3f86aaa49d73854b64ded29a54af2a10ee
Author: LsomeYeah <[email protected]>
AuthorDate: Fri Mar 21 13:08:56 2025 +0800
[core][flink] introduce changelog expire procedure and action (#5319)
---
docs/content/flink/procedures.md | 39 +++
.../apache/paimon/table/ExpireChangelogImpl.java | 79 ++++++
.../org/apache/paimon/utils/ChangelogManager.java | 8 +
.../org/apache/paimon/utils/HintFileUtils.java | 5 +
.../flink/procedure/ExpireChangelogsProcedure.java | 88 ++++++
.../flink/action/ExpireChangelogsAction.java | 68 +++++
.../action/ExpireChangelogsActionFactory.java | 81 ++++++
.../flink/procedure/ExpireChangelogsProcedure.java | 101 +++++++
.../services/org.apache.paimon.factories.Factory | 2 +
.../procedure/ExpireChangelogsProcedureITCase.java | 297 +++++++++++++++++++++
10 files changed, 768 insertions(+)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index a5a24def4e..068eb439f9 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -601,6 +601,45 @@ All available procedures are listed below.
        CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
</td>
</tr>
+ <tr>
+ <td>expire_changelogs</td>
+ <td>
+ -- Use named argument<br/>
+ CALL [catalog.]sys.expire_changelogs(<br/>
+ `table` => 'identifier', <br/>
+ retain_max => 'retain_max', <br/>
+ retain_min => 'retain_min', <br/>
+ older_than => 'older_than', <br/>
+            max_deletes => 'max_deletes', <br/>
+ delete_all => 'delete_all') <br/><br/>
+ -- Use indexed argument<br/>
+ -- for Flink 1.18<br/>
+        CALL [catalog.]sys.expire_changelogs(table, retain_max, retain_min, older_than, max_deletes)<br/><br/>
+ CALL [catalog.]sys.expire_changelogs(table, delete_all)<br/><br/>
+ -- for Flink 1.19 and later<br/>
+        CALL [catalog.]sys.expire_changelogs(table, retain_max, retain_min, older_than, max_deletes, delete_all)<br/><br/>
+ </td>
+ <td>
+        To expire changelogs. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+        <li>retain_max: the maximum number of completed changelogs to retain.</li>
+        <li>retain_min: the minimum number of completed changelogs to retain.</li>
+        <li>older_than: timestamp before which changelogs will be removed.</li>
+        <li>max_deletes: the maximum number of changelogs that can be deleted at once.</li>
+ <li>delete_all: whether to delete all separated changelogs.</li>
+ </td>
+ <td>
+ -- for Flink 1.18<br/><br/>
+        CALL sys.expire_changelogs('default.T', 4, 2, '2024-01-01 12:00:00', 2)<br/><br/>
+ CALL sys.expire_changelogs('default.T', true)<br/><br/>
+ -- for Flink 1.19 and later<br/><br/>
+        CALL sys.expire_changelogs(`table` => 'default.T', retain_max => 2)<br/><br/>
+        CALL sys.expire_changelogs(`table` => 'default.T', older_than => '2024-01-01 12:00:00')<br/><br/>
+        CALL sys.expire_changelogs(`table` => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10)<br/><br/>
+        CALL sys.expire_changelogs(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
+        CALL sys.expire_changelogs(`table` => 'default.T', delete_all => true)<br/><br/>
+ </td>
+ </tr>
<tr>
<td>expire_partitions</td>
<td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index bf4f8665ad..cf11a2ba14 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -32,6 +32,7 @@ import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
@@ -173,6 +174,84 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
return (int) (endExclusiveId - earliestId);
}
+    /** Expire all separated changelogs, only used by ExpireChangelogsProcedure. */
+ public void expireAll() {
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ if (latestSnapshotId == null) {
+ // no snapshot, nothing to expire
+ return;
+ }
+
+ Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+ if (earliestSnapshotId == null) {
+ return;
+ }
+
+ Long latestChangelogId = changelogManager.latestLongLivedChangelogId();
+ if (latestChangelogId == null) {
+ return;
+ }
+        Long earliestChangelogId = changelogManager.earliestLongLivedChangelogId();
+ if (earliestChangelogId == null) {
+ return;
+ }
+
+ LOG.info(
+                "Read earliest and latest changelog for expire all. earliestChangelogId is {}, latestChangelogId is {}",
+ earliestChangelogId,
+ latestChangelogId);
+
+ List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
+
+ // files used by the earliest snapshot id should be skipped
+ Preconditions.checkArgument(
+ latestChangelogId < earliestSnapshotId,
+                "Latest changelog id should be less than earliest snapshot id. "
+                        + "Please check your table!");
+ List<Snapshot> skippingSnapshots =
+                findSkippingTags(taggedSnapshots, earliestChangelogId, earliestSnapshotId);
+ skippingSnapshots.add(snapshotManager.snapshot(earliestSnapshotId));
+
+        Set<String> manifestSkipSet = changelogDeletion.manifestSkippingSet(skippingSnapshots);
+ for (long id = earliestChangelogId; id <= latestChangelogId; id++) {
+
+ LOG.info("Ready to delete changelog files from changelog #" + id);
+
+ Changelog changelog;
+ try {
+ changelog = changelogManager.tryGetChangelog(id);
+ } catch (FileNotFoundException e) {
+                LOG.info("Failed to get changelog #" + id);
+ continue;
+ }
+ Predicate<ExpireFileEntry> skipper;
+ try {
+                skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
+ } catch (Exception e) {
+ LOG.info(
+ String.format(
+                                "Skip cleaning data files of changelog '%s' due to failure to build the skipping set.",
+ id),
+ e);
+ continue;
+ }
+
+ changelogDeletion.cleanUnusedDataFiles(changelog, skipper);
+            changelogDeletion.cleanUnusedManifests(changelog, manifestSkipSet);
+            changelogManager.fileIO().deleteQuietly(changelogManager.longLivedChangelogPath(id));
+ }
+
+        // try to delete the changelog hint files
+ try {
+ changelogManager.deleteEarliestHint();
+ changelogManager.deleteLatestHint();
+ } catch (Exception e) {
+            LOG.error("Failed to delete changelog hint files.", e);
+ }
+
+ changelogDeletion.cleanEmptyDirectories();
+ }
+
private void writeEarliestHintFile(long earliest) {
try {
changelogManager.commitLongLivedChangelogEarliestHint(earliest);
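
Usage sketch (not part of the commit): the new expireAll() sits beside the existing
config-driven expire() on ExpireChangelogImpl. The snippet below uses only calls that
appear in this diff; obtaining the FileStoreTable from a catalog is elided and the
`table` variable is a placeholder.

    ExpireChangelogImpl expire = (ExpireChangelogImpl) table.newExpireChangelog();

    // Config-driven path: expire separated changelogs by retained count and delete budget.
    ExpireConfig config =
            ExpireConfig.builder()
                    .changelogRetainMax(10)
                    .changelogRetainMin(4)
                    .changelogMaxDeletes(100)
                    .build();
    expire.config(config).expire(); // the returned count is what the procedure reports

    // New path added by this commit: drop every separated (long-lived) changelog at once.
    expire.expireAll();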
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java
index 230ef3231d..7d60649443 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java
@@ -170,6 +170,14 @@ public class ChangelogManager implements Serializable {
return changelogs;
}
+ public void deleteLatestHint() throws IOException {
+ HintFileUtils.deleteLatestHint(fileIO, changelogDirectory());
+ }
+
+ public void deleteEarliestHint() throws IOException {
+ HintFileUtils.deleteEarliestHint(fileIO, changelogDirectory());
+ }
+
    private static void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths)
throws IOException {
ExecutorService executor =
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java
index fb1a8ec308..7a8eaf6f6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java
@@ -103,6 +103,11 @@ public class HintFileUtils {
fileIO.delete(hintFile, false);
}
+    public static void deleteEarliestHint(FileIO fileIO, Path dir) throws IOException {
+ Path hintFile = new Path(dir, EARLIEST);
+ fileIO.delete(hintFile, false);
+ }
+
    public static void commitHint(FileIO fileIO, long id, String fileName, Path dir)
throws IOException {
Path hintFile = new Path(dir, fileName);
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
new file mode 100644
index 0000000000..f68ac44a5e
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.ExpireConfig;
+import org.apache.paimon.table.ExpireChangelogImpl;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.time.Duration;
+import java.util.TimeZone;
+
+/** A procedure to expire changelogs. */
+public class ExpireChangelogsProcedure extends ProcedureBase {
+
+ public String identifier() {
+ return "expire_changelogs";
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ int retainMax,
+ int retainMin,
+ String olderThan)
+ throws Catalog.TableNotExistException {
+        return call(procedureContext, tableId, retainMax, retainMin, olderThan, Integer.MAX_VALUE);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ int retainMax,
+ int retainMin,
+ String olderThan,
+ int maxDeletes)
+ throws Catalog.TableNotExistException {
+ ExpireChangelogImpl expireChangelogs =
+ (ExpireChangelogImpl) table(tableId).newExpireChangelog();
+ ExpireConfig.Builder builder = ExpireConfig.builder();
+
+ builder.changelogRetainMax(retainMax);
+
+ builder.changelogRetainMin(retainMin);
+
+ builder.changelogTimeRetain(
+ Duration.ofMillis(
+ System.currentTimeMillis()
+ - DateTimeUtils.parseTimestampData(
+                                                olderThan, 3, TimeZone.getDefault())
+ .getMillisecond()));
+
+ builder.changelogMaxDeletes(maxDeletes);
+
+        return new String[] {expireChangelogs.config(builder.build()).expire() + ""};
+ }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, boolean deleteAll)
+ throws Catalog.TableNotExistException {
+ ExpireChangelogImpl expireChangelogs =
+ (ExpireChangelogImpl) table(tableId).newExpireChangelog();
+
+ if (deleteAll) {
+ expireChangelogs.expireAll();
+ return new String[] {"Delete all separated changelogs success."};
+ }
+
+ return new String[] {"deleteAll is false, do nothing."};
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsAction.java
new file mode 100644
index 0000000000..9625b24202
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.ExpireChangelogsProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Expire changelogs action for Flink. */
+public class ExpireChangelogsAction extends ActionBase {
+ private final String database;
+ private final String table;
+ private final Integer retainMax;
+ private final Integer retainMin;
+ private final String olderThan;
+ private final Integer maxDeletes;
+ private final Boolean deleteAll;
+
+ public ExpireChangelogsAction(
+ String database,
+ String table,
+ Map<String, String> catalogConfig,
+ Integer retainMax,
+ Integer retainMin,
+ String olderThan,
+ Integer maxDeletes,
+ Boolean deleteAll) {
+ super(catalogConfig);
+ this.database = database;
+ this.table = table;
+ this.retainMax = retainMax;
+ this.retainMin = retainMin;
+ this.olderThan = olderThan;
+ this.maxDeletes = maxDeletes;
+ this.deleteAll = deleteAll;
+ }
+
+ public void run() throws Exception {
+        ExpireChangelogsProcedure expireChangelogsProcedure = new ExpireChangelogsProcedure();
+        expireChangelogsProcedure.withCatalog(catalog);
+        expireChangelogsProcedure.call(
+ new DefaultProcedureContext(env),
+ database + "." + table,
+ retainMax,
+ retainMin,
+ olderThan,
+ maxDeletes,
+ deleteAll);
+ }
+}
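
Usage sketch (not part of the commit): the action can also be constructed and run
directly. The warehouse path, database and table names are placeholders, a null
argument means the corresponding option is unset (as when the factory sees no such CLI
option), and the execution environment is attached the same way the IT case below does.
Imports (java.util.HashMap/Map, Flink's StreamExecutionEnvironment) are omitted for brevity.

    Map<String, String> catalogConfig = new HashMap<>();
    catalogConfig.put("warehouse", "/path/to/warehouse"); // placeholder path

    ExpireChangelogsAction action =
            new ExpireChangelogsAction(
                    "default",    // database
                    "word_count", // table
                    catalogConfig,
                    8,            // retain_max
                    4,            // retain_min
                    null,         // older_than (unset)
                    3,            // max_deletes
                    false);       // delete_all
    action.withStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment()).run();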
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsActionFactory.java
new file mode 100644
index 0000000000..04f85d6b74
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireChangelogsActionFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link ExpireChangelogsAction}. */
+public class ExpireChangelogsActionFactory implements ActionFactory {
+ public static final String IDENTIFIER = "expire_changelogs";
+
+ private static final String RETAIN_MAX = "retain_max";
+ private static final String RETAIN_MIN = "retain_min";
+ private static final String OLDER_THAN = "older_than";
+ private static final String MAX_DELETES = "max_deletes";
+ private static final String DELETE_ALL = "delete_all";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ Integer retainMax =
+                params.has(RETAIN_MAX) ? Integer.parseInt(params.get(RETAIN_MAX)) : null;
+ Integer retainMin =
+                params.has(RETAIN_MIN) ? Integer.parseInt(params.get(RETAIN_MIN)) : null;
+        String olderThan = params.has(OLDER_THAN) ? params.get(OLDER_THAN) : null;
+ Integer maxDeletes =
+                params.has(MAX_DELETES) ? Integer.parseInt(params.get(MAX_DELETES)) : null;
+
+        Boolean deleteAll = params.has(DELETE_ALL) && Boolean.parseBoolean(params.get(DELETE_ALL));
+
+ ExpireChangelogsAction action =
+ new ExpireChangelogsAction(
+ params.getRequired(DATABASE),
+ params.getRequired(TABLE),
+ catalogConfigMap(params),
+ retainMax,
+ retainMin,
+ olderThan,
+ maxDeletes,
+ deleteAll);
+
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+        System.out.println("Action \"expire_changelogs\" expires the target changelogs.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " expire_changelogs \\\n"
+ + "--warehouse <warehouse_path> \\\n"
+ + "--database <database> \\\n"
+ + "--table <table> \\\n"
+ + "--retain_max <max> \\\n"
+ + "--retain_min <min> \\\n"
+ + "--older_than <older_than> \\\n"
+                        + "--max_deletes <max_deletes> \\\n"
+ + "--delete_all <delete_all>");
+ }
+}
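
Usage sketch (not part of the commit): the factory is reached through
ActionFactory.createAction with CLI-style arguments, mirroring the helper used in the
IT case below; the warehouse path is a placeholder.

    String[] args = {
        "expire_changelogs",
        "--warehouse", "/path/to/warehouse", // placeholder
        "--database", "default",
        "--table", "word_count",
        "--retain_max", "8",
        "--older_than", "2024-01-01 12:00:00",
        "--max_deletes", "3"
    };
    ExpireChangelogsAction action =
            (ExpireChangelogsAction)
                    ActionFactory.createAction(args)
                            .orElseThrow(() -> new RuntimeException("Failed to create action"));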
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
new file mode 100644
index 0000000000..38491567d5
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedure.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.ExpireConfig;
+import org.apache.paimon.table.ExpireChangelogImpl;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.time.Duration;
+import java.util.TimeZone;
+
+/** A procedure to expire changelogs. */
+public class ExpireChangelogsProcedure extends ProcedureBase {
+
+ @Override
+ public String identifier() {
+ return "expire_changelogs";
+ }
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "retain_max",
+ type = @DataTypeHint("INTEGER"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "retain_min",
+ type = @DataTypeHint("INTEGER"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "older_than",
+ type = @DataTypeHint(value = "STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "max_deletes",
+ type = @DataTypeHint("INTEGER"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "delete_all",
+ type = @DataTypeHint("BOOLEAN"),
+ isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ Integer retainMax,
+ Integer retainMin,
+ String olderThanStr,
+ Integer maxDeletes,
+ Boolean deleteAll)
+ throws Catalog.TableNotExistException {
+ ExpireChangelogImpl expireChangelogs =
+ (ExpireChangelogImpl) table(tableId).newExpireChangelog();
+ ExpireConfig.Builder builder = ExpireConfig.builder();
+ if (retainMax != null) {
+ builder.changelogRetainMax(retainMax);
+ }
+ if (retainMin != null) {
+ builder.changelogRetainMin(retainMin);
+ }
+ if (olderThanStr != null) {
+ builder.changelogTimeRetain(
+ Duration.ofMillis(
+ System.currentTimeMillis()
+ - DateTimeUtils.parseTimestampData(
+                                                    olderThanStr, 3, TimeZone.getDefault())
+ .getMillisecond()));
+ }
+ if (maxDeletes != null) {
+ builder.changelogMaxDeletes(maxDeletes);
+ }
+ if (deleteAll != null && deleteAll) {
+ expireChangelogs.expireAll();
+ return new String[] {"Delete all separated changelogs success."};
+ }
+        return new String[] {expireChangelogs.config(builder.build()).expire() + ""};
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index f099e699d0..184706854c 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -42,6 +42,7 @@ org.apache.paimon.flink.action.RenameTagActionFactory
org.apache.paimon.flink.action.RepairActionFactory
org.apache.paimon.flink.action.RewriteFileIndexActionFactory
org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
+org.apache.paimon.flink.action.ExpireChangelogsActionFactory
org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
org.apache.paimon.flink.action.ClearConsumerActionFactory
org.apache.paimon.flink.action.RescaleActionFactory
@@ -69,6 +70,7 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
+org.apache.paimon.flink.procedure.ExpireChangelogsProcedure
org.apache.paimon.flink.procedure.ExpirePartitionsProcedure
org.apache.paimon.flink.procedure.PurgeFilesProcedure
org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedureITCase.java
new file mode 100644
index 0000000000..489d752454
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireChangelogsProcedureITCase.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.ExpireChangelogsAction;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link ExpireChangelogsProcedure}. */
+public class ExpireChangelogsProcedureITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testExpireChangelogsProcedure() throws Exception {
+ sql(
+                "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
+                        + " WITH ( 'num-sorted-run.compaction-trigger' = '9999', 'changelog-producer' = 'input', "
+                        + "'snapshot.num-retained.min' = '4', 'snapshot.num-retained.max' = '4', "
+                        + "'changelog.num-retained.min' = '10', 'changelog.num-retained.max' = '10' )");
+ FileStoreTable table = paimonTable("word_count");
+ SnapshotManager snapshotManager = table.snapshotManager();
+ ChangelogManager changelogManager = table.changelogManager();
+
+        // ------------------------------------------------------------------------
+        // Basic Function Tests
+        // ------------------------------------------------------------------------
+
+ // initially prepare 10 snapshots
+ for (int i = 0; i < 10; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+ }
+ // expected snapshots (7, 8, 9, 10)
+ checkSnapshots(snapshotManager, 7, 10);
+ // expected changelogs (1, 2, 3, 4, 5, 6)
+ checkChangelogs(changelogManager, 1, 6);
+
+ // retain_max => 8, expected changelogs (3, 4, 5, 6)
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', retain_max => 8)");
+ checkChangelogs(changelogManager, 3, 6);
+
+        // older_than => timestamp of snapshot 10, max_deletes => 1, expected changelogs (4, 5, 6)
+        Timestamp ts7 = new Timestamp(snapshotManager.latestSnapshot().timeMillis());
+ sql(
+                "CALL sys.expire_changelogs(`table` => 'default.word_count', older_than => '"
+ + ts7
+ + "', max_deletes => 1)");
+ checkChangelogs(changelogManager, 4, 6);
+
+        // older_than => timestamp of snapshot 7, retain_min => 6, expected changelogs (5, 6)
+ sql(
+                "CALL sys.expire_changelogs(`table` => 'default.word_count', older_than => '"
+ + ts7
+ + "', retain_min => 6)");
+ checkChangelogs(changelogManager, 5, 6);
+
+        // older_than => timestamp of snapshot 7, expected changelogs (6)
+ sql(
+                "CALL sys.expire_changelogs(`table` => 'default.word_count', older_than => '"
+ + ts7
+ + "')");
+ checkChangelogs(changelogManager, 6, 6);
+
+ // prepare 2 more snapshots
+ for (int i = 10; i < 12; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+ }
+ // expected snapshots (9, 10, 11, 12)
+ checkSnapshots(snapshotManager, 9, 12);
+ // expected changelogs (6, 7, 8)
+ checkChangelogs(changelogManager, 6, 8);
+
+ // retain_max => 4, same as snapshot.num-retained.max
+ // expected changelogs (8), retain 1 latest changelog at least
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', retain_max => 4)");
+ checkChangelogs(changelogManager, 8, 8);
+ checkBatchRead(12);
+
+        // ------------------------------------------------------------------------
+        // Expire All Changelogs Tests
+        // ------------------------------------------------------------------------
+
+ // delete_all => true, delete all separated changelogs
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', delete_all => true)");
+ // expected all changelogs and hints deleted
+ checkAllDeleted(changelogManager);
+ checkStreamRead(9, 4);
+ checkBatchRead(12);
+
+        // prepare 3 more snapshots
+ for (int i = 12; i < 15; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+ }
+ // expected snapshots [12, 15]
+ checkSnapshots(snapshotManager, 12, 15);
+ // changelog EARLIEST hint files have not been created
+ assertThat(
+ changelogManager
+ .fileIO()
+ .exists(
+ new Path(
+                                                changelogManager.changelogDirectory(), "EARLIEST")))
+ .isFalse();
+ // expected changelogs [9, 11]
+ checkChangelogs(changelogManager, 9, 11);
+
+        // test that changelogs can be expired even if the EARLIEST hint does not exist
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', delete_all => true)");
+ checkAllDeleted(changelogManager);
+
+ // prepare 10 more snapshots
+ for (int i = 15; i < 25; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+ }
+ checkSnapshots(snapshotManager, 22, 25);
+ checkChangelogs(changelogManager, 16, 21);
+
+ // make changelog and snapshot lifecycle not decoupled
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = '4')");
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = '4')");
+ // prepare 5 more snapshots
+ for (int i = 25; i < 30; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+ }
+ checkSnapshots(snapshotManager, 27, 30);
+        // expected: the previous separated changelogs will not be expired in committer
+ checkChangelogs(changelogManager, 16, 21);
+ checkBatchRead(30);
+
+ // make changelog and snapshot lifecycle decoupled again
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = '50')");
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = '50')");
+ // prepare 5 more snapshots
+ for (int i = 30; i < 35; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+ }
+ checkSnapshots(snapshotManager, 32, 35);
+ // actual changelogs: [16, 21] + [27, 31]
+ checkChangelogs(changelogManager, 16, 31);
+
+        assertThat(changelogManager.safelyGetAllChangelogs().size()).isEqualTo(11);
+ // make changelog and snapshot lifecycle not decoupled again
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = '4')");
+        sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = '4')");
+ // prepare 5 more snapshots
+ for (int i = 35; i < 40; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+ }
+ // expected snapshot: [37, 38, 39, 40]
+ // expected changelogs: [16, 17, 18, 19, 20, 21, 27, 28, 29, 30, 31]
+ checkSnapshots(snapshotManager, 37, 40);
+ checkChangelogs(changelogManager, 16, 31);
+
+        sql("CALL sys.expire_changelogs(`table` => 'default.word_count', delete_all => true)");
+ checkAllDeleted(changelogManager);
+ checkStreamRead(37, 4);
+ checkBatchRead(40);
+ }
+
+ @Test
+ public void testExpireChangelogsAction() throws Exception {
+ sql(
+                "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
+                        + " WITH ( 'num-sorted-run.compaction-trigger' = '9999', 'changelog-producer' = 'input', "
+                        + "'snapshot.num-retained.min' = '4', 'snapshot.num-retained.max' = '4', "
+                        + "'changelog.num-retained.min' = '10', 'changelog.num-retained.max' = '10' )");
+ FileStoreTable table = paimonTable("word_count");
+        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().streamingMode().build();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ ChangelogManager changelogManager = table.changelogManager();
+
+ // initially prepare 10 snapshots
+ for (int i = 0; i < 10; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")");
+ }
+ // expected snapshots (7, 8, 9, 10)
+ checkSnapshots(snapshotManager, 7, 10);
+ // expected changelogs (1, 2, 3, 4, 5, 6)
+ checkChangelogs(changelogManager, 1, 6);
+
+        Timestamp ts5 = new Timestamp(changelogManager.changelog(5).timeMillis());
+
+ createAction(
+ ExpireChangelogsAction.class,
+ "expire_changelogs",
+ "--warehouse",
+ path,
+ "--database",
+ "default",
+ "--table",
+ "word_count",
+ "--retain_max",
+ "8",
+ "--retain_min",
+ "4",
+ "--older_than",
+ ts5.toString(),
+ "--max_deletes",
+ "3")
+ .withStreamExecutionEnvironment(env)
+ .run();
+ checkChangelogs(changelogManager, 4, 6);
+
+ // expire all
+ createAction(
+ ExpireChangelogsAction.class,
+ "expire_changelogs",
+ "--warehouse",
+ path,
+ "--database",
+ "default",
+ "--table",
+ "word_count",
+ "--delete_all",
+ "true")
+ .withStreamExecutionEnvironment(env)
+ .run();
+ checkAllDeleted(changelogManager);
+ }
+
+    private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
+ assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
+ assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
+ assertThat(sm.latestSnapshotId()).isEqualTo(latest);
+ }
+
+    private void checkChangelogs(ChangelogManager cm, int earliest, int latest) {
+ assertThat(cm.earliestLongLivedChangelogId()).isEqualTo(earliest);
+ assertThat(cm.latestLongLivedChangelogId()).isEqualTo(latest);
+ }
+
+ private void checkAllDeleted(ChangelogManager cm) throws IOException {
+        assertThat(cm.fileIO().listStatus(cm.changelogDirectory()).length).isEqualTo(0);
+ }
+
+ private void checkBatchRead(int snapshotId) {
+ List<Row> rows =
+ sql(
+ String.format(
+                                "select * from word_count /*+ OPTIONS('scan.snapshot-id' = '%s') */",
+ snapshotId));
+ assertThat(rows.size()).isEqualTo(snapshotId);
+ }
+
+ private void checkStreamRead(int snapshotId, int expect) throws Exception {
+ BlockingIterator<Row, Row> iter =
+ streamSqlBlockIter(
+ String.format(
+                                "select * from word_count /*+ OPTIONS('scan.snapshot-id' = '%s') */",
+ snapshotId));
+ List<Row> rows = iter.collect(expect, 60, TimeUnit.SECONDS);
+ List<Row> expectedRows = new ArrayList<>();
+ for (int i = snapshotId - 1; i < snapshotId - 1 + expect; i++) {
+ expectedRows.add(Row.of(String.valueOf(i), i));
+ }
+ assertThat(rows).hasSameElementsAs(expectedRows);
+ }
+
+    private <T extends ActionBase> T createAction(Class<T> clazz, String... args) {
+ return ActionFactory.createAction(args)
+ .filter(clazz::isInstance)
+ .map(clazz::cast)
+                .orElseThrow(() -> new RuntimeException("Failed to create action"));
+ }
+}