This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0947e150ac [core] Support rollback to tag from snapshot id (#5756)
0947e150ac is described below
commit 0947e150ac8ae1d71207cf2f5ab562169a19691d
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 17 13:58:27 2025 +0800
[core] Support rollback to tag from snapshot id (#5756)
---
.../paimon/table/AbstractFileStoreTable.java | 16 ++++++++----
.../apache/paimon/table/SimpleTableTestBase.java | 29 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 5 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index f011b74af3..eea2b0ca2c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -85,8 +85,6 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
private static final long serialVersionUID = 1L;
- private static final String WATERMARK_PREFIX = "watermark-";
-
protected final FileIO fileIO;
protected final Path path;
protected final TableSchema tableSchema;
@@ -502,14 +500,22 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
try {
snapshotManager.rollback(Instant.snapshot(snapshotId));
} catch (UnsupportedOperationException e) {
- Snapshot snapshot;
try {
- snapshot = snapshotManager.tryGetSnapshot(snapshotId);
+ Snapshot snapshot = snapshotManager.tryGetSnapshot(snapshotId);
+ rollbackHelper().cleanLargerThan(snapshot);
} catch (FileNotFoundException ex) {
+ // try to get snapshot from tag
+ TagManager tagManager = tagManager();
+ SortedMap<Snapshot, List<String>> tags = tagManager.tags();
+ for (Map.Entry<Snapshot, List<String>> entry :
tags.entrySet()) {
+ if (entry.getKey().id() == snapshotId) {
+ rollbackTo(entry.getValue().get(0));
+ return;
+ }
+ }
throw new IllegalArgumentException(
String.format("Rollback snapshot '%s' doesn't exist.",
snapshotId), ex);
}
- rollbackHelper().cleanLargerThan(snapshot);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index f837513d50..2fd84c43e8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -931,6 +931,35 @@ public abstract class SimpleTableTestBase {
assertRollbackTo(table, singletonList(1L), 1, 1,
singletonList("test1"));
}
+ @Test
+ public void testRollbackToTagFromSnapshotId() throws Exception {
+ int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+ FileStoreTable table = prepareRollbackTable(commitTimes);
+
+ table.createTag("test", 1);
+
+ // expire snapshots
+ Options options = new Options();
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
+ options.set(SNAPSHOT_EXPIRE_LIMIT, Integer.MAX_VALUE);
+ options.set(CHANGELOG_NUM_RETAINED_MIN, 5);
+ options.set(CHANGELOG_NUM_RETAINED_MAX, 5);
+ table.copy(options.toMap()).newCommit("").expireSnapshots();
+
+ table.rollbackTo(1);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<String> result =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ BATCH_ROW_TO_STRING);
+ assertThat(result)
+
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+ assertRollbackTo(table, singletonList(1L), 1, 1,
singletonList("test"));
+ }
+
private FileStoreTable prepareRollbackTable(int commitTimes) throws
Exception {
FileStoreTable table = createFileStoreTable();
return prepareRollbackTable(commitTimes, table);