This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0947e150ac [core] Support rollback to tag from snapshot id (#5756)
0947e150ac is described below

commit 0947e150ac8ae1d71207cf2f5ab562169a19691d
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 17 13:58:27 2025 +0800

    [core] Support rollback to tag from snapshot id (#5756)
---
 .../paimon/table/AbstractFileStoreTable.java       | 16 ++++++++----
 .../apache/paimon/table/SimpleTableTestBase.java   | 29 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index f011b74af3..eea2b0ca2c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -85,8 +85,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
-    private static final String WATERMARK_PREFIX = "watermark-";
-
     protected final FileIO fileIO;
     protected final Path path;
     protected final TableSchema tableSchema;
@@ -502,14 +500,22 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
         try {
             snapshotManager.rollback(Instant.snapshot(snapshotId));
         } catch (UnsupportedOperationException e) {
-            Snapshot snapshot;
             try {
-                snapshot = snapshotManager.tryGetSnapshot(snapshotId);
+                Snapshot snapshot = snapshotManager.tryGetSnapshot(snapshotId);
+                rollbackHelper().cleanLargerThan(snapshot);
             } catch (FileNotFoundException ex) {
+                // try to get snapshot from tag
+                TagManager tagManager = tagManager();
+                SortedMap<Snapshot, List<String>> tags = tagManager.tags();
+                for (Map.Entry<Snapshot, List<String>> entry : tags.entrySet()) {
+                    if (entry.getKey().id() == snapshotId) {
+                        rollbackTo(entry.getValue().get(0));
+                        return;
+                    }
+                }
                 throw new IllegalArgumentException(
                         String.format("Rollback snapshot '%s' doesn't exist.", snapshotId), ex);
             }
-            rollbackHelper().cleanLargerThan(snapshot);
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index f837513d50..2fd84c43e8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -931,6 +931,35 @@ public abstract class SimpleTableTestBase {
         assertRollbackTo(table, singletonList(1L), 1, 1, singletonList("test1"));
     }
 
+    @Test
+    public void testRollbackToTagFromSnapshotId() throws Exception {
+        int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+        FileStoreTable table = prepareRollbackTable(commitTimes);
+
+        table.createTag("test", 1);
+
+        // expire snapshots
+        Options options = new Options();
+        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
+        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
+        options.set(SNAPSHOT_EXPIRE_LIMIT, Integer.MAX_VALUE);
+        options.set(CHANGELOG_NUM_RETAINED_MIN, 5);
+        options.set(CHANGELOG_NUM_RETAINED_MAX, 5);
+        table.copy(options.toMap()).newCommit("").expireSnapshots();
+
+        table.rollbackTo(1);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<String> result =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        BATCH_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        assertRollbackTo(table, singletonList(1L), 1, 1, singletonList("test"));
+    }
+
     private FileStoreTable prepareRollbackTable(int commitTimes) throws Exception {
         FileStoreTable table = createFileStoreTable();
         return prepareRollbackTable(commitTimes, table);

Reply via email to