This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 99dc1992f [core] Support rollback to a tag (#1443)
99dc1992f is described below
commit 99dc1992fe82d9e904dbb8a179374bc9f3c9bb40
Author: yuzelin <[email protected]>
AuthorDate: Thu Jul 6 12:22:04 2023 +0800
[core] Support rollback to a tag (#1443)
---
.../java/org/apache/paimon/AbstractFileStore.java | 13 +-
.../org/apache/paimon/operation/DeletionUtils.java | 157 -----------
.../apache/paimon/operation/FileDeletionBase.java | 308 +++++++++++++++++++++
.../paimon/operation/FileStoreExpireImpl.java | 41 ++-
.../apache/paimon/operation/SnapshotDeletion.java | 219 ++++-----------
.../org/apache/paimon/operation/TagDeletion.java | 150 ++--------
.../org/apache/paimon/operation/TagFileKeeper.java | 125 ---------
.../paimon/table/AbstractFileStoreTable.java | 60 +++-
.../org/apache/paimon/table/ReadonlyTable.java | 8 +
.../org/apache/paimon/table/RollbackHelper.java | 149 ++++++++++
.../main/java/org/apache/paimon/table/Table.java | 4 +
.../org/apache/paimon/utils/SnapshotManager.java | 50 +---
.../java/org/apache/paimon/utils/TagManager.java | 93 ++++++-
.../test/java/org/apache/paimon/TestFileStore.java | 7 +-
.../operation/CleanedFileStoreExpireTest.java | 4 +-
.../apache/paimon/operation/FileDeletionTest.java | 4 +-
.../paimon/table/FileStoreTableTestBase.java | 164 ++++++++++-
.../paimon/table/IndexFileExpireTableTest.java | 24 +-
.../paimon/flink/action/RollbackToAction.java | 33 ++-
.../flink/action/RollbackToActionITCase.java | 35 ++-
20 files changed, 933 insertions(+), 715 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 558daa899..8fd6ad216 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -31,7 +31,6 @@ import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
-import org.apache.paimon.operation.TagFileKeeper;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
@@ -171,11 +170,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
options.snapshotTimeRetain().toMillis(),
snapshotManager(),
newSnapshotDeletion(),
- new TagFileKeeper(
- manifestListFactory().create(),
- manifestFileFactory().create(),
- new TagManager(fileIO, options.path()),
- options.scanManifestParallelism()));
+ new TagManager(fileIO, options.path()));
}
@Override
@@ -192,12 +187,10 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
public TagDeletion newTagDeletion() {
return new TagDeletion(
fileIO,
- options.path(),
pathFactory(),
- manifestListFactory().create(),
manifestFileFactory().create(),
- newIndexFileHandler(),
- options.scanManifestParallelism());
+ manifestListFactory().create(),
+ newIndexFileHandler());
}
public abstract Comparator<InternalRow> newKeyComparator();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DeletionUtils.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DeletionUtils.java
deleted file mode 100644
index 9a9b28465..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DeletionUtils.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.manifest.ManifestList;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.ParallellyExecuteUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/** Util methods for snapshot and tag deletion. */
-public class DeletionUtils {
-
- private static final Logger LOG =
LoggerFactory.getLogger(DeletionUtils.class);
-
- public static Iterable<ManifestEntry> readEntries(
- List<ManifestFileMeta> manifests,
- ManifestFile manifestFile,
- @Nullable Integer scanManifestParallelism) {
- return ParallellyExecuteUtils.parallelismBatchIterable(
- files ->
- files.parallelStream()
- .flatMap(m ->
manifestFile.read(m.fileName()).stream())
- .collect(Collectors.toList()),
- manifests,
- scanManifestParallelism);
- }
-
- public static void addMergedDataFiles(
- Map<BinaryRow, Map<Integer, Set<String>>> dataFiles,
- Snapshot snapshot,
- ManifestList manifestList,
- ManifestFile manifestFile,
- @Nullable Integer scanManifestParallelism) {
- Iterable<ManifestEntry> entries =
- readEntries(
- snapshot.dataManifests(manifestList),
- manifestFile,
- scanManifestParallelism);
- for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
- dataFiles
- .computeIfAbsent(entry.partition(), p -> new HashMap<>())
- .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
- .add(entry.file().fileName());
- }
- }
-
- public static boolean containsDataFile(
- Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ManifestEntry
testee) {
- Map<Integer, Set<String>> buckets = dataFiles.get(testee.partition());
- if (buckets != null) {
- Set<String> fileNames = buckets.get(testee.bucket());
- if (fileNames != null) {
- return fileNames.contains(testee.file().fileName());
- }
- }
- return false;
- }
-
- /** Try to delete directories that may be empty. */
- public static void tryDeleteDirectories(
- Map<BinaryRow, Set<Integer>> deletionBuckets,
- FileStorePathFactory pathFactory,
- FileIO fileIO) {
- // All directory paths are deduplicated and sorted by hierarchy level
- Map<Integer, Set<Path>> deduplicate = new HashMap<>();
- for (Map.Entry<BinaryRow, Set<Integer>> entry :
deletionBuckets.entrySet()) {
- // try to delete bucket directories
- for (Integer bucket : entry.getValue()) {
- tryDeleteEmptyDirectory(pathFactory.bucketPath(entry.getKey(),
bucket), fileIO);
- }
-
- List<Path> hierarchicalPaths =
pathFactory.getHierarchicalPartitionPath(entry.getKey());
- int hierarchies = hierarchicalPaths.size();
- if (hierarchies == 0) {
- continue;
- }
-
- if (tryDeleteEmptyDirectory(hierarchicalPaths.get(hierarchies -
1), fileIO)) {
- // deduplicate high level partition directories
- for (int hierarchy = 0; hierarchy < hierarchies - 1;
hierarchy++) {
- Path path = hierarchicalPaths.get(hierarchy);
- deduplicate.computeIfAbsent(hierarchy, i -> new
HashSet<>()).add(path);
- }
- }
- }
-
- // from deepest to shallowest
- for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0;
hierarchy--) {
- deduplicate.get(hierarchy).forEach(path ->
tryDeleteEmptyDirectory(path, fileIO));
- }
- }
-
- private static boolean tryDeleteEmptyDirectory(Path path, FileIO fileIO) {
- try {
- fileIO.delete(path, false);
- return true;
- } catch (IOException e) {
- LOG.debug("Failed to delete directory '{}'. Check whether it is
empty.", path);
- return false;
- }
- }
-
- public static Set<String> collectManifestSkippingSet(
- Snapshot snapshot, ManifestList manifestList, IndexFileHandler
indexFileHandler) {
- Set<String> skip =
- snapshot.dataManifests(manifestList).stream()
- .map(ManifestFileMeta::fileName)
- .collect(Collectors.toCollection(HashSet::new));
- String indexManifest = snapshot.indexManifest();
- if (indexManifest != null) {
- skip.add(indexManifest);
- indexFileHandler.readManifest(indexManifest).stream()
- .map(IndexManifestEntry::indexFile)
- .map(IndexFileMeta::fileName)
- .forEach(skip::add);
- }
- return skip;
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
new file mode 100644
index 000000000..a9ad4cac2
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.utils.FileStorePathFactory;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for file deletion, including methods for cleaning data files, manifest files and
+ * empty data directories.
+ */
+public abstract class FileDeletionBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FileDeletionBase.class);
+
+ protected final FileIO fileIO;
+ protected final FileStorePathFactory pathFactory;
+ protected final ManifestFile manifestFile;
+ protected final ManifestList manifestList;
+ protected final IndexFileHandler indexFileHandler;
+ protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
+
+    /**
+     * Creates a deletion helper working on the given file system and manifest readers.
+     *
+     * <p>The map of touched partition buckets starts out empty; it is filled by {@link
+     * #recordDeletionBuckets} and drained by {@link #cleanDataDirectories}.
+     */
+    public FileDeletionBase(
+            FileIO fileIO,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            IndexFileHandler indexFileHandler) {
+        // bookkeeping state first, collaborators afterwards
+        this.deletionBuckets = new HashMap<>();
+
+        this.indexFileHandler = indexFileHandler;
+        this.manifestList = manifestList;
+        this.manifestFile = manifestFile;
+        this.pathFactory = pathFactory;
+        this.fileIO = fileIO;
+    }
+
+ /**
+ * Clean data files that will not be used anymore in the snapshot.
+ *
+ * @param snapshot {@link Snapshot} that will be cleaned
+ * @param skipper if the test result of a data file is true, it will be
skipped when deleting;
+ * else it will be deleted
+ */
+ public abstract void cleanUnusedDataFiles(Snapshot snapshot,
Predicate<ManifestEntry> skipper);
+
+ /**
+ * Clean metadata files of a snapshot that will no longer be used, including data manifests,
+ * index manifests and manifest lists.
+ *
+ * @param snapshot {@link Snapshot} that will be cleaned
+ * @param skippingSet manifests that should not be deleted
+ */
+ public abstract void cleanUnusedManifests(Snapshot snapshot, Set<String>
skippingSet);
+
+    /**
+     * Tries to remove the bucket and partition directories that were touched by earlier data file
+     * deletions and may have become empty.
+     *
+     * <p>For every recorded partition the bucket directories are tried first, then the deepest
+     * partition directory. Only when that deepest directory could be removed are its ancestors
+     * queued, and the queued ancestors are tried from the deepest level up to the shallowest.
+     * The recorded buckets are cleared afterwards.
+     */
+    public void cleanDataDirectories() {
+        if (deletionBuckets.isEmpty()) {
+            return;
+        }
+
+        // hierarchy level -> distinct ancestor partition directories at that level
+        Map<Integer, Set<Path>> ancestorsByLevel = new HashMap<>();
+        for (Map.Entry<BinaryRow, Set<Integer>> touched : deletionBuckets.entrySet()) {
+            BinaryRow partition = touched.getKey();
+
+            // bucket directories come first, they are the leaves
+            for (Integer bucket : touched.getValue()) {
+                tryDeleteEmptyDirectory(pathFactory.bucketPath(partition, bucket));
+            }
+
+            List<Path> partitionPaths = pathFactory.getHierarchicalPartitionPath(partition);
+            int deepest = partitionPaths.size() - 1;
+            if (deepest >= 0 && tryDeleteEmptyDirectory(partitionPaths.get(deepest))) {
+                // the leaf partition directory is gone, so its ancestors may be empty now
+                for (int level = 0; level < deepest; level++) {
+                    ancestorsByLevel
+                            .computeIfAbsent(level, l -> new HashSet<>())
+                            .add(partitionPaths.get(level));
+                }
+            }
+        }
+
+        // children must go before parents: walk from the deepest level to the shallowest
+        for (int level = ancestorsByLevel.size() - 1; level >= 0; level--) {
+            for (Path ancestor : ancestorsByLevel.get(level)) {
+                tryDeleteEmptyDirectory(ancestor);
+            }
+        }
+
+        deletionBuckets.clear();
+    }
+
+    /**
+     * Remembers the partition and bucket of {@code entry} so that {@link #cleanDataDirectories}
+     * can later try to remove the corresponding directories.
+     */
+    protected void recordDeletionBuckets(ManifestEntry entry) {
+        Set<Integer> bucketsOfPartition =
+                deletionBuckets.computeIfAbsent(entry.partition(), p -> new HashSet<>());
+        bucketsOfPartition.add(entry.bucket());
+    }
+
+    /**
+     * Deletes the metadata files of {@code snapshot} that are not protected by {@code
+     * skippingSet}: data manifests, base/delta manifest lists, optionally the changelog manifests
+     * and their list, and finally index files and the index manifest.
+     *
+     * @param snapshot snapshot whose metadata files are cleaned
+     * @param skippingSet names of files that must be kept; the name of every data manifest
+     *     deleted here is added to it
+     * @param deleteChangelog whether the changelog manifests and the changelog manifest list are
+     *     deleted as well; these are not checked against {@code skippingSet}
+     */
+    protected void cleanUnusedManifests(
+            Snapshot snapshot, Set<String> skippingSet, boolean deleteChangelog) {
+        String baseList = snapshot.baseManifestList();
+        String deltaList = snapshot.deltaManifestList();
+
+        // data manifests referenced by the base and delta manifest lists
+        List<ManifestFileMeta> dataManifests = new ArrayList<>(tryReadManifestList(baseList));
+        dataManifests.addAll(tryReadManifestList(deltaList));
+        for (ManifestFileMeta meta : dataManifests) {
+            String name = meta.fileName();
+            if (skippingSet.contains(name)) {
+                continue;
+            }
+            manifestFile.delete(name);
+            // remember the deletion so that other snapshots do not try to delete it again
+            skippingSet.add(name);
+        }
+
+        // the manifest lists themselves, only after their content has been read above
+        if (!skippingSet.contains(baseList)) {
+            manifestList.delete(baseList);
+        }
+        if (!skippingSet.contains(deltaList)) {
+            manifestList.delete(deltaList);
+        }
+
+        // changelog manifests and their list
+        if (deleteChangelog) {
+            String changelogList = snapshot.changelogManifestList();
+            if (changelogList != null) {
+                for (ManifestFileMeta meta : tryReadManifestList(changelogList)) {
+                    manifestFile.delete(meta.fileName());
+                }
+                manifestList.delete(changelogList);
+            }
+        }
+
+        // index files and index manifest; the manifest may already have been deleted while
+        // cleaning another snapshot, hence the existence check
+        String indexManifest = snapshot.indexManifest();
+        if (indexManifest == null || !indexFileHandler.existsManifest(indexManifest)) {
+            return;
+        }
+        for (IndexManifestEntry indexEntry : indexFileHandler.readManifest(indexManifest)) {
+            if (!skippingSet.contains(indexEntry.indexFile().fileName())) {
+                indexFileHandler.deleteIndexFile(indexEntry);
+            }
+        }
+        if (!skippingSet.contains(indexManifest)) {
+            indexFileHandler.deleteManifest(indexManifest);
+        }
+    }
+
+    /**
+     * Reads a manifest list, tolerating files that are missing or unreadable.
+     *
+     * <p>A job may have been killed in the middle of an expiration, leaving some manifest files
+     * already deleted. Clean methods that need the manifests of a snapshot should therefore go
+     * through this method instead of calling {@link Snapshot#dataManifests} directly.
+     *
+     * @param manifestListName name of the manifest list file to read
+     * @return the metas stored in the manifest list, or an empty list if reading failed (the
+     *     failure is logged as a warning)
+     */
+    protected List<ManifestFileMeta> tryReadManifestList(String manifestListName) {
+        List<ManifestFileMeta> metas;
+        try {
+            metas = manifestList.read(manifestListName);
+        } catch (Exception e) {
+            LOG.warn("Failed to read manifest list file " + manifestListName, e);
+            metas = Collections.emptyList();
+        }
+        return metas;
+    }
+
+    /**
+     * Returns the manifest entries reachable from the given manifest list. An unreadable
+     * manifest list yields no entries, see {@link #tryReadManifestList}.
+     */
+    public Iterable<ManifestEntry> tryReadManifestEntries(String manifestListName) {
+        List<ManifestFileMeta> metas = tryReadManifestList(manifestListName);
+        return readManifestEntries(metas);
+    }
+
+    /**
+     * Tries to read the entries of the base and delta manifest lists of a snapshot in one go.
+     *
+     * <p>The metas of both lists are gathered in a fresh list. {@link #tryReadManifestList}
+     * returns the immutable {@link Collections#emptyList()} when a manifest list cannot be read,
+     * so appending the delta metas directly to its result would throw {@link
+     * UnsupportedOperationException} whenever the base list is unreadable but the delta list is
+     * not. Copying also avoids modifying a list owned by the manifest list reader.
+     *
+     * @param snapshot snapshot whose data manifests are read
+     * @return entries of all readable base and delta manifests, base manifests first
+     */
+    protected Iterable<ManifestEntry> tryReadDataManifestEntries(Snapshot snapshot) {
+        List<ManifestFileMeta> manifestFileMetas =
+                new ArrayList<>(tryReadManifestList(snapshot.baseManifestList()));
+        manifestFileMetas.addAll(tryReadManifestList(snapshot.deltaManifestList()));
+
+        return readManifestEntries(manifestFileMetas);
+    }
+
+    /**
+     * Returns the entries of the given manifest files as one concatenated iterable.
+     *
+     * <p>The file names are put into a queue that is drained while iterating: each step takes the
+     * next name and reads that manifest file. A manifest file that cannot be read is logged as a
+     * warning and contributes no entries. Because the queue is shared and drained, entries are
+     * only produced for the first traversal of the returned iterable.
+     *
+     * @param manifestFileMetas metas of the manifest files to read, in reading order
+     */
+    protected Iterable<ManifestEntry> readManifestEntries(
+            List<ManifestFileMeta> manifestFileMetas) {
+        Queue<String> pendingFiles =
+                manifestFileMetas.stream()
+                        .map(ManifestFileMeta::fileName)
+                        .collect(Collectors.toCollection(LinkedList::new));
+
+        // every iterator handed out drains the same queue
+        Iterable<Iterable<ManifestEntry>> entriesPerFile =
+                () ->
+                        new Iterator<Iterable<ManifestEntry>>() {
+                            @Override
+                            public boolean hasNext() {
+                                return files().size() > 0;
+                            }
+
+                            @Override
+                            public Iterable<ManifestEntry> next() {
+                                String file = files().poll();
+                                try {
+                                    return manifestFile.read(file);
+                                } catch (Exception e) {
+                                    LOG.warn("Failed to read manifest file " + file, e);
+                                    return Collections.emptyList();
+                                }
+                            }
+
+                            private Queue<String> files() {
+                                return pendingFiles;
+                            }
+                        };
+        return Iterables.concat(entriesPerFile);
+    }
+
+    /**
+     * Adds the names of the data files of {@code snapshot} to {@code dataFiles}, grouped by
+     * partition and bucket. The entries of the base and delta manifests are merged with {@link
+     * ManifestEntry#mergeEntries} before their file names are collected.
+     *
+     * @param dataFiles partition -> bucket -> data file names; updated in place
+     * @param snapshot snapshot whose data files are added
+     */
+    protected void addMergedDataFiles(
+            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot snapshot) {
+        Iterable<ManifestEntry> rawEntries = tryReadDataManifestEntries(snapshot);
+        for (ManifestEntry merged : ManifestEntry.mergeEntries(rawEntries)) {
+            Map<Integer, Set<String>> byBucket =
+                    dataFiles.computeIfAbsent(merged.partition(), p -> new HashMap<>());
+            Set<String> fileNames = byBucket.computeIfAbsent(merged.bucket(), b -> new HashSet<>());
+            fileNames.add(merged.file().fileName());
+        }
+    }
+
+    /**
+     * Tells whether {@code dataFiles} contains the file of {@code testee} under the entry's
+     * partition and bucket.
+     *
+     * @param dataFiles partition -> bucket -> data file names
+     * @param testee entry to look up
+     * @return {@code true} only if the partition, the bucket and the file name are all present
+     */
+    protected boolean containsDataFile(
+            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ManifestEntry testee) {
+        Map<Integer, Set<String>> bucketsOfPartition = dataFiles.get(testee.partition());
+        if (bucketsOfPartition == null) {
+            return false;
+        }
+
+        Set<String> namesInBucket = bucketsOfPartition.get(testee.bucket());
+        return namesInBucket != null && namesInBucket.contains(testee.file().fileName());
+    }
+
+    /**
+     * Collects the names of the metadata files used by a single snapshot, which therefore must
+     * not be deleted. Changelog files are not included; subclasses decide whether to delete them.
+     *
+     * @param skippingSnapshot snapshot whose files have to be kept
+     */
+    public Set<String> manifestSkippingSet(Snapshot skippingSnapshot) {
+        List<Snapshot> onlyThisSnapshot = Collections.singletonList(skippingSnapshot);
+        return manifestSkippingSet(onlyThisSnapshot);
+    }
+
+    /**
+     * Collects the names of the metadata files used by the given snapshots, which therefore must
+     * not be deleted: base and delta manifest lists, the data manifests they reference, the index
+     * manifest and the index files listed in it. Changelog files are not included.
+     *
+     * @param skippingSnapshots snapshots whose files have to be kept
+     * @return a new mutable set with the collected file names
+     */
+    public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
+        Set<String> keptFiles = new HashSet<>();
+
+        for (Snapshot kept : skippingSnapshots) {
+            // manifest lists and the data manifests they point to
+            keptFiles.add(kept.baseManifestList());
+            keptFiles.add(kept.deltaManifestList());
+            for (ManifestFileMeta meta : kept.dataManifests(manifestList)) {
+                keptFiles.add(meta.fileName());
+            }
+
+            // index manifest and its index files, if the snapshot has one
+            String indexManifest = kept.indexManifest();
+            if (indexManifest == null) {
+                continue;
+            }
+            keptFiles.add(indexManifest);
+            for (IndexManifestEntry indexEntry : indexFileHandler.readManifest(indexManifest)) {
+                IndexFileMeta indexFile = indexEntry.indexFile();
+                keptFiles.add(indexFile.fileName());
+            }
+        }
+
+        return keptFiles;
+    }
+
+    /**
+     * Attempts to delete the directory at {@code path}, passing {@code false} as the second
+     * argument of {@code FileIO#delete} (presumably "not recursive" - confirm against FileIO).
+     *
+     * @return {@code true} if the delete call completed without an {@link IOException}, {@code
+     *     false} otherwise (the failure is only logged at debug level)
+     */
+    private boolean tryDeleteEmptyDirectory(Path path) {
+        boolean deleted;
+        try {
+            fileIO.delete(path, false);
+            deleted = true;
+        } catch (IOException e) {
+            LOG.debug("Failed to delete directory '{}'. Check whether it is empty.", path);
+            deleted = false;
+        }
+        return deleted;
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index 46ef06fa5..468fbeb02 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -21,14 +21,13 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -55,7 +54,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
private final ConsumerManager consumerManager;
private final SnapshotDeletion snapshotDeletion;
- private final TagFileKeeper tagFileKeeper;
+ private final TagManager tagManager;
private Lock lock;
@@ -65,7 +64,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
long millisRetained,
SnapshotManager snapshotManager,
SnapshotDeletion snapshotDeletion,
- TagFileKeeper tagFileKeeper) {
+ TagManager tagManager) {
this.numRetainedMin = numRetainedMin;
this.numRetainedMax = numRetainedMax;
this.millisRetained = millisRetained;
@@ -73,7 +72,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
this.consumerManager =
new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
this.snapshotDeletion = snapshotDeletion;
- this.tagFileKeeper = tagFileKeeper;
+ this.tagManager = tagManager;
}
@Override
@@ -149,22 +148,19 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
"Snapshot expire range is [" + beginInclusiveId + ", " +
endExclusiveId + ")");
}
- tagFileKeeper.reloadTags();
+ List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next
snapshot, so the range of
// id should be (beginInclusiveId, endExclusiveId]
- Map<BinaryRow, Set<Integer>> deletionBuckets = new HashMap<>();
for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete merge tree files not used by
snapshot #" + id);
}
Snapshot snapshot = snapshotManager.snapshot(id);
// expire merge tree files and collect changed buckets
- snapshotDeletion.deleteExpiredDataFiles(
- snapshot.deltaManifestList(),
- deletionBuckets,
- tagFileKeeper.tagDataFileSkipper(id));
+ snapshotDeletion.cleanUnusedDataFiles(
+ snapshot,
snapshotDeletion.dataFileSkipper(taggedSnapshots, id));
}
// delete changelog files
@@ -174,32 +170,27 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
}
Snapshot snapshot = snapshotManager.snapshot(id);
if (snapshot.changelogManifestList() != null) {
- snapshotDeletion.deleteAddedDataFiles(
- snapshot.changelogManifestList(), deletionBuckets);
+
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
}
}
// data files and changelog files in bucket directories has been
deleted
// then delete changed bucket directories if they are empty
- snapshotDeletion.tryDeleteDirectories(deletionBuckets);
+ snapshotDeletion.cleanDataDirectories();
// delete manifests and indexFiles
- Set<String> skipManifestFiles =
- snapshotDeletion.collectManifestSkippingSet(
- snapshotManager.snapshot(endExclusiveId));
- for (Snapshot snapshot :
- tagFileKeeper.findOverlappedSnapshots(beginInclusiveId,
endExclusiveId)) {
- skipManifestFiles.add(snapshot.baseManifestList());
- skipManifestFiles.add(snapshot.deltaManifestList());
-
skipManifestFiles.addAll(snapshotDeletion.collectManifestSkippingSet(snapshot));
- }
+ List<Snapshot> skippingSnapshots =
+ TagManager.findOverlappedSnapshots(
+ taggedSnapshots, beginInclusiveId, endExclusiveId);
+ skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
+ Set<String> skippingSet =
snapshotDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete manifests in snapshot #" + id);
}
Snapshot snapshot = snapshotManager.snapshot(id);
- snapshotDeletion.deleteManifestFiles(skipManifestFiles, snapshot);
+ snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
// delete snapshot last
snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index ffc975977..123fdabc5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -25,42 +25,28 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.paimon.utils.TagManager;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
/** Delete snapshot files. */
-public class SnapshotDeletion {
+public class SnapshotDeletion extends FileDeletionBase {
- private static final Logger LOG =
LoggerFactory.getLogger(SnapshotDeletion.class);
+ /** Used to record which tag is cached in tagged snapshots list. */
+ private int cachedTagIndex = -1;
- private final FileIO fileIO;
- private final FileStorePathFactory pathFactory;
- private final ManifestFile manifestFile;
- private final ManifestList manifestList;
- private final IndexFileHandler indexFileHandler;
+ /** Used to cache data files used by current tag. */
+ private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles
= new HashMap<>();
public SnapshotDeletion(
FileIO fileIO,
@@ -68,134 +54,22 @@ public class SnapshotDeletion {
ManifestFile manifestFile,
ManifestList manifestList,
IndexFileHandler indexFileHandler) {
- this.fileIO = fileIO;
- this.pathFactory = pathFactory;
- this.manifestFile = manifestFile;
- this.manifestList = manifestList;
- this.indexFileHandler = indexFileHandler;
- }
-
- // ================================= PUBLIC
=================================================
-
- /**
- * Delete expired file in the manifest list files. Delete files marked as
"DELETE" in manifests.
- *
- * @param manifestListName name of manifest list
- * @param deletionBuckets partition-buckets of which some data files have
been deleted
- * @param dataFileSkipper if the test result of a data file is true, the
data file will be
- * skipped when deleting
- */
- public void deleteExpiredDataFiles(
- String manifestListName,
- Map<BinaryRow, Set<Integer>> deletionBuckets,
- Predicate<ManifestEntry> dataFileSkipper) {
- doDeleteExpiredDataFiles(
- getManifestEntriesFromManifestList(manifestListName),
- deletionBuckets,
- dataFileSkipper);
+ super(fileIO, pathFactory, manifestFile, manifestList,
indexFileHandler);
}
- /**
- * Delete added file in the manifest list files. Added files marked as
"ADD" in manifests.
- *
- * @param manifestListName name of manifest list
- * @param deletionBuckets partition-buckets of which some data files have
been deleted
- */
- public void deleteAddedDataFiles(
- String manifestListName, Map<BinaryRow, Set<Integer>>
deletionBuckets) {
- for (ManifestEntry entry :
getManifestEntriesFromManifestList(manifestListName)) {
- if (entry.kind() == FileKind.ADD) {
- fileIO.deleteQuietly(
- new Path(
- pathFactory.bucketPath(entry.partition(),
entry.bucket()),
- entry.file().fileName()));
- deletionBuckets
- .computeIfAbsent(entry.partition(), p -> new
HashSet<>())
- .add(entry.bucket());
- }
- }
- }
-
- /**
- * Try to delete directories collected from {@link
#deleteExpiredDataFiles} and {@link
- * #deleteAddedDataFiles}.
- */
- public void tryDeleteDirectories(Map<BinaryRow, Set<Integer>>
deletionBuckets) {
- DeletionUtils.tryDeleteDirectories(deletionBuckets, pathFactory,
fileIO);
- }
-
- /**
- * Delete metadata of a snapshot, delete {@link ManifestList} file and
{@link ManifestFileMeta}
- * file.
- *
- * @param skipped manifest file deletion skipping set, deleted manifest
file will be added to
- * this set too. NOTE: changelog manifests won't be checked.
- */
- public void deleteManifestFiles(Set<String> skipped, Snapshot snapshot) {
- // cannot call `snapshot.dataManifests` directly, it is possible that
a job is
- // killed during expiration, so some manifest files may have been
deleted
- List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
-
toExpireManifests.addAll(tryReadManifestList(snapshot.baseManifestList()));
-
toExpireManifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
-
- // delete manifest
- for (ManifestFileMeta manifest : toExpireManifests) {
- String fileName = manifest.fileName();
- if (!skipped.contains(fileName)) {
- manifestFile.delete(fileName);
- skipped.add(fileName);
- }
- }
- if (snapshot.changelogManifestList() != null) {
- for (ManifestFileMeta manifest :
- tryReadManifestList(snapshot.changelogManifestList())) {
- manifestFile.delete(manifest.fileName());
- }
- }
-
- // delete manifest lists
- if (!skipped.contains(snapshot.baseManifestList())) {
- manifestList.delete(snapshot.baseManifestList());
- }
- if (!skipped.contains(snapshot.deltaManifestList())) {
- manifestList.delete(snapshot.deltaManifestList());
- }
- if (snapshot.changelogManifestList() != null) {
- manifestList.delete(snapshot.changelogManifestList());
- }
-
- // delete index files
- String indexManifest = snapshot.indexManifest();
- // check exists, it may have been deleted by other snapshots
- if (indexManifest != null &&
indexFileHandler.existsManifest(indexManifest)) {
- for (IndexManifestEntry entry :
indexFileHandler.readManifest(indexManifest)) {
- if (!skipped.contains(entry.indexFile().fileName())) {
- indexFileHandler.deleteIndexFile(entry);
- }
- }
-
- if (!skipped.contains(indexManifest)) {
- indexFileHandler.deleteManifest(indexManifest);
- }
- }
+ @Override
+ public void cleanUnusedDataFiles(Snapshot snapshot,
Predicate<ManifestEntry> skipper) {
+
doCleanUnusedDataFile(tryReadManifestEntries(snapshot.deltaManifestList()),
skipper);
}
- // ================================= PRIVATE
=================================================
-
- private List<ManifestFileMeta> tryReadManifestList(String
manifestListName) {
- try {
- return manifestList.read(manifestListName);
- } catch (Exception e) {
- LOG.warn("Failed to read manifest list file " + manifestListName,
e);
- return Collections.emptyList();
- }
+ @Override
+ public void cleanUnusedManifests(Snapshot snapshot, Set<String>
skippingSet) {
+ cleanUnusedManifests(snapshot, skippingSet, true);
}
@VisibleForTesting
- void doDeleteExpiredDataFiles(
- Iterable<ManifestEntry> dataFileLog,
- Map<BinaryRow, Set<Integer>> deletionBuckets,
- Predicate<ManifestEntry> dataFileSkipper) {
+ void doCleanUnusedDataFile(
+ Iterable<ManifestEntry> dataFileLog, Predicate<ManifestEntry>
skipper) {
// we cannot delete a data file directly when we meet a DELETE entry,
because that
// file might be upgraded
// data file path -> (original manifest entry, extra file paths)
@@ -224,46 +98,47 @@ public class SnapshotDeletion {
(path, pair) -> {
ManifestEntry entry = pair.getLeft();
// check whether we should skip the data file
- if (!dataFileSkipper.test(entry)) {
+ if (!skipper.test(entry)) {
// delete data files
fileIO.deleteQuietly(path);
pair.getRight().forEach(fileIO::deleteQuietly);
- // record changed buckets
- deletionBuckets
- .computeIfAbsent(entry.partition(), p -> new
HashSet<>())
- .add(entry.bucket());
+
+ recordDeletionBuckets(entry);
}
});
}
- private Iterable<ManifestEntry> getManifestEntriesFromManifestList(String
manifestListName) {
- Queue<String> files =
- tryReadManifestList(manifestListName).stream()
- .map(ManifestFileMeta::fileName)
- .collect(Collectors.toCollection(LinkedList::new));
- return Iterables.concat(
- (Iterable<Iterable<ManifestEntry>>)
- () ->
- new Iterator<Iterable<ManifestEntry>>() {
- @Override
- public boolean hasNext() {
- return files.size() > 0;
- }
+ /**
+ * Delete added file in the manifest list files. Added files marked as
"ADD" in manifests.
+ *
+ * @param manifestListName name of manifest list
+ */
+ public void deleteAddedDataFiles(String manifestListName) {
+ deleteAddedDataFiles(tryReadManifestEntries(manifestListName));
+ }
- @Override
- public Iterable<ManifestEntry> next() {
- String file = files.poll();
- try {
- return manifestFile.read(file);
- } catch (Exception e) {
- LOG.warn("Failed to read manifest
file " + file, e);
- return Collections.emptyList();
- }
- }
- });
+ public void deleteAddedDataFiles(Iterable<ManifestEntry> manifestEntries) {
+ for (ManifestEntry entry : manifestEntries) {
+ if (entry.kind() == FileKind.ADD) {
+ fileIO.deleteQuietly(
+ new Path(
+ pathFactory.bucketPath(entry.partition(),
entry.bucket()),
+ entry.file().fileName()));
+ recordDeletionBuckets(entry);
+ }
+ }
}
- public Set<String> collectManifestSkippingSet(Snapshot snapshot) {
- return DeletionUtils.collectManifestSkippingSet(snapshot,
manifestList, indexFileHandler);
+ public Predicate<ManifestEntry> dataFileSkipper(
+ List<Snapshot> taggedSnapshots, long expiringSnapshotId) {
+ int index = TagManager.findPreviousTag(taggedSnapshots,
expiringSnapshotId);
+ // refresh tag data files
+ if (index >= 0 && cachedTagIndex != index) {
+ cachedTagIndex = index;
+ cachedTagDataFiles.clear();
+ addMergedDataFiles(cachedTagDataFiles, taggedSnapshots.get(index));
+ }
+
+ return entry -> index >= 0 && containsDataFile(cachedTagDataFiles,
entry);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 68b467480..8615cbc86 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -23,165 +23,65 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.TagManager;
-
-import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Predicate;
-import static org.apache.paimon.operation.DeletionUtils.addMergedDataFiles;
-import static
org.apache.paimon.operation.DeletionUtils.collectManifestSkippingSet;
-import static org.apache.paimon.operation.DeletionUtils.containsDataFile;
-import static org.apache.paimon.operation.DeletionUtils.readEntries;
-import static org.apache.paimon.operation.DeletionUtils.tryDeleteDirectories;
-
-/**
- * Delete tag files. This class doesn't check changelog files because they are
handled by {@link
- * SnapshotDeletion}.
- */
-public class TagDeletion {
-
- private final FileIO fileIO;
- private final FileStorePathFactory pathFactory;
- private final ManifestList manifestList;
- private final ManifestFile manifestFile;
- private final IndexFileHandler indexFileHandler;
- @Nullable private final Integer scanManifestParallelism;
-
- private final SnapshotManager snapshotManager;
- private final List<Snapshot> taggedSnapshots;
-
- private final Map<BinaryRow, Map<Integer, Set<String>>> skippedDataFiles;
- private final Set<String> skippedManifests;
+/** Delete tag files. */
+public class TagDeletion extends FileDeletionBase {
public TagDeletion(
FileIO fileIO,
- Path tablePath,
FileStorePathFactory pathFactory,
- ManifestList manifestList,
ManifestFile manifestFile,
- IndexFileHandler indexFileHandler,
- @Nullable Integer scanManifestParallelism) {
- this.fileIO = fileIO;
- this.pathFactory = pathFactory;
- this.manifestList = manifestList;
- this.manifestFile = manifestFile;
- this.indexFileHandler = indexFileHandler;
- this.scanManifestParallelism = scanManifestParallelism;
-
- this.snapshotManager = new SnapshotManager(fileIO, tablePath);
- this.taggedSnapshots = new TagManager(fileIO,
tablePath).taggedSnapshots();
-
- this.skippedDataFiles = new HashMap<>();
- this.skippedManifests = new HashSet<>();
+ ManifestList manifestList,
+ IndexFileHandler indexFileHandler) {
+ super(fileIO, pathFactory, manifestFile, manifestList,
indexFileHandler);
}
- /** Delete unused data files, manifest files and empty data file
directories of tag. */
- public void delete(Snapshot taggedSnapshot) {
- if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
- return;
- }
-
- // collect from the earliest snapshot
- Snapshot earliest =
snapshotManager.snapshot(snapshotManager.earliestSnapshotId());
- collectSkip(earliest);
-
- // collect from the neighbor tags
- int index = findIndex(taggedSnapshot);
- if (index - 1 >= 0) {
- collectSkip(taggedSnapshots.get(index - 1));
- }
- if (index + 1 < taggedSnapshots.size()) {
- collectSkip(taggedSnapshots.get(index + 1));
- }
-
- // delete data files and empty directories
- deleteDataFiles(taggedSnapshot);
- // delete manifests
- deleteManifestFiles(taggedSnapshot);
+ @Override
+ public void cleanUnusedDataFiles(Snapshot taggedSnapshot,
Predicate<ManifestEntry> skipper) {
+ cleanUnusedDataFiles(tryReadDataManifestEntries(taggedSnapshot),
skipper);
}
- private void collectSkip(Snapshot snapshot) {
- addMergedDataFiles(
- skippedDataFiles, snapshot, manifestList, manifestFile,
scanManifestParallelism);
- skippedManifests.addAll(
- collectManifestSkippingSet(snapshot, manifestList,
indexFileHandler));
+ @Override
+ public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String>
skippingSet) {
+ // doesn't clean changelog files because they are handled by
SnapshotDeletion
+ cleanUnusedManifests(taggedSnapshot, skippingSet, false);
}
- private void deleteDataFiles(Snapshot taggedSnapshot) {
- // delete data files
- Map<BinaryRow, Set<Integer>> deletionBuckets = new HashMap<>();
- Iterable<ManifestEntry> entries =
- readEntries(
- taggedSnapshot.dataManifests(manifestList),
- manifestFile,
- scanManifestParallelism);
+ public void cleanUnusedDataFiles(
+ Iterable<ManifestEntry> entries, Predicate<ManifestEntry> skipper)
{
for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
- if (!containsDataFile(skippedDataFiles, entry)) {
+ if (!skipper.test(entry)) {
Path bucketPath = pathFactory.bucketPath(entry.partition(),
entry.bucket());
fileIO.deleteQuietly(new Path(bucketPath,
entry.file().fileName()));
for (String file : entry.file().extraFiles()) {
fileIO.deleteQuietly(new Path(bucketPath, file));
}
- deletionBuckets
- .computeIfAbsent(entry.partition(), p -> new
HashSet<>())
- .add(entry.bucket());
+ recordDeletionBuckets(entry);
}
}
-
- // delete empty data file directories
- tryDeleteDirectories(deletionBuckets, pathFactory, fileIO);
}
- private void deleteManifestFiles(Snapshot taggedSnapshot) {
- for (ManifestFileMeta manifest :
taggedSnapshot.dataManifests(manifestList)) {
- String fileName = manifest.fileName();
- if (!skippedManifests.contains(fileName)) {
- manifestFile.delete(fileName);
- }
- }
-
- // delete manifest lists
- manifestList.delete(taggedSnapshot.baseManifestList());
- manifestList.delete(taggedSnapshot.deltaManifestList());
-
- // delete index files
- String indexManifest = taggedSnapshot.indexManifest();
- // check exists, it may have been deleted by other snapshots
- if (indexManifest != null &&
indexFileHandler.existsManifest(indexManifest)) {
- for (IndexManifestEntry entry :
indexFileHandler.readManifest(indexManifest)) {
- if (!skippedManifests.contains(entry.indexFile().fileName())) {
- indexFileHandler.deleteIndexFile(entry);
- }
- }
-
- if (!skippedManifests.contains(indexManifest)) {
- indexFileHandler.deleteManifest(indexManifest);
- }
- }
+ public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) {
+ return dataFileSkipper(Collections.singletonList(fromSnapshot));
}
- private int findIndex(Snapshot taggedSnapshot) {
- for (int i = 0; i < taggedSnapshots.size(); i++) {
- if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) {
- return i;
- }
+ public Predicate<ManifestEntry> dataFileSkipper(List<Snapshot>
fromSnapshots) {
+ Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();
+ for (Snapshot snapshot : fromSnapshots) {
+ addMergedDataFiles(skipped, snapshot);
}
- throw new RuntimeException(
- String.format(
- "Didn't find tag with snapshot id '%s'.This is
unexpected.",
- taggedSnapshot.id()));
+ return entry -> containsDataFile(skipped, entry);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
deleted file mode 100644
index 3f1cc0bef..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestList;
-import org.apache.paimon.utils.TagManager;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import static org.apache.paimon.operation.DeletionUtils.addMergedDataFiles;
-import static org.apache.paimon.operation.DeletionUtils.containsDataFile;
-
-/** Util class to provide methods to prevent tag files to be deleted when
expiring snapshots. */
-public class TagFileKeeper {
-
- private final ManifestList manifestList;
- private final ManifestFile manifestFile;
- private final TagManager tagManager;
- @Nullable private final Integer scanManifestParallelism;
-
- private long cachedTag = -1;
- private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles;
-
- private List<Snapshot> taggedSnapshots;
-
- public TagFileKeeper(
- ManifestList manifestList,
- ManifestFile manifestFile,
- TagManager tagManager,
- @Nullable Integer scanManifestParallelism) {
- this.manifestList = manifestList;
- this.manifestFile = manifestFile;
- this.tagManager = tagManager;
- this.scanManifestParallelism = scanManifestParallelism;
-
- this.cachedTagDataFiles = new HashMap<>();
- }
-
- /** Caller should determine whether to reload. */
- public void reloadTags() {
- taggedSnapshots = tagManager.taggedSnapshots();
- }
-
- public Predicate<ManifestEntry> tagDataFileSkipper(long
expiringSnapshotId) {
- int index = findPreviousTag(expiringSnapshotId, taggedSnapshots);
- if (index >= 0) {
- tryRefresh(taggedSnapshots.get(index));
- }
- return entry -> index >= 0 && containsDataFile(cachedTagDataFiles,
entry);
- }
-
- public List<Snapshot> findOverlappedSnapshots(long beginInclusive, long
endExclusive) {
- List<Snapshot> snapshots = new ArrayList<>();
- int right = findPreviousTag(endExclusive, taggedSnapshots);
- if (right >= 0) {
- int left = Math.max(findPreviousOrEqualTag(beginInclusive,
taggedSnapshots), 0);
- for (int i = left; i <= right; i++) {
- snapshots.add(taggedSnapshots.get(i));
- }
- }
- return snapshots;
- }
-
- private void tryRefresh(Snapshot taggedSnapshot) {
- if (cachedTag != taggedSnapshot.id()) {
- refresh(taggedSnapshot);
- cachedTag = taggedSnapshot.id();
- }
- }
-
- private void refresh(Snapshot taggedSnapshot) {
- cachedTagDataFiles.clear();
- addMergedDataFiles(
- cachedTagDataFiles,
- taggedSnapshot,
- manifestList,
- manifestFile,
- scanManifestParallelism);
- }
-
- private int findPreviousTag(long targetSnapshotId, List<Snapshot>
taggedSnapshots) {
- for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
- if (taggedSnapshots.get(i).id() < targetSnapshotId) {
- return i;
- }
- }
- return -1;
- }
-
- private int findPreviousOrEqualTag(long targetSnapshotId, List<Snapshot>
taggedSnapshots) {
- for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
- if (taggedSnapshots.get(i).id() <= targetSnapshotId) {
- return i;
- }
- }
- return -1;
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index a769d6586..536bc1c34 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -25,7 +25,6 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
-import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
@@ -45,7 +44,6 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TagManager;
import java.io.IOException;
@@ -252,7 +250,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
}
} else {
String tagName = coreOptions.scanTagName();
- TagManager tagManager = new TagManager(fileIO, path);
+ TagManager tagManager = tagManager();
if (tagManager.tagExists(tagName)) {
long schemaId =
tagManager.taggedSnapshot(tagName).schemaId();
return
Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
@@ -275,11 +273,13 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
@Override
public void rollbackTo(long snapshotId) {
- try {
- snapshotManager().rollbackTo(store().newSnapshotDeletion(),
snapshotId);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ SnapshotManager snapshotManager = snapshotManager();
+ checkArgument(
+ snapshotManager.snapshotExists(snapshotId),
+ "Rollback snapshot '%s' doesn't exist.",
+ snapshotId);
+
+ rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
}
@Override
@@ -291,19 +291,49 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
fromSnapshotId);
Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
- TagManager tagManager = new TagManager(fileIO, path);
- tagManager.createTag(snapshot, tagName);
+ tagManager().createTag(snapshot, tagName);
}
@Override
public void deleteTag(String tagName) {
- checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
+ tagManager().deleteTag(tagName, store().newTagDeletion(),
snapshotManager());
+ }
+
+ @Override
+ public void rollbackTo(String tagName) {
+ TagManager tagManager = tagManager();
+ checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s'
doesn't exist.", tagName);
- TagManager tagManager = new TagManager(fileIO, path);
Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
+ rollbackHelper().cleanLargerThan(taggedSnapshot);
- TagDeletion tagDeletion = store().newTagDeletion();
- tagDeletion.delete(taggedSnapshot);
- fileIO.deleteQuietly(tagManager.tagPath(tagName));
+ try {
+ // it is possible that the earliest snapshot is later than the
rollback tag because of
+ // snapshot expiration, in this case the `cleanLargerThan` method
will delete all
+ // snapshots, so we should write the tag file to snapshot
directory and modify the
+ // earliest hint
+ SnapshotManager snapshotManager = snapshotManager();
+ if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
+ fileIO.writeFileUtf8(
+ snapshotManager().snapshotPath(taggedSnapshot.id()),
+ fileIO.readFileUtf8(tagManager.tagPath(tagName)));
+ snapshotManager.commitEarliestHint(taggedSnapshot.id());
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private TagManager tagManager() {
+ return new TagManager(fileIO, path);
+ }
+
+ private RollbackHelper rollbackHelper() {
+ return new RollbackHelper(
+ snapshotManager(),
+ tagManager(),
+ fileIO,
+ store().newSnapshotDeletion(),
+ store().newTagDeletion());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 1cf25af75..86dd1f97d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -110,4 +110,12 @@ public interface ReadonlyTable extends InnerTable {
"Readonly Table %s does not support deleteTag.",
this.getClass().getSimpleName()));
}
+
+ @Override
+ default void rollbackTo(String tagName) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support rollbackTo tag.",
+ this.getClass().getSimpleName()));
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
new file mode 100644
index 000000000..4843393e7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.function.Predicate;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Helper class for {@link Table#rollbackTo} including utils to clean
snapshots. */
+public class RollbackHelper {
+
+ private final SnapshotManager snapshotManager;
+ private final TagManager tagManager;
+ private final FileIO fileIO;
+ private final SnapshotDeletion snapshotDeletion;
+ private final TagDeletion tagDeletion;
+
+ public RollbackHelper(
+ SnapshotManager snapshotManager,
+ TagManager tagManager,
+ FileIO fileIO,
+ SnapshotDeletion snapshotDeletion,
+ TagDeletion tagDeletion) {
+ this.snapshotManager = snapshotManager;
+ this.tagManager = tagManager;
+ this.fileIO = fileIO;
+ this.snapshotDeletion = snapshotDeletion;
+ this.tagDeletion = tagDeletion;
+ }
+
+ /** Clean snapshots and tags whose id is larger than given snapshot's. */
+ public void cleanLargerThan(Snapshot retainedSnapshot) {
+ // clean data files
+ List<Snapshot> cleanedSnapshots =
cleanSnapshotsDataFiles(retainedSnapshot);
+ List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);
+
+ // clean manifests
+ // this can be used for snapshots and tags manifests cleaning both
+ Set<String> manifestsSkippingSet =
snapshotDeletion.manifestSkippingSet(retainedSnapshot);
+
+ for (Snapshot snapshot : cleanedSnapshots) {
+ snapshotDeletion.cleanUnusedManifests(snapshot,
manifestsSkippingSet);
+ }
+
+ cleanedTags.removeAll(cleanedSnapshots);
+ for (Snapshot snapshot : cleanedTags) {
+ tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
+ }
+
+ // modify the latest hint
+ try {
+ snapshotManager.commitLatestHint(retainedSnapshot.id());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private List<Snapshot> cleanSnapshotsDataFiles(Snapshot retainedSnapshot) {
+ long earliest =
+ checkNotNull(
+ snapshotManager.earliestSnapshotId(), "Cannot find
earliest snapshot.");
+ long latest =
+ checkNotNull(snapshotManager.latestSnapshotId(), "Cannot find
latest snapshot.");
+
+ // delete snapshot files first, cannot be read now
+ // it is possible that some snapshots have been expired
+ List<Snapshot> toBeCleaned = new ArrayList<>();
+ long to = Math.max(earliest, retainedSnapshot.id() + 1);
+ for (long i = latest; i >= to; i--) {
+ toBeCleaned.add(snapshotManager.snapshot(i));
+ fileIO.deleteQuietly(snapshotManager.snapshotPath(i));
+ }
+
+ // delete data files of snapshots
+ // don't worry about tag data files because file deletion methods
won't throw exception
+ // when deleting non-existing data files
+ for (Snapshot snapshot : toBeCleaned) {
+
snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList());
+
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
+ }
+
+ // delete directories
+ snapshotDeletion.cleanDataDirectories();
+
+ return toBeCleaned;
+ }
+
+ private List<Snapshot> cleanTagsDataFiles(Snapshot retainedSnapshot) {
+ SortedMap<Snapshot, String> tags = tagManager.tags();
+ if (tags.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+ List<Snapshot> toBeCleaned = new ArrayList<>();
+
+ // delete tag files
+ for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+ Snapshot tag = taggedSnapshots.get(i);
+ if (tag.id() <= retainedSnapshot.id()) {
+ break;
+ }
+ toBeCleaned.add(tag);
+ fileIO.deleteQuietly(tagManager.tagPath(tags.get(tag)));
+ }
+
+ // delete data files
+ Predicate<ManifestEntry> dataFileSkipper =
tagDeletion.dataFileSkipper(retainedSnapshot);
+ for (Snapshot s : toBeCleaned) {
+ tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
+ }
+
+ // delete directories
+ tagDeletion.cleanDataDirectories();
+
+ return toBeCleaned;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index e5b5c26b2..c1b4b7882 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -75,6 +75,10 @@ public interface Table extends Serializable {
@Experimental
void deleteTag(String tagName);
+ /** Rollback table's state to a specific tag. */
+ @Experimental
+ void rollbackTo(String tagName);
+
// =============== Read & Write Operations ==================
/** Returns a new read builder. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index b28dab580..05a18d8da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -20,23 +20,16 @@ package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.SnapshotDeletion;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Function;
@@ -107,6 +100,11 @@ public class SnapshotManager implements Serializable {
}
}
+ public @Nullable Snapshot earliestSnapshot() {
+ Long snapshotId = earliestSnapshotId();
+ return snapshotId == null ? null : snapshot(snapshotId);
+ }
+
public @Nullable Long earliestSnapshotId() {
try {
return findEarliest();
@@ -334,42 +332,4 @@ public class SnapshotManager implements Serializable {
fileIO.delete(hintFile, false);
fileIO.writeFileUtf8(hintFile, String.valueOf(snapshotId));
}
-
- public void rollbackTo(SnapshotDeletion deletion, long snapshotId) throws
IOException {
- if (!snapshotExists(snapshotId)) {
- throw new IllegalArgumentException("Rollback snapshot not exist: "
+ snapshotId);
- }
-
- Long latest = findLatest();
- if (latest == null) {
- return;
- }
-
- // first modify hint
- commitLatestHint(snapshotId);
-
- // delete snapshots first, cannot be read now.
- List<Snapshot> snapshots = new ArrayList<>();
- for (long i = latest; i > snapshotId; i--) {
- snapshots.add(snapshot(i));
- fileIO().deleteQuietly(snapshotPath(i));
- }
-
- // delete data files
- Map<BinaryRow, Set<Integer>> deletionBuckets = new HashMap<>();
- for (Snapshot snapshot : snapshots) {
- // delete data files
- deletion.deleteAddedDataFiles(snapshot.deltaManifestList(),
deletionBuckets);
- deletion.deleteAddedDataFiles(snapshot.changelogManifestList(),
deletionBuckets);
- }
-
- // delete directories
- deletion.tryDeleteDirectories(deletionBuckets);
-
- // delete manifest files.
- Set<String> manifestSkipped =
deletion.collectManifestSkippingSet(snapshot(snapshotId));
- for (Snapshot snapshot : snapshots) {
- deletion.deleteManifestFiles(manifestSkipped, snapshot);
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index f6fc165d4..46e62b014 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -21,6 +21,8 @@ package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.TagDeletion;
import java.io.IOException;
import java.util.ArrayList;
@@ -28,6 +30,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.function.Predicate;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -77,6 +80,51 @@ public class TagManager {
}
}
+ public void deleteTag(
+ String tagName, TagDeletion tagDeletion, SnapshotManager
snapshotManager) {
+ checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
+ checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
+
+ Snapshot taggedSnapshot = taggedSnapshot(tagName);
+ List<Snapshot> taggedSnapshots;
+
+ // skip file deletion if snapshot exists
+ if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
+ fileIO.deleteQuietly(tagPath(tagName));
+ return;
+ } else {
+ // FileIO discovers tags by tag file, so we should read all tags
before we delete tag
+ taggedSnapshots = taggedSnapshots();
+ fileIO.deleteQuietly(tagPath(tagName));
+ }
+
+ // collect skipping sets from the left neighbor tag and the nearest
right neighbor (either
+ // the earliest snapshot or right neighbor tag)
+ List<Snapshot> skippedSnapshots = new ArrayList<>();
+
+ int index = findIndex(taggedSnapshot, taggedSnapshots);
+ // the left neighbor tag
+ if (index - 1 >= 0) {
+ skippedSnapshots.add(taggedSnapshots.get(index - 1));
+ }
+ // the nearest right neighbor
+ Snapshot right = snapshotManager.earliestSnapshot();
+ if (index + 1 < taggedSnapshots.size()) {
+ Snapshot rightTag = taggedSnapshots.get(index + 1);
+ right = right.id() < rightTag.id() ? right : rightTag;
+ }
+ skippedSnapshots.add(right);
+
+ // delete data files and empty directories
+ Predicate<ManifestEntry> dataFileSkipper =
tagDeletion.dataFileSkipper(skippedSnapshots);
+ tagDeletion.cleanUnusedDataFiles(taggedSnapshot, dataFileSkipper);
+ tagDeletion.cleanDataDirectories();
+
+ // delete manifests
+ tagDeletion.cleanUnusedManifests(
+ taggedSnapshot,
tagDeletion.manifestSkippingSet(skippedSnapshots));
+ }
+
/** Check if a tag exists. */
public boolean tagExists(String tagName) {
Path path = tagPath(tagName);
@@ -111,7 +159,6 @@ public class TagManager {
/** Get all tagged snapshots with names sorted by snapshot id. */
public SortedMap<Snapshot, String> tags() {
-
TreeMap<Snapshot, String> tags = new
TreeMap<>(Comparator.comparingLong(Snapshot::id));
try {
listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
@@ -127,4 +174,48 @@ public class TagManager {
}
return tags;
}
+
+ private int findIndex(Snapshot taggedSnapshot, List<Snapshot>
taggedSnapshots) {
+ for (int i = 0; i < taggedSnapshots.size(); i++) {
+ if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) {
+ return i;
+ }
+ }
+ throw new RuntimeException(
+ String.format(
+ "Didn't find tag with snapshot id '%s'.This is
unexpected.",
+ taggedSnapshot.id()));
+ }
+
+ public static List<Snapshot> findOverlappedSnapshots(
+ List<Snapshot> taggedSnapshots, long beginInclusive, long
endExclusive) {
+ List<Snapshot> snapshots = new ArrayList<>();
+ int right = findPreviousTag(taggedSnapshots, endExclusive);
+ if (right >= 0) {
+ int left = Math.max(findPreviousOrEqualTag(taggedSnapshots,
beginInclusive), 0);
+ for (int i = left; i <= right; i++) {
+ snapshots.add(taggedSnapshots.get(i));
+ }
+ }
+ return snapshots;
+ }
+
+ public static int findPreviousTag(List<Snapshot> taggedSnapshots, long
targetSnapshotId) {
+ for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+ if (taggedSnapshots.get(i).id() < targetSnapshotId) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private static int findPreviousOrEqualTag(
+ List<Snapshot> taggedSnapshots, long targetSnapshotId) {
+ for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+ if (taggedSnapshots.get(i).id() <= targetSnapshotId) {
+ return i;
+ }
+ }
+ return -1;
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index b548827a4..6b88a838b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -40,7 +40,6 @@ import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.operation.TagFileKeeper;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
@@ -138,11 +137,7 @@ public class TestFileStore extends KeyValueFileStore {
millisRetained,
snapshotManager(),
newSnapshotDeletion(),
- new TagFileKeeper(
- manifestListFactory().create(),
- manifestFileFactory().create(),
- new TagManager(fileIO, options.path()),
- options.scanManifestParallelism()));
+ new TagManager(fileIO, options.path()));
}
public List<Snapshot> commitData(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index 20612d344..91eb0b01c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -36,7 +36,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -89,8 +88,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
// expire
- expire.snapshotDeletion()
- .doDeleteExpiredDataFiles(Arrays.asList(add, delete), new
HashMap<>(), f -> false);
+ expire.snapshotDeletion().doCleanUnusedDataFile(Arrays.asList(add,
delete), f -> false);
// check
assertThat(fileIO.exists(myDataFile)).isFalse();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 095a2b638..ede56ff14 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -399,7 +399,7 @@ public class FileDeletionTest {
assertPathExists(fileIO,
pathFactory.toManifestListPath(manifestListName));
}
- store.newTagDeletion().delete(tagManager.taggedSnapshot("tag1"));
+ tagManager.deleteTag("tag1", store.newTagDeletion(), snapshotManager);
// check data files
assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
@@ -475,7 +475,7 @@ public class FileDeletionTest {
assertPathExists(fileIO,
pathFactory.toManifestListPath(manifestListName));
}
- store.newTagDeletion().delete(tagManager.taggedSnapshot("tag2"));
+ tagManager.deleteTag("tag2", store.newTagDeletion(), snapshotManager);
// check data files
assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index aff34ee45..e3966f850 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -655,18 +655,15 @@ public abstract class FileStoreTableTestBase {
write.close();
}
+ // All tags are after the rollback snapshot
@Test
- public void testRollbackTo() throws Exception {
- FileStoreTable table = createFileStoreTable();
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ public void testRollbackToSnapshotCase0() throws Exception {
+ int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+ FileStoreTable table = prepareRollbackTable(commitTimes);
- int commitTimes = ThreadLocalRandom.current().nextInt(100) + 1;
- for (int i = 0; i < commitTimes; i++) {
- write.write(rowData(i, 10 * i, 100L * i));
- commit.commit(i, write.prepareCommit(false, i));
- }
- write.close();
+ table.createTag("test1", commitTimes);
+ table.createTag("test2", commitTimes - 1);
+ table.createTag("test3", commitTimes - 2);
table.rollbackTo(1);
ReadBuilder readBuilder = table.newReadBuilder();
@@ -680,7 +677,7 @@ public abstract class FileStoreTableTestBase {
List<java.nio.file.Path> files =
Files.walk(new
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
- assertThat(files.size()).isEqualTo(14);
+ assertThat(files.size()).isEqualTo(15);
// table-path
// table-path/snapshot
// table-path/snapshot/LATEST
@@ -695,6 +692,151 @@ public abstract class FileStoreTableTestBase {
// table-path/manifest/manifest-list-0
// table-path/schema
// table-path/schema/schema-0
+ // table-path/tag
+ }
+
+ // One tag is at the rollback snapshot and others are after it
+ @Test
+ public void testRollbackToSnapshotCase1() throws Exception {
+ int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+ FileStoreTable table = prepareRollbackTable(commitTimes);
+
+ table.createTag("test1", commitTimes);
+ table.createTag("test2", commitTimes - 1);
+ table.createTag("test3", 1);
+
+ table.rollbackTo(1);
+ // read the table itself after the rollback
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<String> result =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ BATCH_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+ // read tag test3
+ table = table.copy(Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "test3"));
+ readBuilder = table.newReadBuilder();
+ result =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ BATCH_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+ // Files.walk keeps directories open until its stream is closed, so consume it inside
+ // try-with-resources instead of leaving the handles to the garbage collector.
+ List<java.nio.file.Path> files;
+ try (java.util.stream.Stream<java.nio.file.Path> walk =
+ Files.walk(new File(tablePath.getPath()).toPath())) {
+ files = walk.collect(Collectors.toList());
+ }
+ assertThat(files.size()).isEqualTo(16);
+ // case 0 plus 1:
+ // table-path/tag/tag-test3
+ }
+
+ // One tag is before the rollback snapshot and others are after it
+ @Test
+ public void testRollbackToSnapshotCase2() throws Exception {
+ int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+ FileStoreTable table = prepareRollbackTable(commitTimes);
+
+ table.createTag("test1", commitTimes);
+ table.createTag("test2", commitTimes - 1);
+ table.createTag("test3", 1);
+
+ table.rollbackTo(2);
+ // read the table itself after the rollback
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<String> result =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ BATCH_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset");
+
+ // read tag test3
+ table = table.copy(Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "test3"));
+ readBuilder = table.newReadBuilder();
+ result =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ BATCH_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+ // Files.walk keeps directories open until its stream is closed, so consume it inside
+ // try-with-resources instead of leaving the handles to the garbage collector.
+ List<java.nio.file.Path> files;
+ try (java.util.stream.Stream<java.nio.file.Path> walk =
+ Files.walk(new File(tablePath.getPath()).toPath())) {
+ files = walk.collect(Collectors.toList());
+ }
+ assertThat(files.size()).isEqualTo(23);
+ // case 0 plus 7:
+ // table-path/manifest/manifest-list-2
+ // table-path/manifest/manifest-list-3
+ // table-path/manifest/manifest-1
+ // table-path/snapshot/snapshot-2
+ // table-path/pt=1
+ // table-path/pt=1/bucket-0
+ // table-path/pt=1/bucket-0/data-0.orc
+ }
+
+ // Rolls back to the tag on snapshot 1, once with and once without expiring snapshots first
+ @ParameterizedTest(name = "expire snapshots = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testRollbackToTag(boolean expire) throws Exception {
+ int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+ FileStoreTable table = prepareRollbackTable(commitTimes);
+
+ table.createTag("test1", 1);
+ table.createTag("test2", commitTimes - 3);
+ table.createTag("test3", commitTimes - 1);
+
+ if (expire) {
+ // expire snapshots
+ Options options = new Options();
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
+ table.copy(options.toMap()).store().newExpire().expire();
+ }
+
+ table.rollbackTo("test1");
+ // read the table itself after the rollback
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<String> result =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ BATCH_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+ // read tag test1
+ table = table.copy(Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "test1"));
+ readBuilder = table.newReadBuilder();
+ result =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ BATCH_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+ // Files.walk keeps directories open until its stream is closed, so consume it inside
+ // try-with-resources instead of leaving the handles to the garbage collector.
+ List<java.nio.file.Path> files;
+ try (java.util.stream.Stream<java.nio.file.Path> walk =
+ Files.walk(new File(tablePath.getPath()).toPath())) {
+ files = walk.collect(Collectors.toList());
+ }
+ assertThat(files.size()).isEqualTo(16);
+ // rollback snapshot case 0 plus 1:
+ // table-path/tag/tag-test1
+ }
+
+ /**
+ * Creates a fresh table and performs {@code commitTimes} commits on it, writing one row per
+ * commit, then closes the writer.
+ *
+ * @param commitTimes number of write-and-commit rounds to run
+ * @return the table the commits were made on
+ */
+ private FileStoreTable prepareRollbackTable(int commitTimes) throws Exception {
+ FileStoreTable rollbackTable = createFileStoreTable();
+ StreamTableWrite writer = rollbackTable.newWrite(commitUser);
+ StreamTableCommit committer = rollbackTable.newCommit(commitUser);
+
+ int round = 0;
+ while (round < commitTimes) {
+ // the round number doubles as the commit identifier and seeds the row values
+ writer.write(rowData(round, 10 * round, 100L * round));
+ committer.commit(round, writer.prepareCommit(false, round));
+ round++;
+ }
+ writer.close();
+
+ return rollbackTable;
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 63730cbe1..3d7b06118 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -142,7 +142,7 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
}
@Test
- public void testIndexFileRollback() throws Exception {
+ public void testIndexFileRollbackSnapshot() throws Exception {
prepareExpireTable();
long indexFileSize = indexFileSize();
@@ -169,6 +169,28 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
assertThat(indexManifestSize()).isEqualTo(1);
}
+ // Rolls back to snapshot 5 by id, then to the tag on snapshot 1, checking index files each time
+ @Test
+ public void testIndexFileRollbackTag() throws Exception {
+ prepareExpireTable();
+
+ // counts before any rollback; the first rollback is expected to remove two of each
+ long initialIndexFiles = indexFileSize();
+ long initialIndexManifests = indexManifestSize();
+ long expectedIndexFiles = initialIndexFiles - 2;
+ long expectedIndexManifests = initialIndexManifests - 2;
+
+ table.createTag("tag1", 1);
+ table.createTag("tag5", 5);
+ table.createTag("tag7", 7);
+
+ table.rollbackTo(5);
+ checkIndexFiles(5);
+ assertThat(indexFileSize()).isEqualTo(expectedIndexFiles);
+ assertThat(indexManifestSize()).isEqualTo(expectedIndexManifests);
+
+ table.rollbackTo("tag1");
+ checkIndexFiles(1);
+ assertThat(indexFileSize()).isEqualTo(3);
+ assertThat(indexManifestSize()).isEqualTo(1);
+ }
+
private void prepareExpireTable() throws Exception {
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = writeBuilder.newWrite();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java
index 09e2ac41f..f9c4fb7c2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java
@@ -32,21 +32,21 @@ import static
org.apache.paimon.flink.action.Action.checkRequiredArgument;
import static org.apache.paimon.flink.action.Action.getTablePath;
import static org.apache.paimon.flink.action.Action.optionalConfigMap;
-/** Rollback to specific snapshot action for Flink. */
+/** Rollback to specific version action for Flink. */
public class RollbackToAction extends TableActionBase {
private static final Logger LOG =
LoggerFactory.getLogger(RollbackToAction.class);
- private final long snapshotId;
+ private final String version;
+ /**
+ * Creates a rollback action for one table.
+ *
+ * <p>{@code warehouse}, {@code databaseName}, {@code tableName} and {@code catalogConfig} are
+ * passed unchanged to the superclass constructor.
+ *
+ * @param version the rollback target, stored as given; {@code run()} rolls back to a snapshot ID
+ * when it consists only of digits and to a tag of that name otherwise
+ */
public RollbackToAction(
String warehouse,
String databaseName,
String tableName,
- long snapshotId,
+ String version,
Map<String, String> catalogConfig) {
super(warehouse, databaseName, tableName, catalogConfig);
- this.snapshotId = snapshotId;
+ this.version = version;
}
public static Optional<Action> create(String[] args) {
@@ -61,41 +61,44 @@ public class RollbackToAction extends TableActionBase {
Tuple3<String, String, String> tablePath = getTablePath(params);
- checkRequiredArgument(params, "snapshot");
- String snapshot = params.get("snapshot");
+ checkRequiredArgument(params, "version");
+ String version = params.get("version");
Map<String, String> catalogConfig = optionalConfigMap(params,
"catalog-conf");
RollbackToAction action =
new RollbackToAction(
- tablePath.f0,
- tablePath.f1,
- tablePath.f2,
- Long.parseLong(snapshot),
- catalogConfig);
+ tablePath.f0, tablePath.f1, tablePath.f2, version,
catalogConfig);
return Optional.of(action);
}
+ /**
+ * Prints the usage text of the "rollback-to" action to standard output: a one-line description,
+ * the command-line syntax, and a note on what the {@code --version} value may be.
+ */
private static void printHelp() {
- System.out.println("Action \"rollback-to\" roll back a table to a
specific snapshot ID.");
+ System.out.println(
+ "Action \"rollback-to\" roll back a table to a specific
snapshot ID or tag.");
System.out.println();
System.out.println("Syntax:");
System.out.println(
" rollback-to --warehouse <warehouse-path> --database
<database-name> "
- + "--table <table-name> --snapshot <snapshot_spec>");
+ + "--table <table-name> --version <version-string>");
+ System.out.println(
+ " 'version-string can be a long value representing a snapshot
ID or a tag name.'");
System.out.println();
}
+ /**
+ * Rolls the table back to the configured version. A version made up only of digits is parsed
+ * as a snapshot ID; any other non-empty version is used as a tag name.
+ *
+ * @throws IllegalArgumentException if the table is not a {@code DataTable} or the version is
+ * empty
+ */
@Override
public void run() throws Exception {
- LOG.debug("Run rollback-to action with snapshot id '{}'.", snapshotId);
+ // The value may be a tag name as well as a snapshot ID, so log it as a version.
+ LOG.debug("Run rollback-to action with version '{}'.", version);
if (!(table instanceof DataTable)) {
throw new IllegalArgumentException("Unknown table: " + identifier);
}
- table.rollbackTo(snapshotId);
+ // allMatch is vacuously true for an empty string, which would send "" to Long.parseLong and
+ // surface as an opaque NumberFormatException; reject it here with a clear message instead.
+ if (version.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Version must be a snapshot ID or a tag name, but it is empty.");
+ }
+ if (version.chars().allMatch(Character::isDigit)) {
+ table.rollbackTo(Long.parseLong(version));
+ } else {
+ table.rollbackTo(version);
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index 6413dce3e..38555d47d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -49,7 +49,7 @@ public class RollbackToActionITCase extends ActionITCaseBase {
}
@Test
- public void test() throws Exception {
+ public void rollbackToSnapshotTest() throws Exception {
FileStoreTable table =
createFileStoreTable(
ROW_TYPE,
@@ -67,11 +67,42 @@ public class RollbackToActionITCase extends
ActionITCaseBase {
writeData(rowData(2L, BinaryString.fromString("Flink")));
RollbackToAction action =
- new RollbackToAction(warehouse, database, tableName, 2,
Collections.emptyMap());
+ new RollbackToAction(warehouse, database, tableName, "2",
Collections.emptyMap());
action.run();
testBatchRead(
"SELECT * FROM `" + tableName + "`",
Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello")));
}
+
+ // Writes three commits, tags each of them, rolls back to "tag2" through the action
+ @Test
+ public void rollbackToTagTest() throws Exception {
+ FileStoreTable fileStoreTable =
+ createFileStoreTable(
+ ROW_TYPE,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyMap());
+ snapshotManager = fileStoreTable.snapshotManager();
+ StreamWriteBuilder builder =
+ fileStoreTable.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = builder.newWrite();
+ commit = builder.newCommit();
+
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Apache")));
+ writeData(rowData(2L, BinaryString.fromString("Paimon")));
+
+ fileStoreTable.createTag("tag1", 1);
+ fileStoreTable.createTag("tag2", 2);
+ fileStoreTable.createTag("tag3", 3);
+
+ // a non-numeric version makes the action roll back by tag name
+ new RollbackToAction(warehouse, database, tableName, "tag2", Collections.emptyMap()).run();
+
+ String query = "SELECT * FROM `" + tableName + "`";
+ List<Row> expectedRows = Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Apache"));
+ testBatchRead(query, expectedRows);
+ }
}