This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 99dc1992f [core] Support rollback to a tag (#1443)
99dc1992f is described below

commit 99dc1992fe82d9e904dbb8a179374bc9f3c9bb40
Author: yuzelin <[email protected]>
AuthorDate: Thu Jul 6 12:22:04 2023 +0800

    [core] Support rollback to a tag (#1443)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  13 +-
 .../org/apache/paimon/operation/DeletionUtils.java | 157 -----------
 .../apache/paimon/operation/FileDeletionBase.java  | 308 +++++++++++++++++++++
 .../paimon/operation/FileStoreExpireImpl.java      |  41 ++-
 .../apache/paimon/operation/SnapshotDeletion.java  | 219 ++++-----------
 .../org/apache/paimon/operation/TagDeletion.java   | 150 ++--------
 .../org/apache/paimon/operation/TagFileKeeper.java | 125 ---------
 .../paimon/table/AbstractFileStoreTable.java       |  60 +++-
 .../org/apache/paimon/table/ReadonlyTable.java     |   8 +
 .../org/apache/paimon/table/RollbackHelper.java    | 149 ++++++++++
 .../main/java/org/apache/paimon/table/Table.java   |   4 +
 .../org/apache/paimon/utils/SnapshotManager.java   |  50 +---
 .../java/org/apache/paimon/utils/TagManager.java   |  93 ++++++-
 .../test/java/org/apache/paimon/TestFileStore.java |   7 +-
 .../operation/CleanedFileStoreExpireTest.java      |   4 +-
 .../apache/paimon/operation/FileDeletionTest.java  |   4 +-
 .../paimon/table/FileStoreTableTestBase.java       | 164 ++++++++++-
 .../paimon/table/IndexFileExpireTableTest.java     |  24 +-
 .../paimon/flink/action/RollbackToAction.java      |  33 ++-
 .../flink/action/RollbackToActionITCase.java       |  35 ++-
 20 files changed, 933 insertions(+), 715 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 558daa899..8fd6ad216 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -31,7 +31,6 @@ import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
-import org.apache.paimon.operation.TagFileKeeper;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.RowType;
@@ -171,11 +170,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.snapshotTimeRetain().toMillis(),
                 snapshotManager(),
                 newSnapshotDeletion(),
-                new TagFileKeeper(
-                        manifestListFactory().create(),
-                        manifestFileFactory().create(),
-                        new TagManager(fileIO, options.path()),
-                        options.scanManifestParallelism()));
+                new TagManager(fileIO, options.path()));
     }
 
     @Override
@@ -192,12 +187,10 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
     public TagDeletion newTagDeletion() {
         return new TagDeletion(
                 fileIO,
-                options.path(),
                 pathFactory(),
-                manifestListFactory().create(),
                 manifestFileFactory().create(),
-                newIndexFileHandler(),
-                options.scanManifestParallelism());
+                manifestListFactory().create(),
+                newIndexFileHandler());
     }
 
     public abstract Comparator<InternalRow> newKeyComparator();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DeletionUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/DeletionUtils.java
