This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ca5ac6543 [core] Simplify rollback to only clean snapshots (#5712)
3ca5ac6543 is described below

commit 3ca5ac654356642a1a2bf19badea4ae6bceb4e53
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 9 09:49:19 2025 +0800

    [core] Simplify rollback to only clean snapshots (#5712)
---
 .../apache/paimon/operation/ChangelogDeletion.java |   1 +
 .../org/apache/paimon/operation/TagDeletion.java   |   5 -
 .../paimon/table/AbstractFileStoreTable.java       |  71 +++-------
 .../org/apache/paimon/table/RollbackHelper.java    | 154 +++++----------------
 .../org/apache/paimon/utils/SnapshotManager.java   |  16 ++-
 .../paimon/table/IndexFileExpireTableTest.java     |  12 --
 .../paimon/table/PrimaryKeySimpleTableTest.java    |  15 +-
 .../apache/paimon/table/SimpleTableTestBase.java   |  59 ++++----
 8 files changed, 102 insertions(+), 231 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
index 069e57bb3d..348d2da6bd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
@@ -87,6 +87,7 @@ public class ChangelogDeletion extends 
FileDeletionBase<Changelog> {
         // the index and statics manifest list should handle by snapshot 
deletion.
     }
 
+    @Override
     public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
         Set<String> skippingSet = new HashSet<>();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 8bf9345643..569a1464cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -106,10 +105,6 @@ public class TagDeletion extends 
FileDeletionBase<Snapshot> {
         cleanUnusedManifests(taggedSnapshot, skippingSet, true, false);
     }
 
-    public Predicate<ExpireFileEntry> dataFileSkipper(Snapshot fromSnapshot) 
throws Exception {
-        return dataFileSkipper(Collections.singletonList(fromSnapshot));
-    }
-
     public Predicate<ExpireFileEntry> dataFileSkipper(List<Snapshot> 
fromSnapshots)
             throws Exception {
         Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 9b3b3f8c7e..f011b74af3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -68,8 +68,7 @@ import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.io.UncheckedIOException;
+import java.io.FileNotFoundException;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
@@ -80,7 +79,6 @@ import java.util.SortedMap;
 import java.util.function.BiConsumer;
 
 import static org.apache.paimon.CoreOptions.PATH;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Abstract {@link FileStoreTable}. */
 abstract class AbstractFileStoreTable implements FileStoreTable {
@@ -504,11 +502,27 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         try {
             snapshotManager.rollback(Instant.snapshot(snapshotId));
         } catch (UnsupportedOperationException e) {
-            checkArgument(
-                    snapshotManager.snapshotExists(snapshotId),
-                    "Rollback snapshot '%s' doesn't exist.",
-                    snapshotId);
-            
rollbackHelper().updateLatestAndCleanLargerThan(snapshotManager.snapshot(snapshotId));
+            Snapshot snapshot;
+            try {
+                snapshot = snapshotManager.tryGetSnapshot(snapshotId);
+            } catch (FileNotFoundException ex) {
+                throw new IllegalArgumentException(
+                        String.format("Rollback snapshot '%s' doesn't exist.", 
snapshotId), ex);
+            }
+            rollbackHelper().cleanLargerThan(snapshot);
+        }
+    }
+
+    @Override
+    public void rollbackTo(String tagName) {
+        SnapshotManager snapshotManager = snapshotManager();
+        try {
+            snapshotManager.rollback(Instant.tag(tagName));
+        } catch (UnsupportedOperationException e) {
+            Snapshot taggedSnapshot = 
tagManager().getOrThrow(tagName).trimToSnapshot();
+            RollbackHelper rollbackHelper = rollbackHelper();
+            rollbackHelper.cleanLargerThan(taggedSnapshot);
+            rollbackHelper.createSnapshotFileIfNeeded(taggedSnapshot);
         }
     }
 
@@ -644,38 +658,6 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         branchManager().fastForward(branchName);
     }
 
-    @Override
-    public void rollbackTo(String tagName) {
-        SnapshotManager snapshotManager = snapshotManager();
-        try {
-            snapshotManager.rollback(Instant.tag(tagName));
-            return;
-        } catch (UnsupportedOperationException ignore) {
-
-        }
-        TagManager tagManager = tagManager();
-        checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' 
doesn't exist.", tagName);
-
-        Snapshot taggedSnapshot = 
tagManager.getOrThrow(tagName).trimToSnapshot();
-        rollbackHelper().updateLatestAndCleanLargerThan(taggedSnapshot);
-
-        try {
-            // it is possible that the earliest snapshot is later than the 
rollback tag because of
-            // snapshot expiration, in this case the `cleanLargerThan` method 
will delete all
-            // snapshots, so we should write the tag file to snapshot 
directory and modify the
-            // earliest hint
-            if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
-                fileIO.writeFile(
-                        snapshotManager().snapshotPath(taggedSnapshot.id()),
-                        fileIO.readFileUtf8(tagManager.tagPath(tagName)),
-                        false);
-                snapshotManager.commitEarliestHint(taggedSnapshot.id());
-            }
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
     @Override
     public TagManager tagManager() {
         return new TagManager(fileIO, path, currentBranch());
@@ -713,14 +695,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     private RollbackHelper rollbackHelper() {
-        return new RollbackHelper(
-                snapshotManager(),
-                changelogManager(),
-                tagManager(),
-                fileIO,
-                store().newSnapshotDeletion(),
-                store().newChangelogDeletion(),
-                store().newTagDeletion());
+        return new RollbackHelper(snapshotManager(), changelogManager(), 
tagManager(), fileIO);
     }
 
     protected RowKindGenerator rowKindGenerator() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java 
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index ea1886bda0..21f30dd998 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -21,90 +21,67 @@ package org.apache.paimon.table;
 import org.apache.paimon.Changelog;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.ExpireFileEntry;
-import org.apache.paimon.operation.ChangelogDeletion;
-import org.apache.paimon.operation.SnapshotDeletion;
-import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.SortedMap;
-import java.util.function.Predicate;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Helper class for {@link Table#rollbackTo} including utils to clean 
snapshots. */
 public class RollbackHelper {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(RollbackHelper.class);
-
     private final SnapshotManager snapshotManager;
     private final ChangelogManager changelogManager;
     private final TagManager tagManager;
     private final FileIO fileIO;
-    private final SnapshotDeletion snapshotDeletion;
-    private final ChangelogDeletion changelogDeletion;
-    private final TagDeletion tagDeletion;
 
     public RollbackHelper(
             SnapshotManager snapshotManager,
             ChangelogManager changelogManager,
             TagManager tagManager,
-            FileIO fileIO,
-            SnapshotDeletion snapshotDeletion,
-            ChangelogDeletion changelogDeletion,
-            TagDeletion tagDeletion) {
+            FileIO fileIO) {
         this.snapshotManager = snapshotManager;
         this.changelogManager = changelogManager;
         this.tagManager = tagManager;
         this.fileIO = fileIO;
-        this.snapshotDeletion = snapshotDeletion;
-        this.changelogDeletion = changelogDeletion;
-        this.tagDeletion = tagDeletion;
     }
 
     /** Clean snapshots and tags whose id is larger than given snapshot's and 
update latest hit. */
-    public void updateLatestAndCleanLargerThan(Snapshot retainedSnapshot) {
-        // clean data files
-        List<Snapshot> cleanedSnapshots = 
updateLatestAndCleanSnapshotsDataFiles(retainedSnapshot);
-        List<Changelog> cleanedChangelogs = 
cleanLongLivedChangelogDataFiles(retainedSnapshot);
-        List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);
-        Set<Long> cleanedIds = new HashSet<>();
-
-        // clean manifests
-        // this can be used for snapshots and tags manifests cleaning both
-        Set<String> manifestsSkippingSet = 
snapshotDeletion.manifestSkippingSet(retainedSnapshot);
-
-        for (Snapshot snapshot : cleanedSnapshots) {
-            snapshotDeletion.cleanUnusedManifests(snapshot, 
manifestsSkippingSet);
-            cleanedIds.add(snapshot.id());
-        }
-
-        for (Changelog changelog : cleanedChangelogs) {
-            changelogDeletion.cleanUnusedManifests(changelog, 
manifestsSkippingSet);
-            cleanedIds.add(changelog.id());
+    public void cleanLargerThan(Snapshot retainedSnapshot) {
+        try {
+            cleanSnapshots(retainedSnapshot);
+            cleanLongLivedChangelogs(retainedSnapshot);
+            cleanTags(retainedSnapshot);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
+    }
 
-        for (Snapshot snapshot : cleanedTags) {
-            if (cleanedIds.contains(snapshot.id())) {
-                continue;
+    public void createSnapshotFileIfNeeded(Snapshot taggedSnapshot) {
+        // it is possible that the earliest snapshot is later than the 
rollback tag because of
+        // snapshot expiration, in this case the `cleanLargerThan` method will 
delete all
+        // snapshots, so we should write the tag file to snapshot directory 
and modify the
+        // earliest hint
+        if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
+            try {
+                fileIO.writeFile(
+                        snapshotManager.snapshotPath(taggedSnapshot.id()),
+                        taggedSnapshot.toJson(),
+                        false);
+                snapshotManager.commitEarliestHint(taggedSnapshot.id());
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
             }
-            tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
         }
     }
 
-    private List<Snapshot> updateLatestAndCleanSnapshotsDataFiles(Snapshot 
retainedSnapshot) {
+    private void cleanSnapshots(Snapshot retainedSnapshot) throws IOException {
         long earliest =
                 checkNotNull(
                         snapshotManager.earliestSnapshotId(), "Cannot find 
earliest snapshot.");
@@ -112,44 +89,23 @@ public class RollbackHelper {
                 checkNotNull(snapshotManager.latestSnapshotId(), "Cannot find 
latest snapshot.");
 
         // modify the latest hint
-        try {
-            snapshotManager.commitLatestHint(retainedSnapshot.id());
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-        // delete snapshot files first, cannot be read now
+        snapshotManager.commitLatestHint(retainedSnapshot.id());
+
         // it is possible that some snapshots have been expired
-        List<Snapshot> toBeCleaned = new ArrayList<>();
         long to = Math.max(earliest, retainedSnapshot.id() + 1);
         for (long i = latest; i >= to; i--) {
             // Ignore the non-existent snapshots
             if (snapshotManager.snapshotExists(i)) {
-                toBeCleaned.add(snapshotManager.snapshot(i));
                 snapshotManager.deleteSnapshot(i);
             }
         }
-
-        // delete data files of snapshots
-        // don't worry about tag data files because file deletion methods 
won't throw exception
-        // when deleting non-existing data files
-        for (Snapshot snapshot : toBeCleaned) {
-            
snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList());
-            if (snapshot.changelogManifestList() != null) {
-                
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
-            }
-        }
-
-        // delete directories
-        snapshotDeletion.cleanEmptyDirectories();
-
-        return toBeCleaned;
     }
 
-    private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot 
retainedSnapshot) {
+    private void cleanLongLivedChangelogs(Snapshot retainedSnapshot) throws 
IOException {
         Long earliest = changelogManager.earliestLongLivedChangelogId();
         Long latest = changelogManager.latestLongLivedChangelogId();
         if (earliest == null || latest == null) {
-            return Collections.emptyList();
+            return;
         }
 
         // it is possible that some snapshots have been expired
@@ -160,42 +116,29 @@ public class RollbackHelper {
         }
 
         // modify the latest hint
-        try {
-            if (toBeCleaned.size() > 0) {
-                if (to == earliest) {
-                    // all changelog has been cleaned, so we do not know the 
actual latest id
-                    // set to -1
-                    changelogManager.commitLongLivedChangelogLatestHint(-1);
-                } else {
-                    changelogManager.commitLongLivedChangelogLatestHint(to - 
1);
-                }
+        if (!toBeCleaned.isEmpty()) {
+            if (to == earliest) {
+                // all changelog has been cleaned, so we do not know the 
actual latest id
+                // set to -1
+                changelogManager.commitLongLivedChangelogLatestHint(-1);
+            } else {
+                changelogManager.commitLongLivedChangelogLatestHint(to - 1);
             }
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
         }
 
         // delete data files of changelog
         for (Changelog changelog : toBeCleaned) {
-            // delete changelog files first, cannot be read now
             
fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(changelog.id()));
-            // clean the deleted file
-            changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry -> 
false);
         }
-
-        // delete directories
-        snapshotDeletion.cleanEmptyDirectories();
-
-        return toBeCleaned;
     }
 
-    private List<Snapshot> cleanTagsDataFiles(Snapshot retainedSnapshot) {
+    private void cleanTags(Snapshot retainedSnapshot) {
         SortedMap<Snapshot, List<String>> tags = tagManager.tags();
         if (tags.isEmpty()) {
-            return Collections.emptyList();
+            return;
         }
 
         List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
-        List<Snapshot> toBeCleaned = new ArrayList<>();
 
         // delete tag files
         for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
@@ -203,30 +146,7 @@ public class RollbackHelper {
             if (tag.id() <= retainedSnapshot.id()) {
                 break;
             }
-            toBeCleaned.add(tag);
             tags.get(tag).forEach(tagName -> 
fileIO.deleteQuietly(tagManager.tagPath(tagName)));
         }
-
-        // delete data files
-        Predicate<ExpireFileEntry> dataFileSkipper = null;
-        boolean success = true;
-        try {
-            dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot);
-        } catch (Exception e) {
-            LOG.info(
-                    "Skip cleaning data files for deleted tags due to failed 
to build skipping set.",
-                    e);
-            success = false;
-        }
-
-        if (success) {
-            for (Snapshot s : toBeCleaned) {
-                tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
-            }
-            // delete directories
-            tagDeletion.cleanEmptyDirectories();
-        }
-
-        return toBeCleaned;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index bc28226119..63d32e287b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -49,6 +49,7 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
+import java.util.stream.Stream;
 
 import static org.apache.paimon.utils.BranchManager.branchPath;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -473,23 +474,27 @@ public class SnapshotManager implements Serializable {
     }
 
     public long snapshotCount() throws IOException {
-        return listVersionedFiles(fileIO, snapshotDirectory(), 
SNAPSHOT_PREFIX).count();
+        return snapshotIdStream().count();
     }
 
     public Iterator<Snapshot> snapshots() throws IOException {
-        return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
+        return snapshotIdStream()
                 .map(this::snapshot)
                 .sorted(Comparator.comparingLong(Snapshot::id))
                 .iterator();
     }
 
     public List<Path> snapshotPaths(Predicate<Long> predicate) throws 
IOException {
-        return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
+        return snapshotIdStream()
                 .filter(predicate)
                 .map(this::snapshotPath)
                 .collect(Collectors.toList());
     }
 
+    public Stream<Long> snapshotIdStream() throws IOException {
+        return listVersionedFiles(fileIO, snapshotDirectory(), 
SNAPSHOT_PREFIX);
+    }
+
     public Iterator<Snapshot> snapshotsWithId(List<Long> snapshotIds) {
         return snapshotIds.stream()
                 .map(this::snapshot)
@@ -543,10 +548,7 @@ public class SnapshotManager implements Serializable {
      * be deleted by other processes, so just skip this snapshot.
      */
     public List<Snapshot> safelyGetAllSnapshots() throws IOException {
-        List<Path> paths =
-                listVersionedFiles(fileIO, snapshotDirectory(), 
SNAPSHOT_PREFIX)
-                        .map(this::snapshotPath)
-                        .collect(Collectors.toList());
+        List<Path> paths = 
snapshotIdStream().map(this::snapshotPath).collect(Collectors.toList());
 
         List<Snapshot> snapshots = Collections.synchronizedList(new 
ArrayList<>(paths.size()));
         collectSnapshots(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 3040e11175..d9e58e245c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -149,23 +149,15 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
 
         table.rollbackTo(5);
         checkIndexFiles(5);
-        assertThat(indexFileSize()).isEqualTo(indexFileSize - 2);
-        assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 2);
 
         table.rollbackTo(3);
         checkIndexFiles(3);
-        assertThat(indexFileSize()).isEqualTo(indexFileSize - 3);
-        assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 3);
 
         table.rollbackTo(2);
         checkIndexFiles(2);
-        assertThat(indexFileSize()).isEqualTo(indexFileSize - 3);
-        assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 3);
 
         table.rollbackTo(1);
         checkIndexFiles(1);
-        assertThat(indexFileSize()).isEqualTo(3);
-        assertThat(indexManifestSize()).isEqualTo(1);
     }
 
     @Test
@@ -181,13 +173,9 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
 
         table.rollbackTo(5);
         checkIndexFiles(5);
-        assertThat(indexFileSize()).isEqualTo(indexFileSize - 2);
-        assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 2);
 
         table.rollbackTo("tag1");
         checkIndexFiles(1);
-        assertThat(indexFileSize()).isEqualTo(3);
-        assertThat(indexManifestSize()).isEqualTo(1);
     }
 
     protected void prepareExpireTable() throws Exception {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 76dac80214..0bf4971f94 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -71,6 +71,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.Pair;
 
 import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -99,6 +100,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_FORMAT;
@@ -2245,15 +2247,10 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
 
         table.rollbackTo("test1");
 
-        List<java.nio.file.Path> files =
-                Files.walk(new File(tablePath.toUri().getPath()).toPath())
-                        .collect(Collectors.toList());
-        assertThat(files.size()).isEqualTo(19);
-        // rollback snapshot case testRollbackToSnapshotCase0 plus 4:
-        // table-path/tag/tag-test1
-        // table-path/changelog
-        // table-path/changelog/LATEST
-        // table-path/changelog/EARLIEST
+        assertRollbackTo(table, singletonList(1L), 1, 1, 
singletonList("test1"));
+        ChangelogManager changelogManager = table.changelogManager();
+        assertThat(changelogManager.earliestLongLivedChangelogId()).isNull();
+        assertThat(changelogManager.latestLongLivedChangelogId()).isNull();
     }
 
     @ParameterizedTest
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 27ac1efecd..f837513d50 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -79,6 +79,7 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -95,6 +96,8 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
@@ -291,12 +294,10 @@ public abstract class SimpleTableTestBase {
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Collections.singletonList(
-                                
"1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
+                        
singletonList("1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Collections.singletonList(
-                                
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+                        
singletonList("2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -789,8 +790,7 @@ public abstract class SimpleTableTestBase {
         assertThat(result)
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
-        FileStatus[] files = table.fileIO().listFiles(table.location(), true);
-        assertThat(files).hasSize(8);
+        assertRollbackTo(table, singletonList(1L), 1, 1, emptyList());
     }
 
     // All tags are after the rollback snapshot
@@ -813,16 +813,7 @@ public abstract class SimpleTableTestBase {
         assertThat(result)
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
-        FileStatus[] files = table.fileIO().listFiles(table.location(), true);
-        assertThat(files).hasSize(8);
-        // table-path/snapshot/LATEST
-        // table-path/snapshot/EARLIEST
-        // table-path/snapshot/snapshot-1
-        // table-path/pt=0/bucket-0/data-0.orc
-        // table-path/manifest/manifest-list-1
-        // table-path/manifest/manifest-0
-        // table-path/manifest/manifest-list-0
-        // table-path/schema/schema-0
+        assertRollbackTo(table, singletonList(1L), 1, 1, emptyList());
     }
 
     // One tag is at the rollback snapshot and others are after it
@@ -856,10 +847,7 @@ public abstract class SimpleTableTestBase {
         assertThat(result)
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
-        FileStatus[] files = table.fileIO().listFiles(table.location(), true);
-        assertThat(files).hasSize(9);
-        // case 0 plus 1:
-        // table-path/tag/tag-test3
+        assertRollbackTo(table, singletonList(1L), 1, 1, 
singletonList("test3"));
     }
 
     // One tag is before the rollback snapshot and others are after it
@@ -895,15 +883,7 @@ public abstract class SimpleTableTestBase {
         assertThat(result)
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
-        FileStatus[] files = table.fileIO().listFiles(table.location(), true);
-        assertThat(files).hasSize(14);
-        // case 0 plus 6:
-        // table-path/manifest/manifest-list-2
-        // table-path/manifest/manifest-list-3
-        // table-path/manifest/manifest-1
-        // table-path/snapshot/snapshot-2
-        // table-path/tag/tag-test3
-        // table-path/pt=1/bucket-0/data-0.orc
+        assertRollbackTo(table, Arrays.asList(1L, 2L), 1, 2, 
singletonList("test3"));
     }
 
     @ParameterizedTest(name = "expire snapshots = {0}")
@@ -948,10 +928,7 @@ public abstract class SimpleTableTestBase {
         assertThat(result)
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
-        FileStatus[] files = table.fileIO().listFiles(table.location(), true);
-        assertThat(files).hasSize(9);
-        // rollback snapshot case 0 plus 1:
-        // table-path/tag/tag-test1
+        assertRollbackTo(table, singletonList(1L), 1, 1, 
singletonList("test1"));
     }
 
     private FileStoreTable prepareRollbackTable(int commitTimes) throws 
Exception {
@@ -1762,4 +1739,20 @@ public abstract class SimpleTableTestBase {
                                 BATCH_ROW_TO_STRING))
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
     }
+
+    protected void assertRollbackTo(
+            FileStoreTable table,
+            List<Long> expectedSnapshots,
+            long expectedEarliest,
+            long expectedLatest,
+            List<String> expectedTags)
+            throws IOException {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        List<Long> snapshots = 
snapshotManager.snapshotIdStream().collect(Collectors.toList());
+        
assertThat(snapshots).containsExactlyInAnyOrderElementsOf(expectedSnapshots);
+        
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(expectedEarliest);
+        
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(expectedLatest);
+        assertThat(table.tagManager().allTagNames())
+                .containsExactlyInAnyOrderElementsOf(expectedTags);
+    }
 }

Reply via email to