This is an automated email from the ASF dual-hosted git repository.
liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cf230b888 [Core]Clean orphan files for branch (#2863)
cf230b888 is described below
commit cf230b888fefc3e49929105eef05ba72fe0887c5
Author: TaoZex <[email protected]>
AuthorDate: Tue Jul 2 19:07:56 2024 +0800
[Core]Clean orphan files for branch (#2863)
* [Core]Clean orphan files for branch
---
.../org/apache/paimon/utils/BranchManager.java | 11 +++++++
.../org/apache/paimon/utils/SnapshotManager.java | 38 ++++++++++++++++++++--
.../paimon/operation/OrphanFilesCleanTest.java | 12 +++++++
3 files changed, 59 insertions(+), 2 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 8cda5a4ed..6ff8d4c2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -41,6 +41,7 @@ import java.util.SortedMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -77,6 +78,16 @@ public class BranchManager {
return new Path(tablePath + "/branch");
}
+ /** Returns the root directory of branches for the given table path, i.e. {@code tablePath + "/branch"}. */
+ public static Path branchDirectory(Path tablePath) {
+ return new Path(tablePath + "/branch");
+ }
+ /** Collects into a list the stream that listOriginalVersionedFiles yields for branchDirectory(tablePath) and BRANCH_PREFIX; presumably these are the branch names with the prefix removed (TODO confirm against FileUtils). */
+ public static List<String> branchNames(FileIO fileIO, Path tablePath)
throws IOException {
+ return listOriginalVersionedFiles(fileIO, branchDirectory(tablePath),
BRANCH_PREFIX)
+ .collect(Collectors.toList());
+ }
+
public static boolean isMainBranch(String branch) {
return branch.equals(DEFAULT_MAIN_BRANCH);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index c72734753..ca88259de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+import static org.apache.paimon.utils.BranchManager.branchNames;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -107,6 +108,10 @@ public class SnapshotManager implements Serializable {
return new Path(branchPath(tablePath, branch) + "/snapshot");
}
+ /** Returns the path {@code branchPath(tablePath, branch) + "/snapshot"}; static, so the table path and branch are passed explicitly. */ public static Path snapshotDirectory(Path tablePath, String branch) {
+ return new Path(branchPath(tablePath, branch) + "/snapshot");
+ }
+
public Snapshot snapshot(long snapshotId) {
Path snapshotPath = snapshotPath(snapshotId);
return Snapshot.fromPath(fileIO, snapshotPath);
@@ -390,11 +395,25 @@ public class SnapshotManager implements Serializable {
* be deleted by other processes, so just skip this snapshot.
*/
public List<Snapshot> safelyGetAllSnapshots() throws IOException {
+ // For main branch
List<Path> paths =
listVersionedFiles(fileIO, snapshotDirectory(),
SNAPSHOT_PREFIX)
.map(id -> snapshotPath(id))
.collect(Collectors.toList());
+ // For other branch
+ List<String> allBranchNames = branchNames(fileIO, tablePath);
+ for (String branchName : allBranchNames) {
+ List<Path> branchPaths =
+ listVersionedFiles(
+ fileIO,
+ snapshotDirectory(tablePath, branchName),
+ SNAPSHOT_PREFIX)
+ .map(this::snapshotPath)
+ .collect(Collectors.toList());
+ paths.addAll(branchPaths);
+ }
+
List<Snapshot> snapshots = new ArrayList<>();
for (Path path : paths) {
Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
@@ -428,8 +447,23 @@ public class SnapshotManager implements Serializable {
* Try to get non snapshot files. If any error occurred, just ignore it
and return an empty
* result.
*/
- public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus>
fileStatusFilter) {
- return listPathWithFilter(snapshotDirectory(), fileStatusFilter,
nonSnapshotFileFilter());
+ public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus>
fileStatusFilter)
+ throws IOException {
+ // Main branch: start with the files under snapshotDirectory() that pass fileStatusFilter and nonSnapshotFileFilter(). NOTE(review): whether the no-arg snapshotDirectory() always points at the main branch is not visible here; confirm.
+ List<Path> nonSnapshotFiles =
+ listPathWithFilter(snapshotDirectory(), fileStatusFilter,
nonSnapshotFileFilter());
+ // Other branches: apply the same two filters to snapshotDirectory(tablePath, branchName) for every name returned by branchNames and append the results.
+ // NOTE(review): branchNames declares IOException, so this method now declares it too, while the method's Javadoc says errors are ignored and an empty result is returned; confirm this is intended.
+ List<String> allBranchNames = branchNames(fileIO, tablePath);
+ allBranchNames.stream()
+ .map(
+ branchName ->
+ listPathWithFilter(
+ snapshotDirectory(tablePath,
branchName),
+ fileStatusFilter,
+ nonSnapshotFileFilter()))
+ .forEach(nonSnapshotFiles::addAll);
+ return nonSnapshotFiles;
}
public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus>
fileStatusFilter) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 6dbd33c76..ec9197b8b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -81,6 +81,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
import static org.apache.paimon.io.DataFilePathFactory.DATA_FILE_PREFIX;
+import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
@@ -148,6 +149,9 @@ public class OrphanFilesCleanTest {
}
}
+ // create branch1 by tag
+ table.createBranch("branch1", allTags.get(0));
+
// generate non used files
int shouldBeDeleted = generateUnUsedFile();
assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);
@@ -465,6 +469,14 @@ public class OrphanFilesCleanTest {
fileNum,
Arrays.asList("manifest-list-", "manifest-",
"index-manifest-", "UNKNOWN-"));
shouldBeDeleted += fileNum;
+
+ // branch snapshot
+ addNonUsedFiles(
+ new Path(branchPath(tablePath, "branch1") + "/snapshot"),
+ fileNum,
+ Collections.singletonList("UNKNOWN"));
+ shouldBeDeleted += fileNum;
+
return shouldBeDeleted;
}