deleted file mode 100644
index 9a9b28465..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DeletionUtils.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.manifest.ManifestList;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.ParallellyExecuteUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/** Util methods for snapshot and tag deletion. */
-public class DeletionUtils {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(DeletionUtils.class);
-
-    public static Iterable<ManifestEntry> readEntries(
-            List<ManifestFileMeta> manifests,
-            ManifestFile manifestFile,
-            @Nullable Integer scanManifestParallelism) {
-        return ParallellyExecuteUtils.parallelismBatchIterable(
-                files ->
-                        files.parallelStream()
-                                .flatMap(m -> 
manifestFile.read(m.fileName()).stream())
-                                .collect(Collectors.toList()),
-                manifests,
-                scanManifestParallelism);
-    }
-
-    public static void addMergedDataFiles(
-            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles,
-            Snapshot snapshot,
-            ManifestList manifestList,
-            ManifestFile manifestFile,
-            @Nullable Integer scanManifestParallelism) {
-        Iterable<ManifestEntry> entries =
-                readEntries(
-                        snapshot.dataManifests(manifestList),
-                        manifestFile,
-                        scanManifestParallelism);
-        for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
-            dataFiles
-                    .computeIfAbsent(entry.partition(), p -> new HashMap<>())
-                    .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
-                    .add(entry.file().fileName());
-        }
-    }
-
-    public static boolean containsDataFile(
-            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ManifestEntry 
testee) {
-        Map<Integer, Set<String>> buckets = dataFiles.get(testee.partition());
-        if (buckets != null) {
-            Set<String> fileNames = buckets.get(testee.bucket());
-            if (fileNames != null) {
-                return fileNames.contains(testee.file().fileName());
-            }
-        }
-        return false;
-    }
-
-    /** Try to delete directories that may be empty. */
-    public static void tryDeleteDirectories(
-            Map<BinaryRow, Set<Integer>> deletionBuckets,
-            FileStorePathFactory pathFactory,
-            FileIO fileIO) {
-        // All directory paths are deduplicated and sorted by hierarchy level
-        Map<Integer, Set<Path>> deduplicate = new HashMap<>();
-        for (Map.Entry<BinaryRow, Set<Integer>> entry : 
deletionBuckets.entrySet()) {
-            // try to delete bucket directories
-            for (Integer bucket : entry.getValue()) {
-                tryDeleteEmptyDirectory(pathFactory.bucketPath(entry.getKey(), 
bucket), fileIO);
-            }
-
-            List<Path> hierarchicalPaths = 
pathFactory.getHierarchicalPartitionPath(entry.getKey());
-            int hierarchies = hierarchicalPaths.size();
-            if (hierarchies == 0) {
-                continue;
-            }
-
-            if (tryDeleteEmptyDirectory(hierarchicalPaths.get(hierarchies - 
1), fileIO)) {
-                // deduplicate high level partition directories
-                for (int hierarchy = 0; hierarchy < hierarchies - 1; 
hierarchy++) {
-                    Path path = hierarchicalPaths.get(hierarchy);
-                    deduplicate.computeIfAbsent(hierarchy, i -> new 
HashSet<>()).add(path);
-                }
-            }
-        }
-
-        // from deepest to shallowest
-        for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; 
hierarchy--) {
-            deduplicate.get(hierarchy).forEach(path -> 
tryDeleteEmptyDirectory(path, fileIO));
-        }
-    }
-
-    private static boolean tryDeleteEmptyDirectory(Path path, FileIO fileIO) {
-        try {
-            fileIO.delete(path, false);
-            return true;
-        } catch (IOException e) {
-            LOG.debug("Failed to delete directory '{}'. Check whether it is 
empty.", path);
-            return false;
-        }
-    }
-
-    public static Set<String> collectManifestSkippingSet(
-            Snapshot snapshot, ManifestList manifestList, IndexFileHandler 
indexFileHandler) {
-        Set<String> skip =
-                snapshot.dataManifests(manifestList).stream()
-                        .map(ManifestFileMeta::fileName)
-                        .collect(Collectors.toCollection(HashSet::new));
-        String indexManifest = snapshot.indexManifest();
-        if (indexManifest != null) {
-            skip.add(indexManifest);
-            indexFileHandler.readManifest(indexManifest).stream()
-                    .map(IndexManifestEntry::indexFile)
-                    .map(IndexFileMeta::fileName)
-                    .forEach(skip::add);
-        }
-        return skip;
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
new file mode 100644
index 000000000..a9ad4cac2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.utils.FileStorePathFactory;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for file deletion including methods for clean data files, 
manifest files and empty
+ * data directories.
+ */
+public abstract class FileDeletionBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileDeletionBase.class);
+
+    protected final FileIO fileIO;
+    protected final FileStorePathFactory pathFactory;
+    protected final ManifestFile manifestFile;
+    protected final ManifestList manifestList;
+    protected final IndexFileHandler indexFileHandler;
+    protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
+
+    public FileDeletionBase(
+            FileIO fileIO,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            IndexFileHandler indexFileHandler) {
+        this.fileIO = fileIO;
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.indexFileHandler = indexFileHandler;
+
+        this.deletionBuckets = new HashMap<>();
+    }
+
+    /**
+     * Clean data files that will not be used anymore in the snapshot.
+     *
+     * @param snapshot {@link Snapshot} that will be cleaned
+     * @param skipper if the test result of a data file is true, it will be 
skipped when deleting;
+     *     else it will be deleted
+     */
+    public abstract void cleanUnusedDataFiles(Snapshot snapshot, 
Predicate<ManifestEntry> skipper);
+
+    /**
+     * Clean metadata files that will not be used anymore of a snapshot, 
including data manifests,
+     * index manifests and manifest lists.
+     *
+     * @param snapshot {@link Snapshot} that will be cleaned
+     * @param skippingSet manifests that should not be deleted
+     */
+    public abstract void cleanUnusedManifests(Snapshot snapshot, Set<String> 
skippingSet);
+
+    /** Try to delete data directories that may be empty after data file 
deletion. */
+    public void cleanDataDirectories() {
+        if (deletionBuckets.isEmpty()) {
+            return;
+        }
+
+        // All directory paths are deduplicated and sorted by hierarchy level
+        Map<Integer, Set<Path>> deduplicate = new HashMap<>();
+        for (Map.Entry<BinaryRow, Set<Integer>> entry : 
deletionBuckets.entrySet()) {
+            // try to delete bucket directories
+            for (Integer bucket : entry.getValue()) {
+                tryDeleteEmptyDirectory(pathFactory.bucketPath(entry.getKey(), 
bucket));
+            }
+
+            List<Path> hierarchicalPaths = 
pathFactory.getHierarchicalPartitionPath(entry.getKey());
+            int hierarchies = hierarchicalPaths.size();
+            if (hierarchies == 0) {
+                continue;
+            }
+
+            if (tryDeleteEmptyDirectory(hierarchicalPaths.get(hierarchies - 
1))) {
+                // deduplicate high level partition directories
+                for (int hierarchy = 0; hierarchy < hierarchies - 1; 
hierarchy++) {
+                    Path path = hierarchicalPaths.get(hierarchy);
+                    deduplicate.computeIfAbsent(hierarchy, i -> new 
HashSet<>()).add(path);
+                }
+            }
+        }
+
+        // from deepest to shallowest
+        for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; 
hierarchy--) {
+            deduplicate.get(hierarchy).forEach(this::tryDeleteEmptyDirectory);
+        }
+
+        deletionBuckets.clear();
+    }
+
+    protected void recordDeletionBuckets(ManifestEntry entry) {
+        deletionBuckets
+                .computeIfAbsent(entry.partition(), p -> new HashSet<>())
+                .add(entry.bucket());
+    }
+
+    protected void cleanUnusedManifests(
+            Snapshot snapshot, Set<String> skippingSet, boolean 
deleteChangelog) {
+        // clean base and delta manifests
+        List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
+        
toExpireManifests.addAll(tryReadManifestList(snapshot.baseManifestList()));
+        
toExpireManifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
+        for (ManifestFileMeta manifest : toExpireManifests) {
+            String fileName = manifest.fileName();
+            if (!skippingSet.contains(fileName)) {
+                manifestFile.delete(fileName);
+                // to avoid other snapshots trying to delete again
+                skippingSet.add(fileName);
+            }
+        }
+
+        if (!skippingSet.contains(snapshot.baseManifestList())) {
+            manifestList.delete(snapshot.baseManifestList());
+        }
+        if (!skippingSet.contains(snapshot.deltaManifestList())) {
+            manifestList.delete(snapshot.deltaManifestList());
+        }
+
+        // clean changelog manifests
+        if (deleteChangelog && snapshot.changelogManifestList() != null) {
+            for (ManifestFileMeta manifest :
+                    tryReadManifestList(snapshot.changelogManifestList())) {
+                manifestFile.delete(manifest.fileName());
+            }
+            manifestList.delete(snapshot.changelogManifestList());
+        }
+
+        // clean index manifests
+        String indexManifest = snapshot.indexManifest();
+        // check exists, it may have been deleted by other snapshots
+        if (indexManifest != null && 
indexFileHandler.existsManifest(indexManifest)) {
+            for (IndexManifestEntry entry : 
indexFileHandler.readManifest(indexManifest)) {
+                if (!skippingSet.contains(entry.indexFile().fileName())) {
+                    indexFileHandler.deleteIndexFile(entry);
+                }
+            }
+
+            if (!skippingSet.contains(indexManifest)) {
+                indexFileHandler.deleteManifest(indexManifest);
+            }
+        }
+    }
+
+    /**
+     * It is possible that a job was killed during expiration and some 
manifest files have been
+     * deleted, so if the clean methods need to get manifests of a snapshot to 
be cleaned, we should
+     * try to read manifests and return empty list if failed instead of 
calling {@link
+     * Snapshot#dataManifests} directly.
+     */
+    protected List<ManifestFileMeta> tryReadManifestList(String 
manifestListName) {
+        try {
+            return manifestList.read(manifestListName);
+        } catch (Exception e) {
+            LOG.warn("Failed to read manifest list file " + manifestListName, 
e);
+            return Collections.emptyList();
+        }
+    }
+
+    public Iterable<ManifestEntry> tryReadManifestEntries(String 
manifestListName) {
+        return readManifestEntries(tryReadManifestList(manifestListName));
+    }
+
+    /** Try to read base and delta manifest lists at one time. */
+    protected Iterable<ManifestEntry> tryReadDataManifestEntries(Snapshot 
snapshot) {
+        List<ManifestFileMeta> manifestFileMetas = 
tryReadManifestList(snapshot.baseManifestList());
+        
manifestFileMetas.addAll(tryReadManifestList(snapshot.deltaManifestList()));
+
+        return readManifestEntries(manifestFileMetas);
+    }
+
+    protected Iterable<ManifestEntry> readManifestEntries(
+            List<ManifestFileMeta> manifestFileMetas) {
+        Queue<String> files =
+                manifestFileMetas.stream()
+                        .map(ManifestFileMeta::fileName)
+                        .collect(Collectors.toCollection(LinkedList::new));
+        return Iterables.concat(
+                (Iterable<Iterable<ManifestEntry>>)
+                        () ->
+                                new Iterator<Iterable<ManifestEntry>>() {
+                                    @Override
+                                    public boolean hasNext() {
+                                        return files.size() > 0;
+                                    }
+
+                                    @Override
+                                    public Iterable<ManifestEntry> next() {
+                                        String file = files.poll();
+                                        try {
+                                            return manifestFile.read(file);
+                                        } catch (Exception e) {
+                                            LOG.warn("Failed to read manifest 
file " + file, e);
+                                            return Collections.emptyList();
+                                        }
+                                    }
+                                });
+    }
+
+    protected void addMergedDataFiles(
+            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot 
snapshot) {
+        Iterable<ManifestEntry> entries = tryReadDataManifestEntries(snapshot);
+        for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
+            dataFiles
+                    .computeIfAbsent(entry.partition(), p -> new HashMap<>())
+                    .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
+                    .add(entry.file().fileName());
+        }
+    }
+
+    protected boolean containsDataFile(
+            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ManifestEntry 
testee) {
+        Map<Integer, Set<String>> buckets = dataFiles.get(testee.partition());
+        if (buckets != null) {
+            Set<String> fileNames = buckets.get(testee.bucket());
+            if (fileNames != null) {
+                return fileNames.contains(testee.file().fileName());
+            }
+        }
+        return false;
+    }
+
+    /** Changelogs were not checked. Let the subclass determine whether to 
delete them. */
+    public Set<String> manifestSkippingSet(Snapshot skippingSnapshot) {
+        return 
manifestSkippingSet(Collections.singletonList(skippingSnapshot));
+    }
+
+    public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
+        Set<String> skippingSet = new HashSet<>();
+
+        for (Snapshot skippingSnapshot : skippingSnapshots) {
+            // data manifests
+            skippingSet.add(skippingSnapshot.baseManifestList());
+            skippingSet.add(skippingSnapshot.deltaManifestList());
+            skippingSnapshot.dataManifests(manifestList).stream()
+                    .map(ManifestFileMeta::fileName)
+                    .forEach(skippingSet::add);
+
+            // index manifests
+            String indexManifest = skippingSnapshot.indexManifest();
+            if (indexManifest != null) {
+                skippingSet.add(indexManifest);
+                indexFileHandler.readManifest(indexManifest).stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .map(IndexFileMeta::fileName)
+                        .forEach(skippingSet::add);
+            }
+        }
+
+        return skippingSet;
+    }
+
+    private boolean tryDeleteEmptyDirectory(Path path) {
+        try {
+            fileIO.delete(path, false);
+            return true;
+        } catch (IOException e) {
+            LOG.debug("Failed to delete directory '{}'. Check whether it is 
empty.", path);
+            return false;
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index 46ef06fa5..468fbeb02 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -21,14 +21,13 @@ package org.apache.paimon.operation;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -55,7 +54,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
     private final ConsumerManager consumerManager;
     private final SnapshotDeletion snapshotDeletion;
 
-    private final TagFileKeeper tagFileKeeper;
+    private final TagManager tagManager;
 
     private Lock lock;
 
@@ -65,7 +64,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             long millisRetained,
             SnapshotManager snapshotManager,
             SnapshotDeletion snapshotDeletion,
-            TagFileKeeper tagFileKeeper) {
+            TagManager tagManager) {
         this.numRetainedMin = numRetainedMin;
         this.numRetainedMax = numRetainedMax;
         this.millisRetained = millisRetained;
@@ -73,7 +72,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         this.consumerManager =
                 new ConsumerManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         this.snapshotDeletion = snapshotDeletion;
-        this.tagFileKeeper = tagFileKeeper;
+        this.tagManager = tagManager;
     }
 
     @Override
@@ -149,22 +148,19 @@ public class FileStoreExpireImpl implements FileStoreExpire {
                     "Snapshot expire range is [" + beginInclusiveId + ", " + 
endExclusiveId + ")");
         }
 
-        tagFileKeeper.reloadTags();
+        List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
 
         // delete merge tree files
         // deleted merge tree files in a snapshot are not used by the next 
snapshot, so the range of
         // id should be (beginInclusiveId, endExclusiveId]
-        Map<BinaryRow, Set<Integer>> deletionBuckets = new HashMap<>();
         for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ready to delete merge tree files not used by 
snapshot #" + id);
             }
             Snapshot snapshot = snapshotManager.snapshot(id);
             // expire merge tree files and collect changed buckets
