This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3ca5ac6543 [core] Simplify rollback to only clean snapshots (#5712)
3ca5ac6543 is described below
commit 3ca5ac654356642a1a2bf19badea4ae6bceb4e53
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 9 09:49:19 2025 +0800
[core] Simplify rollback to only clean snapshots (#5712)
---
.../apache/paimon/operation/ChangelogDeletion.java | 1 +
.../org/apache/paimon/operation/TagDeletion.java | 5 -
.../paimon/table/AbstractFileStoreTable.java | 71 +++-------
.../org/apache/paimon/table/RollbackHelper.java | 154 +++++----------------
.../org/apache/paimon/utils/SnapshotManager.java | 16 ++-
.../paimon/table/IndexFileExpireTableTest.java | 12 --
.../paimon/table/PrimaryKeySimpleTableTest.java | 15 +-
.../apache/paimon/table/SimpleTableTestBase.java | 59 ++++----
8 files changed, 102 insertions(+), 231 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
index 069e57bb3d..348d2da6bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
@@ -87,6 +87,7 @@ public class ChangelogDeletion extends FileDeletionBase<Changelog> {
// the index and statics manifest list should handle by snapshot
deletion.
}
+ @Override
public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
Set<String> skippingSet = new HashSet<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 8bf9345643..569a1464cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -106,10 +105,6 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
cleanUnusedManifests(taggedSnapshot, skippingSet, true, false);
}
- public Predicate<ExpireFileEntry> dataFileSkipper(Snapshot fromSnapshot)
throws Exception {
- return dataFileSkipper(Collections.singletonList(fromSnapshot));
- }
-
public Predicate<ExpireFileEntry> dataFileSkipper(List<Snapshot>
fromSnapshots)
throws Exception {
Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 9b3b3f8c7e..f011b74af3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -68,8 +68,7 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.UncheckedIOException;
+import java.io.FileNotFoundException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
@@ -80,7 +79,6 @@ import java.util.SortedMap;
import java.util.function.BiConsumer;
import static org.apache.paimon.CoreOptions.PATH;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Abstract {@link FileStoreTable}. */
abstract class AbstractFileStoreTable implements FileStoreTable {
@@ -504,11 +502,27 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
try {
snapshotManager.rollback(Instant.snapshot(snapshotId));
} catch (UnsupportedOperationException e) {
- checkArgument(
- snapshotManager.snapshotExists(snapshotId),
- "Rollback snapshot '%s' doesn't exist.",
- snapshotId);
-
rollbackHelper().updateLatestAndCleanLargerThan(snapshotManager.snapshot(snapshotId));
+ Snapshot snapshot;
+ try {
+ snapshot = snapshotManager.tryGetSnapshot(snapshotId);
+ } catch (FileNotFoundException ex) {
+ throw new IllegalArgumentException(
+ String.format("Rollback snapshot '%s' doesn't exist.",
snapshotId), ex);
+ }
+ rollbackHelper().cleanLargerThan(snapshot);
+ }
+ }
+
+ @Override
+ public void rollbackTo(String tagName) {
+ SnapshotManager snapshotManager = snapshotManager();
+ try {
+ snapshotManager.rollback(Instant.tag(tagName));
+ } catch (UnsupportedOperationException e) {
+ Snapshot taggedSnapshot =
tagManager().getOrThrow(tagName).trimToSnapshot();
+ RollbackHelper rollbackHelper = rollbackHelper();
+ rollbackHelper.cleanLargerThan(taggedSnapshot);
+ rollbackHelper.createSnapshotFileIfNeeded(taggedSnapshot);
}
}
@@ -644,38 +658,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
branchManager().fastForward(branchName);
}
- @Override
- public void rollbackTo(String tagName) {
- SnapshotManager snapshotManager = snapshotManager();
- try {
- snapshotManager.rollback(Instant.tag(tagName));
- return;
- } catch (UnsupportedOperationException ignore) {
-
- }
- TagManager tagManager = tagManager();
- checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s'
doesn't exist.", tagName);
-
- Snapshot taggedSnapshot =
tagManager.getOrThrow(tagName).trimToSnapshot();
- rollbackHelper().updateLatestAndCleanLargerThan(taggedSnapshot);
-
- try {
- // it is possible that the earliest snapshot is later than the
rollback tag because of
- // snapshot expiration, in this case the `cleanLargerThan` method
will delete all
- // snapshots, so we should write the tag file to snapshot
directory and modify the
- // earliest hint
- if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
- fileIO.writeFile(
- snapshotManager().snapshotPath(taggedSnapshot.id()),
- fileIO.readFileUtf8(tagManager.tagPath(tagName)),
- false);
- snapshotManager.commitEarliestHint(taggedSnapshot.id());
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
@Override
public TagManager tagManager() {
return new TagManager(fileIO, path, currentBranch());
@@ -713,14 +695,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
}
private RollbackHelper rollbackHelper() {
- return new RollbackHelper(
- snapshotManager(),
- changelogManager(),
- tagManager(),
- fileIO,
- store().newSnapshotDeletion(),
- store().newChangelogDeletion(),
- store().newTagDeletion());
+ return new RollbackHelper(snapshotManager(), changelogManager(),
tagManager(), fileIO);
}
protected RowKindGenerator rowKindGenerator() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index ea1886bda0..21f30dd998 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -21,90 +21,67 @@ package org.apache.paimon.table;
import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.ExpireFileEntry;
-import org.apache.paimon.operation.ChangelogDeletion;
-import org.apache.paimon.operation.SnapshotDeletion;
-import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.SortedMap;
-import java.util.function.Predicate;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Helper class for {@link Table#rollbackTo} including utils to clean
snapshots. */
public class RollbackHelper {
- private static final Logger LOG =
LoggerFactory.getLogger(RollbackHelper.class);
-
private final SnapshotManager snapshotManager;
private final ChangelogManager changelogManager;
private final TagManager tagManager;
private final FileIO fileIO;
- private final SnapshotDeletion snapshotDeletion;
- private final ChangelogDeletion changelogDeletion;
- private final TagDeletion tagDeletion;
public RollbackHelper(
SnapshotManager snapshotManager,
ChangelogManager changelogManager,
TagManager tagManager,
- FileIO fileIO,
- SnapshotDeletion snapshotDeletion,
- ChangelogDeletion changelogDeletion,
- TagDeletion tagDeletion) {
+ FileIO fileIO) {
this.snapshotManager = snapshotManager;
this.changelogManager = changelogManager;
this.tagManager = tagManager;
this.fileIO = fileIO;
- this.snapshotDeletion = snapshotDeletion;
- this.changelogDeletion = changelogDeletion;
- this.tagDeletion = tagDeletion;
}
/** Clean snapshots and tags whose id is larger than given snapshot's and
update latest hit. */
- public void updateLatestAndCleanLargerThan(Snapshot retainedSnapshot) {
- // clean data files
- List<Snapshot> cleanedSnapshots =
updateLatestAndCleanSnapshotsDataFiles(retainedSnapshot);
- List<Changelog> cleanedChangelogs =
cleanLongLivedChangelogDataFiles(retainedSnapshot);
- List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);
- Set<Long> cleanedIds = new HashSet<>();
-
- // clean manifests
- // this can be used for snapshots and tags manifests cleaning both
- Set<String> manifestsSkippingSet =
snapshotDeletion.manifestSkippingSet(retainedSnapshot);
-
- for (Snapshot snapshot : cleanedSnapshots) {
- snapshotDeletion.cleanUnusedManifests(snapshot,
manifestsSkippingSet);
- cleanedIds.add(snapshot.id());
- }
-
- for (Changelog changelog : cleanedChangelogs) {
- changelogDeletion.cleanUnusedManifests(changelog,
manifestsSkippingSet);
- cleanedIds.add(changelog.id());
+ public void cleanLargerThan(Snapshot retainedSnapshot) {
+ try {
+ cleanSnapshots(retainedSnapshot);
+ cleanLongLivedChangelogs(retainedSnapshot);
+ cleanTags(retainedSnapshot);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
+ }
- for (Snapshot snapshot : cleanedTags) {
- if (cleanedIds.contains(snapshot.id())) {
- continue;
+ public void createSnapshotFileIfNeeded(Snapshot taggedSnapshot) {
+ // it is possible that the earliest snapshot is later than the
rollback tag because of
+ // snapshot expiration, in this case the `cleanLargerThan` method will
delete all
+ // snapshots, so we should write the tag file to snapshot directory
and modify the
+ // earliest hint
+ if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
+ try {
+ fileIO.writeFile(
+ snapshotManager.snapshotPath(taggedSnapshot.id()),
+ taggedSnapshot.toJson(),
+ false);
+ snapshotManager.commitEarliestHint(taggedSnapshot.id());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
- tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
}
}
- private List<Snapshot> updateLatestAndCleanSnapshotsDataFiles(Snapshot
retainedSnapshot) {
+ private void cleanSnapshots(Snapshot retainedSnapshot) throws IOException {
long earliest =
checkNotNull(
snapshotManager.earliestSnapshotId(), "Cannot find
earliest snapshot.");
@@ -112,44 +89,23 @@ public class RollbackHelper {
checkNotNull(snapshotManager.latestSnapshotId(), "Cannot find
latest snapshot.");
// modify the latest hint
- try {
- snapshotManager.commitLatestHint(retainedSnapshot.id());
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- // delete snapshot files first, cannot be read now
+ snapshotManager.commitLatestHint(retainedSnapshot.id());
+
// it is possible that some snapshots have been expired
- List<Snapshot> toBeCleaned = new ArrayList<>();
long to = Math.max(earliest, retainedSnapshot.id() + 1);
for (long i = latest; i >= to; i--) {
// Ignore the non-existent snapshots
if (snapshotManager.snapshotExists(i)) {
- toBeCleaned.add(snapshotManager.snapshot(i));
snapshotManager.deleteSnapshot(i);
}
}
-
- // delete data files of snapshots
- // don't worry about tag data files because file deletion methods
won't throw exception
- // when deleting non-existing data files
- for (Snapshot snapshot : toBeCleaned) {
-
snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList());
- if (snapshot.changelogManifestList() != null) {
-
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
- }
- }
-
- // delete directories
- snapshotDeletion.cleanEmptyDirectories();
-
- return toBeCleaned;
}
- private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot
retainedSnapshot) {
+ private void cleanLongLivedChangelogs(Snapshot retainedSnapshot) throws
IOException {
Long earliest = changelogManager.earliestLongLivedChangelogId();
Long latest = changelogManager.latestLongLivedChangelogId();
if (earliest == null || latest == null) {
- return Collections.emptyList();
+ return;
}
// it is possible that some snapshots have been expired
@@ -160,42 +116,29 @@ public class RollbackHelper {
}
// modify the latest hint
- try {
- if (toBeCleaned.size() > 0) {
- if (to == earliest) {
- // all changelog has been cleaned, so we do not know the
actual latest id
- // set to -1
- changelogManager.commitLongLivedChangelogLatestHint(-1);
- } else {
- changelogManager.commitLongLivedChangelogLatestHint(to -
1);
- }
+ if (!toBeCleaned.isEmpty()) {
+ if (to == earliest) {
+ // all changelog has been cleaned, so we do not know the
actual latest id
+ // set to -1
+ changelogManager.commitLongLivedChangelogLatestHint(-1);
+ } else {
+ changelogManager.commitLongLivedChangelogLatestHint(to - 1);
}
- } catch (IOException e) {
- throw new UncheckedIOException(e);
}
// delete data files of changelog
for (Changelog changelog : toBeCleaned) {
- // delete changelog files first, cannot be read now
fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(changelog.id()));
- // clean the deleted file
- changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry ->
false);
}
-
- // delete directories
- snapshotDeletion.cleanEmptyDirectories();
-
- return toBeCleaned;
}
- private List<Snapshot> cleanTagsDataFiles(Snapshot retainedSnapshot) {
+ private void cleanTags(Snapshot retainedSnapshot) {
SortedMap<Snapshot, List<String>> tags = tagManager.tags();
if (tags.isEmpty()) {
- return Collections.emptyList();
+ return;
}
List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
- List<Snapshot> toBeCleaned = new ArrayList<>();
// delete tag files
for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
@@ -203,30 +146,7 @@ public class RollbackHelper {
if (tag.id() <= retainedSnapshot.id()) {
break;
}
- toBeCleaned.add(tag);
tags.get(tag).forEach(tagName ->
fileIO.deleteQuietly(tagManager.tagPath(tagName)));
}
-
- // delete data files
- Predicate<ExpireFileEntry> dataFileSkipper = null;
- boolean success = true;
- try {
- dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot);
- } catch (Exception e) {
- LOG.info(
- "Skip cleaning data files for deleted tags due to failed
to build skipping set.",
- e);
- success = false;
- }
-
- if (success) {
- for (Snapshot s : toBeCleaned) {
- tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
- }
- // delete directories
- tagDeletion.cleanEmptyDirectories();
- }
-
- return toBeCleaned;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index bc28226119..63d32e287b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -49,6 +49,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
+import java.util.stream.Stream;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -473,23 +474,27 @@ public class SnapshotManager implements Serializable {
}
public long snapshotCount() throws IOException {
- return listVersionedFiles(fileIO, snapshotDirectory(),
SNAPSHOT_PREFIX).count();
+ return snapshotIdStream().count();
}
public Iterator<Snapshot> snapshots() throws IOException {
- return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
+ return snapshotIdStream()
.map(this::snapshot)
.sorted(Comparator.comparingLong(Snapshot::id))
.iterator();
}
public List<Path> snapshotPaths(Predicate<Long> predicate) throws
IOException {
- return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
+ return snapshotIdStream()
.filter(predicate)
.map(this::snapshotPath)
.collect(Collectors.toList());
}
+ public Stream<Long> snapshotIdStream() throws IOException {
+ return listVersionedFiles(fileIO, snapshotDirectory(),
SNAPSHOT_PREFIX);
+ }
+
public Iterator<Snapshot> snapshotsWithId(List<Long> snapshotIds) {
return snapshotIds.stream()
.map(this::snapshot)
@@ -543,10 +548,7 @@ public class SnapshotManager implements Serializable {
* be deleted by other processes, so just skip this snapshot.
*/
public List<Snapshot> safelyGetAllSnapshots() throws IOException {
- List<Path> paths =
- listVersionedFiles(fileIO, snapshotDirectory(),
SNAPSHOT_PREFIX)
- .map(this::snapshotPath)
- .collect(Collectors.toList());
+ List<Path> paths =
snapshotIdStream().map(this::snapshotPath).collect(Collectors.toList());
List<Snapshot> snapshots = Collections.synchronizedList(new
ArrayList<>(paths.size()));
collectSnapshots(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 3040e11175..d9e58e245c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -149,23 +149,15 @@ public class IndexFileExpireTableTest extends PrimaryKeyTableTestBase {
table.rollbackTo(5);
checkIndexFiles(5);
- assertThat(indexFileSize()).isEqualTo(indexFileSize - 2);
- assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 2);
table.rollbackTo(3);
checkIndexFiles(3);
- assertThat(indexFileSize()).isEqualTo(indexFileSize - 3);
- assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 3);
table.rollbackTo(2);
checkIndexFiles(2);
- assertThat(indexFileSize()).isEqualTo(indexFileSize - 3);
- assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 3);
table.rollbackTo(1);
checkIndexFiles(1);
- assertThat(indexFileSize()).isEqualTo(3);
- assertThat(indexManifestSize()).isEqualTo(1);
}
@Test
@@ -181,13 +173,9 @@ public class IndexFileExpireTableTest extends PrimaryKeyTableTestBase {
table.rollbackTo(5);
checkIndexFiles(5);
- assertThat(indexFileSize()).isEqualTo(indexFileSize - 2);
- assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 2);
table.rollbackTo("tag1");
checkIndexFiles(1);
- assertThat(indexFileSize()).isEqualTo(3);
- assertThat(indexManifestSize()).isEqualTo(1);
}
protected void prepareExpireTable() throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 76dac80214..0bf4971f94 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -71,6 +71,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.Pair;
import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -99,6 +100,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_FORMAT;
@@ -2245,15 +2247,10 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
table.rollbackTo("test1");
- List<java.nio.file.Path> files =
- Files.walk(new File(tablePath.toUri().getPath()).toPath())
- .collect(Collectors.toList());
- assertThat(files.size()).isEqualTo(19);
- // rollback snapshot case testRollbackToSnapshotCase0 plus 4:
- // table-path/tag/tag-test1
- // table-path/changelog
- // table-path/changelog/LATEST
- // table-path/changelog/EARLIEST
+ assertRollbackTo(table, singletonList(1L), 1, 1,
singletonList("test1"));
+ ChangelogManager changelogManager = table.changelogManager();
+ assertThat(changelogManager.earliestLongLivedChangelogId()).isNull();
+ assertThat(changelogManager.latestLongLivedChangelogId()).isNull();
}
@ParameterizedTest
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 27ac1efecd..f837513d50 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -79,6 +79,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -95,6 +96,8 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
@@ -291,12 +294,10 @@ public abstract class SimpleTableTestBase {
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.hasSameElementsAs(
- Collections.singletonList(
-
"1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
+
singletonList("1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0,
BATCH_ROW_TO_STRING))
.hasSameElementsAs(
- Collections.singletonList(
-
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+
singletonList("2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -789,8 +790,7 @@ public abstract class SimpleTableTestBase {
assertThat(result)
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
- FileStatus[] files = table.fileIO().listFiles(table.location(), true);
- assertThat(files).hasSize(8);
+ assertRollbackTo(table, singletonList(1L), 1, 1, emptyList());
}
// All tags are after the rollback snapshot
@@ -813,16 +813,7 @@ public abstract class SimpleTableTestBase {
assertThat(result)
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
- FileStatus[] files = table.fileIO().listFiles(table.location(), true);
- assertThat(files).hasSize(8);
- // table-path/snapshot/LATEST
- // table-path/snapshot/EARLIEST
- // table-path/snapshot/snapshot-1
- // table-path/pt=0/bucket-0/data-0.orc
- // table-path/manifest/manifest-list-1
- // table-path/manifest/manifest-0
- // table-path/manifest/manifest-list-0
- // table-path/schema/schema-0
+ assertRollbackTo(table, singletonList(1L), 1, 1, emptyList());
}
// One tag is at the rollback snapshot and others are after it
@@ -856,10 +847,7 @@ public abstract class SimpleTableTestBase {
assertThat(result)
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
- FileStatus[] files = table.fileIO().listFiles(table.location(), true);
- assertThat(files).hasSize(9);
- // case 0 plus 1:
- // table-path/tag/tag-test3
+ assertRollbackTo(table, singletonList(1L), 1, 1,
singletonList("test3"));
}
// One tag is before the rollback snapshot and others are after it
@@ -895,15 +883,7 @@ public abstract class SimpleTableTestBase {
assertThat(result)
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
- FileStatus[] files = table.fileIO().listFiles(table.location(), true);
- assertThat(files).hasSize(14);
- // case 0 plus 6:
- // table-path/manifest/manifest-list-2
- // table-path/manifest/manifest-list-3
- // table-path/manifest/manifest-1
- // table-path/snapshot/snapshot-2
- // table-path/tag/tag-test3
- // table-path/pt=1/bucket-0/data-0.orc
+ assertRollbackTo(table, Arrays.asList(1L, 2L), 1, 2,
singletonList("test3"));
}
@ParameterizedTest(name = "expire snapshots = {0}")
@@ -948,10 +928,7 @@ public abstract class SimpleTableTestBase {
assertThat(result)
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
- FileStatus[] files = table.fileIO().listFiles(table.location(), true);
- assertThat(files).hasSize(9);
- // rollback snapshot case 0 plus 1:
- // table-path/tag/tag-test1
+ assertRollbackTo(table, singletonList(1L), 1, 1,
singletonList("test1"));
}
private FileStoreTable prepareRollbackTable(int commitTimes) throws
Exception {
@@ -1762,4 +1739,20 @@ public abstract class SimpleTableTestBase {
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
}
+
+ protected void assertRollbackTo(
+ FileStoreTable table,
+ List<Long> expectedSnapshots,
+ long expectedEarliest,
+ long expectedLatest,
+ List<String> expectedTags)
+ throws IOException {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ List<Long> snapshots =
snapshotManager.snapshotIdStream().collect(Collectors.toList());
+
assertThat(snapshots).containsExactlyInAnyOrderElementsOf(expectedSnapshots);
+
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(expectedEarliest);
+
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(expectedLatest);
+ assertThat(table.tagManager().allTagNames())
+ .containsExactlyInAnyOrderElementsOf(expectedTags);
+ }
}