-            snapshotDeletion.deleteExpiredDataFiles(
-                    snapshot.deltaManifestList(),
-                    deletionBuckets,
-                    tagFileKeeper.tagDataFileSkipper(id));
+            snapshotDeletion.cleanUnusedDataFiles(
+                    snapshot, 
snapshotDeletion.dataFileSkipper(taggedSnapshots, id));
         }
 
         // delete changelog files
@@ -174,32 +170,27 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             }
             Snapshot snapshot = snapshotManager.snapshot(id);
             if (snapshot.changelogManifestList() != null) {
-                snapshotDeletion.deleteAddedDataFiles(
-                        snapshot.changelogManifestList(), deletionBuckets);
+                
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
             }
         }
 
         // data files and changelog files in bucket directories has been 
deleted
         // then delete changed bucket directories if they are empty
-        snapshotDeletion.tryDeleteDirectories(deletionBuckets);
+        snapshotDeletion.cleanDataDirectories();
 
         // delete manifests and indexFiles
-        Set<String> skipManifestFiles =
-                snapshotDeletion.collectManifestSkippingSet(
-                        snapshotManager.snapshot(endExclusiveId));
-        for (Snapshot snapshot :
-                tagFileKeeper.findOverlappedSnapshots(beginInclusiveId, 
endExclusiveId)) {
-            skipManifestFiles.add(snapshot.baseManifestList());
-            skipManifestFiles.add(snapshot.deltaManifestList());
-            
skipManifestFiles.addAll(snapshotDeletion.collectManifestSkippingSet(snapshot));
-        }
+        List<Snapshot> skippingSnapshots =
+                TagManager.findOverlappedSnapshots(
+                        taggedSnapshots, beginInclusiveId, endExclusiveId);
+        skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
+        Set<String> skippingSet = 
snapshotDeletion.manifestSkippingSet(skippingSnapshots);
         for (long id = beginInclusiveId; id < endExclusiveId; id++) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ready to delete manifests in snapshot #" + id);
             }
 
             Snapshot snapshot = snapshotManager.snapshot(id);
-            snapshotDeletion.deleteManifestFiles(skipManifestFiles, snapshot);
+            snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
 
             // delete snapshot last
             
snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id));
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index ffc975977..123fdabc5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -25,42 +25,28 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.paimon.utils.TagManager;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 /** Delete snapshot files. */
-public class SnapshotDeletion {
+public class SnapshotDeletion extends FileDeletionBase {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotDeletion.class);
+    /** Snapshot id of the tag whose data files are currently cached, or -1 if none. */
+    private long cachedTagId = -1;
 
-    private final FileIO fileIO;
-    private final FileStorePathFactory pathFactory;
-    private final ManifestFile manifestFile;
-    private final ManifestList manifestList;
-    private final IndexFileHandler indexFileHandler;
+    /** Used to cache data files used by current tag. */
+    private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles 
= new HashMap<>();
 
     public SnapshotDeletion(
             FileIO fileIO,
@@ -68,134 +54,22 @@ public class SnapshotDeletion {
             ManifestFile manifestFile,
             ManifestList manifestList,
             IndexFileHandler indexFileHandler) {
-        this.fileIO = fileIO;
-        this.pathFactory = pathFactory;
-        this.manifestFile = manifestFile;
-        this.manifestList = manifestList;
-        this.indexFileHandler = indexFileHandler;
-    }
-
-    // ================================= PUBLIC 
=================================================
-
-    /**
-     * Delete expired file in the manifest list files. Delete files marked as 
"DELETE" in manifests.
-     *
-     * @param manifestListName name of manifest list
-     * @param deletionBuckets partition-buckets of which some data files have 
been deleted
-     * @param dataFileSkipper if the test result of a data file is true, the 
data file will be
-     *     skipped when deleting
-     */
-    public void deleteExpiredDataFiles(
-            String manifestListName,
-            Map<BinaryRow, Set<Integer>> deletionBuckets,
-            Predicate<ManifestEntry> dataFileSkipper) {
-        doDeleteExpiredDataFiles(
-                getManifestEntriesFromManifestList(manifestListName),
-                deletionBuckets,
-                dataFileSkipper);
+        super(fileIO, pathFactory, manifestFile, manifestList, 
indexFileHandler);
     }
 
-    /**
-     * Delete added file in the manifest list files. Added files marked as 
"ADD" in manifests.
-     *
-     * @param manifestListName name of manifest list
-     * @param deletionBuckets partition-buckets of which some data files have 
been deleted
-     */
-    public void deleteAddedDataFiles(
-            String manifestListName, Map<BinaryRow, Set<Integer>> 
deletionBuckets) {
-        for (ManifestEntry entry : 
getManifestEntriesFromManifestList(manifestListName)) {
-            if (entry.kind() == FileKind.ADD) {
-                fileIO.deleteQuietly(
-                        new Path(
-                                pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
-                                entry.file().fileName()));
-                deletionBuckets
-                        .computeIfAbsent(entry.partition(), p -> new 
HashSet<>())
-                        .add(entry.bucket());
-            }
-        }
-    }
-
-    /**
-     * Try to delete directories collected from {@link 
#deleteExpiredDataFiles} and {@link
-     * #deleteAddedDataFiles}.
-     */
-    public void tryDeleteDirectories(Map<BinaryRow, Set<Integer>> 
deletionBuckets) {
-        DeletionUtils.tryDeleteDirectories(deletionBuckets, pathFactory, 
fileIO);
-    }
-
-    /**
-     * Delete metadata of a snapshot, delete {@link ManifestList} file and 
{@link ManifestFileMeta}
-     * file.
-     *
-     * @param skipped manifest file deletion skipping set, deleted manifest 
file will be added to
-     *     this set too. NOTE: changelog manifests won't be checked.
-     */
-    public void deleteManifestFiles(Set<String> skipped, Snapshot snapshot) {
-        // cannot call `snapshot.dataManifests` directly, it is possible that 
a job is
-        // killed during expiration, so some manifest files may have been 
deleted
-        List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
-        
toExpireManifests.addAll(tryReadManifestList(snapshot.baseManifestList()));
-        
toExpireManifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
-
-        // delete manifest
-        for (ManifestFileMeta manifest : toExpireManifests) {
-            String fileName = manifest.fileName();
-            if (!skipped.contains(fileName)) {
-                manifestFile.delete(fileName);
-                skipped.add(fileName);
-            }
-        }
-        if (snapshot.changelogManifestList() != null) {
-            for (ManifestFileMeta manifest :
-                    tryReadManifestList(snapshot.changelogManifestList())) {
-                manifestFile.delete(manifest.fileName());
-            }
-        }
-
-        // delete manifest lists
-        if (!skipped.contains(snapshot.baseManifestList())) {
-            manifestList.delete(snapshot.baseManifestList());
-        }
-        if (!skipped.contains(snapshot.deltaManifestList())) {
-            manifestList.delete(snapshot.deltaManifestList());
-        }
-        if (snapshot.changelogManifestList() != null) {
-            manifestList.delete(snapshot.changelogManifestList());
-        }
-
-        // delete index files
-        String indexManifest = snapshot.indexManifest();
-        // check exists, it may have been deleted by other snapshots
-        if (indexManifest != null && 
indexFileHandler.existsManifest(indexManifest)) {
-            for (IndexManifestEntry entry : 
indexFileHandler.readManifest(indexManifest)) {
-                if (!skipped.contains(entry.indexFile().fileName())) {
-                    indexFileHandler.deleteIndexFile(entry);
-                }
-            }
-
-            if (!skipped.contains(indexManifest)) {
-                indexFileHandler.deleteManifest(indexManifest);
-            }
-        }
+    @Override
+    public void cleanUnusedDataFiles(Snapshot snapshot, 
Predicate<ManifestEntry> skipper) {
+        
doCleanUnusedDataFile(tryReadManifestEntries(snapshot.deltaManifestList()), 
skipper);
     }
 
-    // ================================= PRIVATE 
=================================================
-
-    private List<ManifestFileMeta> tryReadManifestList(String 
manifestListName) {
-        try {
-            return manifestList.read(manifestListName);
-        } catch (Exception e) {
-            LOG.warn("Failed to read manifest list file " + manifestListName, 
e);
-            return Collections.emptyList();
-        }
+    @Override
+    public void cleanUnusedManifests(Snapshot snapshot, Set<String> 
skippingSet) {
+        cleanUnusedManifests(snapshot, skippingSet, true);
     }
 
     @VisibleForTesting
-    void doDeleteExpiredDataFiles(
-            Iterable<ManifestEntry> dataFileLog,
-            Map<BinaryRow, Set<Integer>> deletionBuckets,
-            Predicate<ManifestEntry> dataFileSkipper) {
+    void doCleanUnusedDataFile(
+            Iterable<ManifestEntry> dataFileLog, Predicate<ManifestEntry> 
skipper) {
         // we cannot delete a data file directly when we meet a DELETE entry, 
because that
         // file might be upgraded
         // data file path -> (original manifest entry, extra file paths)
@@ -224,46 +98,65 @@ public class SnapshotDeletion {
                 (path, pair) -> {
                     ManifestEntry entry = pair.getLeft();
                     // check whether we should skip the data file
-                    if (!dataFileSkipper.test(entry)) {
+                    if (!skipper.test(entry)) {
                         // delete data files
                         fileIO.deleteQuietly(path);
                         pair.getRight().forEach(fileIO::deleteQuietly);
-                        // record changed buckets
-                        deletionBuckets
-                                .computeIfAbsent(entry.partition(), p -> new 
HashSet<>())
-                                .add(entry.bucket());
+
+                        recordDeletionBuckets(entry);
                     }
                 });
     }
 
-    private Iterable<ManifestEntry> getManifestEntriesFromManifestList(String 
manifestListName) {
-        Queue<String> files =
-                tryReadManifestList(manifestListName).stream()
-                        .map(ManifestFileMeta::fileName)
-                        .collect(Collectors.toCollection(LinkedList::new));
-        return Iterables.concat(
-                (Iterable<Iterable<ManifestEntry>>)
-                        () ->
-                                new Iterator<Iterable<ManifestEntry>>() {
-                                    @Override
-                                    public boolean hasNext() {
-                                        return files.size() > 0;
-                                    }
+    /**
+     * Delete added file in the manifest list files. Added files marked as 
"ADD" in manifests.
+     *
+     * @param manifestListName name of manifest list
+     */
+    public void deleteAddedDataFiles(String manifestListName) {
+        deleteAddedDataFiles(tryReadManifestEntries(manifestListName));
+    }
 
-                                    @Override
-                                    public Iterable<ManifestEntry> next() {
-                                        String file = files.poll();
-                                        try {
-                                            return manifestFile.read(file);
-                                        } catch (Exception e) {
-                                            LOG.warn("Failed to read manifest 
file " + file, e);
-                                            return Collections.emptyList();
-                                        }
-                                    }
-                                });
+    public void deleteAddedDataFiles(Iterable<ManifestEntry> manifestEntries) {
+        for (ManifestEntry entry : manifestEntries) {
+            if (entry.kind() == FileKind.ADD) {
+                fileIO.deleteQuietly(
+                        new Path(
+                                pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
+                                entry.file().fileName()));
+                recordDeletionBuckets(entry);
+            }
+        }
     }
 
-    public Set<String> collectManifestSkippingSet(Snapshot snapshot) {
-        return DeletionUtils.collectManifestSkippingSet(snapshot, 
manifestList, indexFileHandler);
+    /**
+     * Returns a predicate telling whether a data file is still used by the previous tag of
+     * {@code expiringSnapshotId} (located by {@link TagManager#findPreviousTag}) and therefore
+     * must be skipped when deleting. If there is no previous tag, nothing is skipped.
+     *
+     * <p>The data files of that tag are cached across calls. The cache is keyed by the tag's
+     * snapshot id rather than its index in {@code taggedSnapshots}, because the tag list may
+     * change between calls (tags created or deleted), which would make an index-based key stale.
+     * The returned predicate reads this shared cache, so use it before the next call.
+     */
+    public Predicate<ManifestEntry> dataFileSkipper(
+            List<Snapshot> taggedSnapshots, long expiringSnapshotId) {
+        int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId);
+        if (index < 0) {
+            // no previous tag, so no data file needs to be kept for tags
+            return entry -> false;
+        }
+
+        Snapshot previousTag = taggedSnapshots.get(index);
+        // refresh the cached tag data files only when the previous tag changes
+        if (cachedTagId != previousTag.id()) {
+            // invalidate first, so a failed load never leaves a stale id on a partial cache
+            cachedTagId = -1;
+            cachedTagDataFiles.clear();
+            addMergedDataFiles(cachedTagDataFiles, previousTag);
+            cachedTagId = previousTag.id();
+        }
+
+        return entry -> containsDataFile(cachedTagDataFiles, entry);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 68b467480..8615cbc86 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -23,165 +23,65 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.TagManager;
-
-import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 
-import static org.apache.paimon.operation.DeletionUtils.addMergedDataFiles;
-import static 
org.apache.paimon.operation.DeletionUtils.collectManifestSkippingSet;
-import static org.apache.paimon.operation.DeletionUtils.containsDataFile;
-import static org.apache.paimon.operation.DeletionUtils.readEntries;
-import static org.apache.paimon.operation.DeletionUtils.tryDeleteDirectories;
-
-/**
- * Delete tag files. This class doesn't check changelog files because they are 
handled by {@link
- * SnapshotDeletion}.
- */
-public class TagDeletion {
-
-    private final FileIO fileIO;
-    private final FileStorePathFactory pathFactory;
-    private final ManifestList manifestList;
-    private final ManifestFile manifestFile;
-    private final IndexFileHandler indexFileHandler;
-    @Nullable private final Integer scanManifestParallelism;
-
-    private final SnapshotManager snapshotManager;
-    private final List<Snapshot> taggedSnapshots;
-
-    private final Map<BinaryRow, Map<Integer, Set<String>>> skippedDataFiles;
-    private final Set<String> skippedManifests;
+/** Cleans unused files of tags. Changelog files are left to {@link SnapshotDeletion}. */
+public class TagDeletion extends FileDeletionBase {
 
     public TagDeletion(
             FileIO fileIO,
-            Path tablePath,
             FileStorePathFactory pathFactory,
-            ManifestList manifestList,
             ManifestFile manifestFile,
-            IndexFileHandler indexFileHandler,
-            @Nullable Integer scanManifestParallelism) {
-        this.fileIO = fileIO;
-        this.pathFactory = pathFactory;
-        this.manifestList = manifestList;
-        this.manifestFile = manifestFile;
-        this.indexFileHandler = indexFileHandler;
-        this.scanManifestParallelism = scanManifestParallelism;
-
-        this.snapshotManager = new SnapshotManager(fileIO, tablePath);
-        this.taggedSnapshots = new TagManager(fileIO, 
tablePath).taggedSnapshots();
-
-        this.skippedDataFiles = new HashMap<>();
-        this.skippedManifests = new HashSet<>();
+            ManifestList manifestList,
+            IndexFileHandler indexFileHandler) {
+        super(fileIO, pathFactory, manifestFile, manifestList, 
indexFileHandler);
     }
 
-    /** Delete unused data files, manifest files and empty data file 
directories of tag. */
-    public void delete(Snapshot taggedSnapshot) {
-        if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
-            return;
-        }
-
-        // collect from the earliest snapshot
-        Snapshot earliest = 
snapshotManager.snapshot(snapshotManager.earliestSnapshotId());
-        collectSkip(earliest);
-
-        // collect from the neighbor tags
-        int index = findIndex(taggedSnapshot);
-        if (index - 1 >= 0) {
-            collectSkip(taggedSnapshots.get(index - 1));
-        }
-        if (index + 1 < taggedSnapshots.size()) {
-            collectSkip(taggedSnapshots.get(index + 1));
-        }
-
-        // delete data files and empty directories
-        deleteDataFiles(taggedSnapshot);
-        // delete manifests
-        deleteManifestFiles(taggedSnapshot);
+    @Override
+    public void cleanUnusedDataFiles(Snapshot taggedSnapshot, 
Predicate<ManifestEntry> skipper) {
+        cleanUnusedDataFiles(tryReadDataManifestEntries(taggedSnapshot), 
skipper);
     }
 
-    private void collectSkip(Snapshot snapshot) {
-        addMergedDataFiles(
-                skippedDataFiles, snapshot, manifestList, manifestFile, 
scanManifestParallelism);
-        skippedManifests.addAll(
-                collectManifestSkippingSet(snapshot, manifestList, 
indexFileHandler));
+    @Override
+    public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String> 
skippingSet) {
+        // doesn't clean changelog files because they are handled by 
SnapshotDeletion
+        cleanUnusedManifests(taggedSnapshot, skippingSet, false);
     }
 
-    private void deleteDataFiles(Snapshot taggedSnapshot) {
-        // delete data files
-        Map<BinaryRow, Set<Integer>> deletionBuckets = new HashMap<>();
-        Iterable<ManifestEntry> entries =
-                readEntries(
-                        taggedSnapshot.dataManifests(manifestList),
-                        manifestFile,
-                        scanManifestParallelism);
+    public void cleanUnusedDataFiles(
+            Iterable<ManifestEntry> entries, Predicate<ManifestEntry> skipper) 
{
         for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
-            if (!containsDataFile(skippedDataFiles, entry)) {
+            if (!skipper.test(entry)) {
                 Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
                 fileIO.deleteQuietly(new Path(bucketPath, 
entry.file().fileName()));
                 for (String file : entry.file().extraFiles()) {
                     fileIO.deleteQuietly(new Path(bucketPath, file));
                 }
 
-                deletionBuckets
-                        .computeIfAbsent(entry.partition(), p -> new 
HashSet<>())
-                        .add(entry.bucket());
+                recordDeletionBuckets(entry);
             }
         }
-
-        // delete empty data file directories
-        tryDeleteDirectories(deletionBuckets, pathFactory, fileIO);
     }
 
-    private void deleteManifestFiles(Snapshot taggedSnapshot) {
-        for (ManifestFileMeta manifest : 
taggedSnapshot.dataManifests(manifestList)) {
-            String fileName = manifest.fileName();
-            if (!skippedManifests.contains(fileName)) {
-                manifestFile.delete(fileName);
-            }
-        }
-
-        // delete manifest lists
-        manifestList.delete(taggedSnapshot.baseManifestList());
-        manifestList.delete(taggedSnapshot.deltaManifestList());
-
-        // delete index files
-        String indexManifest = taggedSnapshot.indexManifest();
-        // check exists, it may have been deleted by other snapshots
-        if (indexManifest != null && 
indexFileHandler.existsManifest(indexManifest)) {
-            for (IndexManifestEntry entry : 
indexFileHandler.readManifest(indexManifest)) {
-                if (!skippedManifests.contains(entry.indexFile().fileName())) {
-                    indexFileHandler.deleteIndexFile(entry);
-                }
-            }
-
-            if (!skippedManifests.contains(indexManifest)) {
-                indexFileHandler.deleteManifest(indexManifest);
-            }
-        }
+    public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) {
+        return dataFileSkipper(Collections.singletonList(fromSnapshot));
     }
 
-    private int findIndex(Snapshot taggedSnapshot) {
-        for (int i = 0; i < taggedSnapshots.size(); i++) {
-            if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) {
-                return i;
-            }
+    public Predicate<ManifestEntry> dataFileSkipper(List<Snapshot> 
fromSnapshots) {
+        Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();
+        for (Snapshot snapshot : fromSnapshots) {
+            addMergedDataFiles(skipped, snapshot);
         }
-        throw new RuntimeException(
-                String.format(
-                        "Didn't find tag with snapshot id '%s'.This is 
unexpected.",
-                        taggedSnapshot.id()));
+        return entry -> containsDataFile(skipped, entry);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
deleted file mode 100644
index 3f1cc0bef..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestList;
-import org.apache.paimon.utils.TagManager;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import static org.apache.paimon.operation.DeletionUtils.addMergedDataFiles;
-import static org.apache.paimon.operation.DeletionUtils.containsDataFile;
-
-/** Util class to provide methods to prevent tag files to be deleted when 
expiring snapshots. */
-public class TagFileKeeper {
-
-    private final ManifestList manifestList;
-    private final ManifestFile manifestFile;
-    private final TagManager tagManager;
-    @Nullable private final Integer scanManifestParallelism;
-
-    private long cachedTag = -1;
-    private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles;
-
-    private List<Snapshot> taggedSnapshots;
-
-    public TagFileKeeper(
-            ManifestList manifestList,
-            ManifestFile manifestFile,
-            TagManager tagManager,
-            @Nullable Integer scanManifestParallelism) {
-        this.manifestList = manifestList;
-        this.manifestFile = manifestFile;
-        this.tagManager = tagManager;
-        this.scanManifestParallelism = scanManifestParallelism;
-
-        this.cachedTagDataFiles = new HashMap<>();
-    }
-
-    /** Caller should determine whether to reload. */
-    public void reloadTags() {
-        taggedSnapshots = tagManager.taggedSnapshots();
-    }
-
-    public Predicate<ManifestEntry> tagDataFileSkipper(long 
expiringSnapshotId) {
-        int index = findPreviousTag(expiringSnapshotId, taggedSnapshots);
-        if (index >= 0) {
-            tryRefresh(taggedSnapshots.get(index));
-        }
-        return entry -> index >= 0 && containsDataFile(cachedTagDataFiles, 
entry);
-    }
-
-    public List<Snapshot> findOverlappedSnapshots(long beginInclusive, long 
endExclusive) {
-        List<Snapshot> snapshots = new ArrayList<>();
-        int right = findPreviousTag(endExclusive, taggedSnapshots);
-        if (right >= 0) {
-            int left = Math.max(findPreviousOrEqualTag(beginInclusive, 
taggedSnapshots), 0);
-            for (int i = left; i <= right; i++) {
-                snapshots.add(taggedSnapshots.get(i));
-            }
-        }
-        return snapshots;
-    }
-
-    private void tryRefresh(Snapshot taggedSnapshot) {
-        if (cachedTag != taggedSnapshot.id()) {
-            refresh(taggedSnapshot);
-            cachedTag = taggedSnapshot.id();
-        }
-    }
-
-    private void refresh(Snapshot taggedSnapshot) {
-        cachedTagDataFiles.clear();
-        addMergedDataFiles(
-                cachedTagDataFiles,
-                taggedSnapshot,
-                manifestList,
-                manifestFile,
-                scanManifestParallelism);
-    }
-
-    private int findPreviousTag(long targetSnapshotId, List<Snapshot> 
taggedSnapshots) {
-        for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
-            if (taggedSnapshots.get(i).id() < targetSnapshotId) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
-    private int findPreviousOrEqualTag(long targetSnapshotId, List<Snapshot> 
taggedSnapshots) {
-        for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
-            if (taggedSnapshots.get(i).id() <= targetSnapshotId) {
-                return i;
-            }
-        }
-        return -1;
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index a769d6586..536bc1c34 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -25,7 +25,6 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
-import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
@@ -45,7 +44,6 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.StringUtils;
 import org.apache.paimon.utils.TagManager;
 
 import java.io.IOException;
@@ -252,7 +250,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                     }
                 } else {
                     String tagName = coreOptions.scanTagName();
-                    TagManager tagManager = new TagManager(fileIO, path);
+                    TagManager tagManager = tagManager();
                     if (tagManager.tagExists(tagName)) {
                         long schemaId = 
tagManager.taggedSnapshot(tagName).schemaId();
                         return 
Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
@@ -275,11 +273,13 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public void rollbackTo(long snapshotId) {
-        try {
-            snapshotManager().rollbackTo(store().newSnapshotDeletion(), 
snapshotId);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        SnapshotManager snapshotManager = snapshotManager();
+        checkArgument(
+                snapshotManager.snapshotExists(snapshotId),
+                "Rollback snapshot '%s' doesn't exist.",
+                snapshotId);
+
+        rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
     }
 
     @Override
@@ -291,19 +291,49 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 fromSnapshotId);
 
         Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
-        TagManager tagManager = new TagManager(fileIO, path);
-        tagManager.createTag(snapshot, tagName);
+        tagManager().createTag(snapshot, tagName);
     }
 
     @Override
     public void deleteTag(String tagName) {
-        checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
+        tagManager().deleteTag(tagName, store().newTagDeletion(), 
snapshotManager());
+    }
+
+    @Override
+    public void rollbackTo(String tagName) {
+        TagManager tagManager = tagManager();
+        checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' 
doesn't exist.", tagName);
 
-        TagManager tagManager = new TagManager(fileIO, path);
         Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
+        rollbackHelper().cleanLargerThan(taggedSnapshot);
 
-        TagDeletion tagDeletion = store().newTagDeletion();
-        tagDeletion.delete(taggedSnapshot);
-        fileIO.deleteQuietly(tagManager.tagPath(tagName));
+        try {
+            // the earliest snapshot may be later than the rollback tag because of snapshot
+            // expiration; in that case `cleanLargerThan` deletes all snapshots, so the tag file
+            // is written back to the snapshot directory to become the earliest snapshot, and the
+            // earliest hint is updated accordingly
+            SnapshotManager snapshotManager = snapshotManager();
+            if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
+                fileIO.writeFileUtf8(
+                        snapshotManager.snapshotPath(taggedSnapshot.id()),
+                        fileIO.readFileUtf8(tagManager.tagPath(tagName)));
+                snapshotManager.commitEarliestHint(taggedSnapshot.id());
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private TagManager tagManager() {
+        return new TagManager(fileIO, path);
+    }
+
+    private RollbackHelper rollbackHelper() {
+        return new RollbackHelper(
+                snapshotManager(),
+                tagManager(),
+                fileIO,
+                store().newSnapshotDeletion(),
+                store().newTagDeletion());
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 1cf25af75..86dd1f97d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -110,4 +110,16 @@ public interface ReadonlyTable extends InnerTable {
                         "Readonly Table %s does not support deleteTag.",
                         this.getClass().getSimpleName()));
     }
+
+    /**
+     * Always throws {@link UnsupportedOperationException}: a readonly table cannot be rolled
+     * back to a tag.
+     */
+    @Override
+    default void rollbackTo(String tagName) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support rollbackTo tag.",
+                        this.getClass().getSimpleName()));
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java 
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
new file mode 100644
index 000000000..4843393e7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.function.Predicate;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Helper class for {@link Table#rollbackTo}: removes the snapshots and tags whose id is larger
+ * than the retained snapshot's, together with the data files and manifests they leave behind.
+ */
+public class RollbackHelper {
+
+    private final SnapshotManager snapshotManager;
+    private final TagManager tagManager;
+    private final FileIO fileIO;
+    private final SnapshotDeletion snapshotDeletion;
+    private final TagDeletion tagDeletion;
+
+    public RollbackHelper(
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            FileIO fileIO,
+            SnapshotDeletion snapshotDeletion,
+            TagDeletion tagDeletion) {
+        this.snapshotManager = snapshotManager;
+        this.tagManager = tagManager;
+        this.fileIO = fileIO;
+        this.snapshotDeletion = snapshotDeletion;
+        this.tagDeletion = tagDeletion;
+    }
+
+    /**
+     * Clean snapshots and tags whose id is larger than given snapshot's.
+     *
+     * <p>Data files are cleaned first, then manifests (skipping the manifest skipping set of the
+     * retained snapshot), and finally the latest hint is committed with the retained id.
+     *
+     * @param retainedSnapshot the snapshot to roll back to; it and all snapshots and tags with a
+     *     smaller id are kept
+     * @throws UncheckedIOException if committing the latest hint fails
+     */
+    public void cleanLargerThan(Snapshot retainedSnapshot) {
+        // clean data files
+        List<Snapshot> cleanedSnapshots = cleanSnapshotsDataFiles(retainedSnapshot);
+        List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);
+
+        // clean manifests
+        // the same skipping set is used for both snapshot and tag manifest cleaning
+        Set<String> manifestsSkippingSet = snapshotDeletion.manifestSkippingSet(retainedSnapshot);
+
+        for (Snapshot snapshot : cleanedSnapshots) {
+            snapshotDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
+        }
+
+        // a tag equal to a snapshot cleaned above shares its manifests, which the loop above
+        // already handled, so only the remaining tags need their own manifest cleaning
+        cleanedTags.removeAll(cleanedSnapshots);
+        for (Snapshot snapshot : cleanedTags) {
+            tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
+        }
+
+        // modify the latest hint
+        try {
+            snapshotManager.commitLatestHint(retainedSnapshot.id());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Deletes the snapshot files whose id is larger than {@code retainedSnapshot}'s (but not below
+     * the earliest snapshot id), then the data files added by their delta and changelog manifest
+     * lists, and finally calls {@code cleanDataDirectories()}.
+     *
+     * @return the snapshots whose files were deleted, from newest to oldest
+     */
+    private List<Snapshot> cleanSnapshotsDataFiles(Snapshot retainedSnapshot) {
+        long earliest =
+                checkNotNull(
+                        snapshotManager.earliestSnapshotId(), "Cannot find earliest snapshot.");
+        long latest =
+                checkNotNull(snapshotManager.latestSnapshotId(), "Cannot find latest snapshot.");
+
+        // delete snapshot files first, cannot be read now
+        // it is possible that some snapshots have been expired
+        List<Snapshot> toBeCleaned = new ArrayList<>();
+        long to = Math.max(earliest, retainedSnapshot.id() + 1);
+        for (long i = latest; i >= to; i--) {
+            toBeCleaned.add(snapshotManager.snapshot(i));
+            fileIO.deleteQuietly(snapshotManager.snapshotPath(i));
+        }
+
+        // delete data files of snapshots
+        // don't worry about tag data files because file deletion methods won't throw exception
+        // when deleting non-existing data files
+        for (Snapshot snapshot : toBeCleaned) {
+            snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList());
+            snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
+        }
+
+        // delete directories
+        snapshotDeletion.cleanDataDirectories();
+
+        return toBeCleaned;
+    }
+
+    /**
+     * Deletes the tag files whose snapshot id is larger than {@code retainedSnapshot}'s, then their
+     * data files except those matched by {@code tagDeletion.dataFileSkipper(retainedSnapshot)},
+     * and finally calls {@code cleanDataDirectories()}.
+     *
+     * @return the tagged snapshots whose tag files were deleted, from newest to oldest
+     */
+    private List<Snapshot> cleanTagsDataFiles(Snapshot retainedSnapshot) {
+        SortedMap<Snapshot, String> tags = tagManager.tags();
+        if (tags.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+        List<Snapshot> toBeCleaned = new ArrayList<>();
+
+        // delete tag files
+        for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+            Snapshot tag = taggedSnapshots.get(i);
+            if (tag.id() <= retainedSnapshot.id()) {
+                break;
+            }
+            toBeCleaned.add(tag);
+            fileIO.deleteQuietly(tagManager.tagPath(tags.get(tag)));
+        }
+
+        // delete data files
+        Predicate<ManifestEntry> dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot);
+        for (Snapshot s : toBeCleaned) {
+            tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
+        }
+
+        // delete directories
+        tagDeletion.cleanDataDirectories();
+
+        return toBeCleaned;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index e5b5c26b2..c1b4b7882 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -75,6 +75,10 @@ public interface Table extends Serializable {
     @Experimental
     void deleteTag(String tagName);
 
+    /**
+     * Rollback table's state to a specific tag.
+     *
+     * @param tagName name of the tag to roll back to
+     */
+    @Experimental
+    void rollbackTo(String tagName);
+
     // =============== Read & Write Operations ==================
 
     /** Returns a new read builder. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index b28dab580..05a18d8da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -20,23 +20,16 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.SnapshotDeletion;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
@@ -107,6 +100,11 @@ public class SnapshotManager implements Serializable {
         }
     }
 
+    /**
+     * Returns the earliest snapshot, or {@code null} when {@link #earliestSnapshotId()} finds none.
+     */
+    public @Nullable Snapshot earliestSnapshot() {
+        Long earliestId = earliestSnapshotId();
+        if (earliestId == null) {
+            return null;
+        }
+        return snapshot(earliestId);
+    }
+
     public @Nullable Long earliestSnapshotId() {
         try {
             return findEarliest();
@@ -334,42 +332,4 @@ public class SnapshotManager implements Serializable {
         fileIO.delete(hintFile, false);
         fileIO.writeFileUtf8(hintFile, String.valueOf(snapshotId));
     }
-
-    public void rollbackTo(SnapshotDeletion deletion, long snapshotId) throws 
IOException {
-        if (!snapshotExists(snapshotId)) {
-            throw new IllegalArgumentException("Rollback snapshot not exist: " 
+ snapshotId);
-        }
-
-        Long latest = findLatest();
-        if (latest == null) {
-            return;
-        }
-
-        // first modify hint
-        commitLatestHint(snapshotId);
-
-        // delete snapshots first, cannot be read now.
-        List<Snapshot> snapshots = new ArrayList<>();
-        for (long i = latest; i > snapshotId; i--) {
-            snapshots.add(snapshot(i));
-            fileIO().deleteQuietly(snapshotPath(i));
-        }
-
-        // delete data files
-        Map<BinaryRow, Set<Integer>> deletionBuckets = new HashMap<>();
-        for (Snapshot snapshot : snapshots) {
-            // delete data files
-            deletion.deleteAddedDataFiles(snapshot.deltaManifestList(), 
deletionBuckets);
-            deletion.deleteAddedDataFiles(snapshot.changelogManifestList(), 
deletionBuckets);
-        }
-
-        // delete directories
-        deletion.tryDeleteDirectories(deletionBuckets);
-
-        // delete manifest files.
-        Set<String> manifestSkipped = 
deletion.collectManifestSkippingSet(snapshot(snapshotId));
-        for (Snapshot snapshot : snapshots) {
-            deletion.deleteManifestFiles(manifestSkipped, snapshot);
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index f6fc165d4..46e62b014 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -21,6 +21,8 @@ package org.apache.paimon.utils;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.TagDeletion;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -28,6 +30,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.Predicate;
 
 import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -77,6 +80,51 @@ public class TagManager {
         }
     }
 
+    /**
+     * Delete a tag. If the tagged snapshot still exists, only the tag file is deleted. Otherwise the
+     * data files, data directories and manifests of the tagged snapshot are cleaned as well, using
+     * the left neighbor tag and the nearest right neighbor (the earliest snapshot or the right
+     * neighbor tag, whichever has the smaller id) to decide what must be skipped.
+     *
+     * @param tagName name of the tag to delete
+     * @param tagDeletion used to clean the tag's data files and manifests
+     * @param snapshotManager used to check whether the tagged snapshot exists and to find the
+     *     earliest snapshot
+     * @throws IllegalArgumentException if the tag name is blank or the tag doesn't exist
+     */
+    public void deleteTag(
+            String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager) {
+        checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
+        checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
+
+        Snapshot taggedSnapshot = taggedSnapshot(tagName);
+
+        // skip file deletion if snapshot exists
+        if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
+            fileIO.deleteQuietly(tagPath(tagName));
+            return;
+        }
+
+        // FileIO discovers tags by tag file, so we should read all tags before we delete tag
+        List<Snapshot> taggedSnapshots = taggedSnapshots();
+        fileIO.deleteQuietly(tagPath(tagName));
+
+        // collect skipping sets from the left neighbor tag and the nearest right neighbor (either
+        // the earliest snapshot or right neighbor tag)
+        List<Snapshot> skippedSnapshots = new ArrayList<>();
+
+        int index = findIndex(taggedSnapshot, taggedSnapshots);
+        // the left neighbor tag
+        if (index - 1 >= 0) {
+            skippedSnapshots.add(taggedSnapshots.get(index - 1));
+        }
+        // the nearest right neighbor; earliestSnapshot() is nullable, in which case only the right
+        // neighbor tag (if any) is considered
+        Snapshot right = snapshotManager.earliestSnapshot();
+        if (index + 1 < taggedSnapshots.size()) {
+            Snapshot rightTag = taggedSnapshots.get(index + 1);
+            if (right == null || rightTag.id() <= right.id()) {
+                right = rightTag;
+            }
+        }
+        if (right != null) {
+            skippedSnapshots.add(right);
+        }
+
+        // delete data files and empty directories
+        Predicate<ManifestEntry> dataFileSkipper = tagDeletion.dataFileSkipper(skippedSnapshots);
+        tagDeletion.cleanUnusedDataFiles(taggedSnapshot, dataFileSkipper);
+        tagDeletion.cleanDataDirectories();
+
+        // delete manifests
+        tagDeletion.cleanUnusedManifests(
+                taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots));
+    }
+
     /** Check if a tag exists. */
     public boolean tagExists(String tagName) {
         Path path = tagPath(tagName);
@@ -111,7 +159,6 @@ public class TagManager {
 
     /** Get all tagged snapshots with names sorted by snapshot id. */
     public SortedMap<Snapshot, String> tags() {
-
         TreeMap<Snapshot, String> tags = new 
TreeMap<>(Comparator.comparingLong(Snapshot::id));
         try {
             listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
@@ -127,4 +174,48 @@ public class TagManager {
         }
         return tags;
     }
+
+    /**
+     * Returns the position of {@code taggedSnapshot} in {@code taggedSnapshots}, matched by id.
+     *
+     * @throws IllegalStateException if no element has the same id
+     */
+    private int findIndex(Snapshot taggedSnapshot, List<Snapshot> taggedSnapshots) {
+        for (int i = 0; i < taggedSnapshots.size(); i++) {
+            if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) {
+                return i;
+            }
+        }
+        throw new IllegalStateException(
+                String.format(
+                        "Didn't find tag with snapshot id '%s'. This is unexpected.",
+                        taggedSnapshot.id()));
+    }
+
+    /**
+     * Collects the tagged snapshots between index {@code left} and index {@code right} (both
+     * inclusive), where {@code right} is the last tag with id smaller than {@code endExclusive} and
+     * {@code left} is the last tag with id at most {@code beginInclusive} (or 0 if there is none).
+     * Returns an empty list when {@code right} is before {@code left}.
+     */
+    public static List<Snapshot> findOverlappedSnapshots(
+            List<Snapshot> taggedSnapshots, long beginInclusive, long endExclusive) {
+        int right = findPreviousTag(taggedSnapshots, endExclusive);
+        int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0);
+        if (right < left) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(taggedSnapshots.subList(left, right + 1));
+    }
+
+    /**
+     * Returns the highest index whose tagged snapshot id is smaller than {@code targetSnapshotId},
+     * or -1 if there is none. With tags sorted by id this is the nearest tag before the target.
+     */
+    public static int findPreviousTag(List<Snapshot> taggedSnapshots, long targetSnapshotId) {
+        int index = taggedSnapshots.size() - 1;
+        while (index >= 0 && taggedSnapshots.get(index).id() >= targetSnapshotId) {
+            index--;
+        }
+        return index;
+    }
+
+    /**
+     * Returns the highest index whose tagged snapshot id is at most {@code targetSnapshotId}, or -1
+     * if there is none. With tags sorted by id this is the nearest tag at or before the target.
+     */
+    private static int findPreviousOrEqualTag(
+            List<Snapshot> taggedSnapshots, long targetSnapshotId) {
+        int index = taggedSnapshots.size() - 1;
+        while (index >= 0 && taggedSnapshots.get(index).id() > targetSnapshotId) {
+            index--;
+        }
+        return index;
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index b548827a4..6b88a838b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -40,7 +40,6 @@ import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.operation.TagFileKeeper;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
@@ -138,11 +137,7 @@ public class TestFileStore extends KeyValueFileStore {
                 millisRetained,
                 snapshotManager(),
                 newSnapshotDeletion(),
-                new TagFileKeeper(
-                        manifestListFactory().create(),
-                        manifestFileFactory().create(),
-                        new TagManager(fileIO, options.path()),
-                        options.scanManifestParallelism()));
+                new TagManager(fileIO, options.path()));
     }
 
     public List<Snapshot> commitData(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index 20612d344..91eb0b01c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -89,8 +88,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 
0, 1, dataFile);
 
         // expire
-        expire.snapshotDeletion()
-                .doDeleteExpiredDataFiles(Arrays.asList(add, delete), new 
HashMap<>(), f -> false);
+        expire.snapshotDeletion().doCleanUnusedDataFile(Arrays.asList(add, 
delete), f -> false);
 
         // check
         assertThat(fileIO.exists(myDataFile)).isFalse();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 095a2b638..ede56ff14 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -399,7 +399,7 @@ public class FileDeletionTest {
             assertPathExists(fileIO, 
pathFactory.toManifestListPath(manifestListName));
         }
 
-        store.newTagDeletion().delete(tagManager.taggedSnapshot("tag1"));
+        tagManager.deleteTag("tag1", store.newTagDeletion(), snapshotManager);
 
         // check data files
         assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
@@ -475,7 +475,7 @@ public class FileDeletionTest {
             assertPathExists(fileIO, 
pathFactory.toManifestListPath(manifestListName));
         }
 
-        store.newTagDeletion().delete(tagManager.taggedSnapshot("tag2"));
+        tagManager.deleteTag("tag2", store.newTagDeletion(), snapshotManager);
 
         // check data files
         assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index aff34ee45..e3966f850 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -655,18 +655,15 @@ public abstract class FileStoreTableTestBase {
         write.close();
     }
 
+    // All tags are after the rollback snapshot
     @Test
-    public void testRollbackTo() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+    public void testRollbackToSnapshotCase0() throws Exception {
+        int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+        FileStoreTable table = prepareRollbackTable(commitTimes);
 
-        int commitTimes = ThreadLocalRandom.current().nextInt(100) + 1;
-        for (int i = 0; i < commitTimes; i++) {
-            write.write(rowData(i, 10 * i, 100L * i));
-            commit.commit(i, write.prepareCommit(false, i));
-        }
-        write.close();
+        table.createTag("test1", commitTimes);
+        table.createTag("test2", commitTimes - 1);
+        table.createTag("test3", commitTimes - 2);
 
         table.rollbackTo(1);
         ReadBuilder readBuilder = table.newReadBuilder();
@@ -680,7 +677,7 @@ public abstract class FileStoreTableTestBase {
 
         List<java.nio.file.Path> files =
                 Files.walk(new 
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
-        assertThat(files.size()).isEqualTo(14);
+        assertThat(files.size()).isEqualTo(15);
         // table-path
         // table-path/snapshot
         // table-path/snapshot/LATEST
@@ -695,6 +692,151 @@ public abstract class FileStoreTableTestBase {
         // table-path/manifest/manifest-list-0
         // table-path/schema
         // table-path/schema/schema-0
+        // table-path/tag
+    }
+
+    // One tag is at the rollback snapshot and others are after it
+    @Test
+    public void testRollbackToSnapshotCase1() throws Exception {
+        int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+        FileStoreTable table = prepareRollbackTable(commitTimes);
+
+        table.createTag("test1", commitTimes);
+        table.createTag("test2", commitTimes - 1);
+        table.createTag("test3", 1);
+
+        table.rollbackTo(1);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<String> result =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        BATCH_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // read tag test3
+        table = table.copy(Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "test3"));
+        readBuilder = table.newReadBuilder();
+        result =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        BATCH_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Files.walk keeps directories open until the returned stream is closed
+        try (java.util.stream.Stream<java.nio.file.Path> files =
+                Files.walk(new File(tablePath.getPath()).toPath())) {
+            assertThat(files.count()).isEqualTo(16);
+        }
+        // case 0 plus 1:
+        // table-path/tag/tag-test3
+    }
+
+    // One tag is before the rollback snapshot and others are after it
+    @Test
+    public void testRollbackToSnapshotCase2() throws Exception {
+        int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+        FileStoreTable table = prepareRollbackTable(commitTimes);
+
+        table.createTag("test1", commitTimes);
+        table.createTag("test2", commitTimes - 1);
+        table.createTag("test3", 1);
+
+        table.rollbackTo(2);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<String> result =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        BATCH_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset");
+
+        // read tag test3
+        table = table.copy(Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "test3"));
+        readBuilder = table.newReadBuilder();
+        result =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        BATCH_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Files.walk keeps directories open until the returned stream is closed
+        try (java.util.stream.Stream<java.nio.file.Path> files =
+                Files.walk(new File(tablePath.getPath()).toPath())) {
+            assertThat(files.count()).isEqualTo(23);
+        }
+        // case 0 plus 7:
+        // table-path/manifest/manifest-list-2
+        // table-path/manifest/manifest-list-3
+        // table-path/manifest/manifest-1
+        // table-path/snapshot/snapshot-2
+        // table-path/pt=1
+        // table-path/pt=1/bucket-0
+        // table-path/pt=1/bucket-0/data-0.orc
+    }
+
+    // Rolls back to tag test1; when expire is true, snapshots are expired first (retaining 5), so
+    // the tagged snapshot 1 may no longer exist as a snapshot file
+    @ParameterizedTest(name = "expire snapshots = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testRollbackToTag(boolean expire) throws Exception {
+        int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+        FileStoreTable table = prepareRollbackTable(commitTimes);
+
+        table.createTag("test1", 1);
+        table.createTag("test2", commitTimes - 3);
+        table.createTag("test3", commitTimes - 1);
+
+        if (expire) {
+            // expire snapshots
+            Options options = new Options();
+            options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
+            options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
+            table.copy(options.toMap()).store().newExpire().expire();
+        }
+
+        table.rollbackTo("test1");
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<String> result =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        BATCH_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // read tag test1
+        table = table.copy(Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "test1"));
+        readBuilder = table.newReadBuilder();
+        result =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        BATCH_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Files.walk keeps directories open until the returned stream is closed
+        try (java.util.stream.Stream<java.nio.file.Path> files =
+                Files.walk(new File(tablePath.getPath()).toPath())) {
+            assertThat(files.count()).isEqualTo(16);
+        }
+        // rollback snapshot case 0 plus 1:
+        // table-path/tag/tag-test1
+    }
+
+    /**
+     * Creates a table and performs {@code commitTimes} commits; commit {@code n} writes the single
+     * row {@code rowData(n, 10 * n, 100L * n)} with commit identifier {@code n}.
+     */
+    private FileStoreTable prepareRollbackTable(int commitTimes) throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        StreamTableWrite writer = table.newWrite(commitUser);
+        StreamTableCommit committer = table.newCommit(commitUser);
+
+        int commitId = 0;
+        while (commitId < commitTimes) {
+            writer.write(rowData(commitId, 10 * commitId, 100L * commitId));
+            committer.commit(commitId, writer.prepareCommit(false, commitId));
+            commitId++;
+        }
+        writer.close();
+        return table;
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 63730cbe1..3d7b06118 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -142,7 +142,7 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
     }
 
     @Test
-    public void testIndexFileRollback() throws Exception {
+    public void testIndexFileRollbackSnapshot() throws Exception {
         prepareExpireTable();
 
         long indexFileSize = indexFileSize();
@@ -169,6 +169,28 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
         assertThat(indexManifestSize()).isEqualTo(1);
     }
 
+    /**
+     * Creates tags at snapshots 1, 5 and 7, then rolls back to snapshot 5 and afterwards to tag
+     * "tag1", checking the remaining index files and index manifests after each rollback.
+     */
+    @Test
+    public void testIndexFileRollbackTag() throws Exception {
+        prepareExpireTable();
+
+        // sizes before any rollback, the baseline for the first check
+        long indexFileSize = indexFileSize();
+        long indexManifestSize = indexManifestSize();
+
+        table.createTag("tag1", 1);
+        table.createTag("tag5", 5);
+        table.createTag("tag7", 7);
+
+        table.rollbackTo(5);
+        checkIndexFiles(5);
+        assertThat(indexFileSize()).isEqualTo(indexFileSize - 2);
+        assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 2);
+
+        table.rollbackTo("tag1");
+        checkIndexFiles(1);
+        assertThat(indexFileSize()).isEqualTo(3);
+        assertThat(indexManifestSize()).isEqualTo(1);
+    }
+
     private void prepareExpireTable() throws Exception {
         StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
         StreamTableWrite write = writeBuilder.newWrite();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java
index 09e2ac41f..f9c4fb7c2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java
@@ -32,21 +32,21 @@ import static 
org.apache.paimon.flink.action.Action.checkRequiredArgument;
 import static org.apache.paimon.flink.action.Action.getTablePath;
 import static org.apache.paimon.flink.action.Action.optionalConfigMap;
 
-/** Rollback to specific snapshot action for Flink. */
+/** Rollback to specific version action for Flink. */
 public class RollbackToAction extends TableActionBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RollbackToAction.class);
 
-    private final long snapshotId;
+    /** Snapshot id (when it consists of digits only) or tag name to roll back to. */
+    private final String version;
 
     public RollbackToAction(
             String warehouse,
             String databaseName,
             String tableName,
-            long snapshotId,
+            String version,
             Map<String, String> catalogConfig) {
         super(warehouse, databaseName, tableName, catalogConfig);
-        this.snapshotId = snapshotId;
+        this.version = version;
     }
 
     public static Optional<Action> create(String[] args) {
@@ -61,41 +61,44 @@ public class RollbackToAction extends TableActionBase {
 
         Tuple3<String, String, String> tablePath = getTablePath(params);
 
-        checkRequiredArgument(params, "snapshot");
-        String snapshot = params.get("snapshot");
+        checkRequiredArgument(params, "version");
+        String version = params.get("version");
 
         Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
 
         RollbackToAction action =
                 new RollbackToAction(
-                        tablePath.f0,
-                        tablePath.f1,
-                        tablePath.f2,
-                        Long.parseLong(snapshot),
-                        catalogConfig);
+                        tablePath.f0, tablePath.f1, tablePath.f2, version, 
catalogConfig);
 
         return Optional.of(action);
     }
 
     private static void printHelp() {
-        System.out.println("Action \"rollback-to\" roll back a table to a specific snapshot ID.");
+        // prints usage; --version accepts either a snapshot id or a tag name
+        System.out.println(
+                "Action \"rollback-to\" rolls back a table to a specific snapshot ID or tag.");
         System.out.println();
 
         System.out.println("Syntax:");
         System.out.println(
                 "  rollback-to --warehouse <warehouse-path> --database <database-name> "
-                        + "--table <table-name> --snapshot <snapshot_spec>");
+                        + "--table <table-name> --version <version-string>");
+        System.out.println(
+                "  <version-string> can be a long value representing a snapshot ID or a tag name.");
         System.out.println();
     }
 
     @Override
     public void run() throws Exception {
-        LOG.debug("Run rollback-to action with snapshot id '{}'.", snapshotId);
+        LOG.debug("Run rollback-to action with version '{}'.", version);
 
         if (!(table instanceof DataTable)) {
             throw new IllegalArgumentException("Unknown table: " + identifier);
         }
 
-        table.rollbackTo(snapshotId);
+        // an empty string would pass the all-digits check below and fail in Long.parseLong
+        if (version.trim().isEmpty()) {
+            throw new IllegalArgumentException("Rollback version must not be blank.");
+        }
+
+        // a version made of digits only is a snapshot id, anything else is a tag name
+        if (version.chars().allMatch(Character::isDigit)) {
+            table.rollbackTo(Long.parseLong(version));
+        } else {
+            table.rollbackTo(version);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index 6413dce3e..38555d47d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -49,7 +49,7 @@ public class RollbackToActionITCase extends ActionITCaseBase {
     }
 
     @Test
-    public void test() throws Exception {
+    public void rollbackToSnapshotTest() throws Exception {
         FileStoreTable table =
                 createFileStoreTable(
                         ROW_TYPE,
@@ -67,11 +67,42 @@ public class RollbackToActionITCase extends 
ActionITCaseBase {
         writeData(rowData(2L, BinaryString.fromString("Flink")));
 
         RollbackToAction action =
-                new RollbackToAction(warehouse, database, tableName, 2, 
Collections.emptyMap());
+                new RollbackToAction(warehouse, database, tableName, "2", 
Collections.emptyMap());
         action.run();
 
         testBatchRead(
                 "SELECT * FROM `" + tableName + "`",
                 Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello")));
     }
+
+    @Test
+    public void rollbackToTagTest() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyMap());
+        snapshotManager = table.snapshotManager();
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Apache")));
+        writeData(rowData(2L, BinaryString.fromString("Paimon")));
+
+        table.createTag("tag1", 1);
+        table.createTag("tag2", 2);
+        table.createTag("tag3", 3);
+
+        RollbackToAction action =
+                new RollbackToAction(
+                        warehouse, database, tableName, "tag2", 
Collections.emptyMap());
+        action.run();
+
+        testBatchRead(
+                "SELECT * FROM `" + tableName + "`",
+                Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Apache")));
+    }
 }


Reply via email to