This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8ca41fc663 [core] Fix remove orphan files with data file path
directory (#4871)
8ca41fc663 is described below
commit 8ca41fc6631f2f1f8173f1759fc723b1b6907880
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jan 9 19:47:24 2025 +0800
[core] Fix remove orphan files with data file path directory (#4871)
---
.../apache/paimon/operation/OrphanFilesClean.java | 11 ++--
.../apache/paimon/utils/FileStorePathFactory.java | 70 +++++++++++++---------
.../procedure/RemoveOrphanFilesProcedureTest.scala | 21 +++++++
3 files changed, 71 insertions(+), 31 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index c2b9be4c27..54e0820918 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -33,6 +33,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableConsumer;
@@ -252,12 +253,14 @@ public abstract class OrphanFilesClean implements
Serializable {
/** List directories that contains data files and manifest files. */
protected List<Path> listPaimonFileDirs() {
+ FileStorePathFactory pathFactory = table.store().pathFactory();
+
List<Path> paimonFileDirs = new ArrayList<>();
- paimonFileDirs.add(new Path(location, "manifest"));
- paimonFileDirs.add(new Path(location, "index"));
- paimonFileDirs.add(new Path(location, "statistics"));
- paimonFileDirs.addAll(listFileDirs(location, partitionKeysNum));
+ paimonFileDirs.add(pathFactory.manifestPath());
+ paimonFileDirs.add(pathFactory.indexPath());
+ paimonFileDirs.add(pathFactory.statisticsPath());
+ paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(),
partitionKeysNum));
return paimonFileDirs;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 81b6307a5f..811a2a4e6d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -37,6 +37,17 @@ import java.util.stream.Collectors;
@ThreadSafe
public class FileStorePathFactory {
+ public static final String MANIFEST_PATH = "manifest";
+ public static final String MANIFEST_PREFIX = "manifest-";
+ public static final String MANIFEST_LIST_PREFIX = "manifest-list-";
+ public static final String INDEX_MANIFEST_PREFIX = "index-manifest-";
+
+ public static final String INDEX_PATH = "index";
+ public static final String INDEX_PREFIX = "index-";
+
+ public static final String STATISTICS_PATH = "statistics";
+ // Keep the "stats-" prefix that the inlined path string used before this refactor
+ // ("/statistics/stats-"), so newly written statistics files are named as before.
+ public static final String STATISTICS_PREFIX = "stats-";
+
public static final String BUCKET_PATH_PREFIX = "bucket-";
// this is the table schema root path
@@ -94,6 +105,25 @@ public class FileStorePathFactory {
return root;
}
+ /** Returns the manifest directory, i.e. {@code MANIFEST_PATH} resolved against {@code root}. */
+ public Path manifestPath() {
+ return new Path(root, MANIFEST_PATH);
+ }
+
+ /** Returns the index directory, i.e. {@code INDEX_PATH} resolved against {@code root}. */
+ public Path indexPath() {
+ return new Path(root, INDEX_PATH);
+ }
+
+ /** Returns the statistics directory, i.e. {@code STATISTICS_PATH} resolved against {@code root}. */
+ public Path statisticsPath() {
+ return new Path(root, STATISTICS_PATH);
+ }
+
+ /**
+ * Returns the base path for data files: {@code dataFilePathDirectory} resolved against
+ * {@code root} when {@code dataFilePathDirectory} is non-null, otherwise {@code root} itself.
+ */
+ public Path dataFilePath() {
+ if (dataFilePathDirectory != null) {
+ return new Path(root, dataFilePathDirectory);
+ }
+ return root;
+ }
+
@VisibleForTesting
public static InternalRowPartitionComputer getPartitionComputer(
RowType partitionType, String defaultPartValue, boolean
legacyPartitionName) {
@@ -103,25 +133,21 @@ public class FileStorePathFactory {
}
public Path newManifestFile() {
- return new Path(
- root + "/manifest/manifest-" + uuid + "-" +
manifestFileCount.getAndIncrement());
+ return toManifestFilePath(
+ MANIFEST_PREFIX + uuid + "-" +
manifestFileCount.getAndIncrement());
}
public Path newManifestList() {
- return new Path(
- root
- + "/manifest/manifest-list-"
- + uuid
- + "-"
- + manifestListCount.getAndIncrement());
+ return toManifestListPath(
+ MANIFEST_LIST_PREFIX + uuid + "-" +
manifestListCount.getAndIncrement());
}
public Path toManifestFilePath(String manifestFileName) {
- return new Path(root + "/manifest/" + manifestFileName);
+ return new Path(manifestPath(), manifestFileName);
}
public Path toManifestListPath(String manifestListName) {
- return new Path(root + "/manifest/" + manifestListName);
+ return new Path(manifestPath(), manifestListName);
}
public DataFilePathFactory createDataFilePathFactory(BinaryRow partition,
int bucket) {
@@ -217,17 +243,13 @@ public class FileStorePathFactory {
return new PathFactory() {
@Override
public Path newPath() {
- return new Path(
- root
- + "/manifest/index-manifest-"
- + uuid
- + "-"
- + indexManifestCount.getAndIncrement());
+ return toPath(
+ INDEX_MANIFEST_PREFIX + uuid + "-" +
indexManifestCount.getAndIncrement());
}
@Override
public Path toPath(String fileName) {
- return new Path(root + "/manifest/" + fileName);
+ return new Path(manifestPath(), fileName);
}
};
}
@@ -236,13 +258,12 @@ public class FileStorePathFactory {
return new PathFactory() {
@Override
public Path newPath() {
- return new Path(
- root + "/index/index-" + uuid + "-" +
indexFileCount.getAndIncrement());
+ return toPath(INDEX_PREFIX + uuid + "-" +
indexFileCount.getAndIncrement());
}
@Override
public Path toPath(String fileName) {
- return new Path(root + "/index/" + fileName);
+ return new Path(indexPath(), fileName);
}
};
}
@@ -251,17 +272,12 @@ public class FileStorePathFactory {
return new PathFactory() {
@Override
public Path newPath() {
- return new Path(
- root
- + "/statistics/stats-"
- + uuid
- + "-"
- + statsFileCount.getAndIncrement());
+ return toPath(STATISTICS_PREFIX + uuid + "-" +
statsFileCount.getAndIncrement());
}
@Override
public Path toPath(String fileName) {
- return new Path(root + "/statistics/" + fileName);
+ return new Path(statisticsPath(), fileName);
}
};
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index 3ffe7fba26..f45655d514 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -219,4 +219,25 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0, 0) :: Nil)
}
+ // Covers remove_orphan_files on a table created with 'data-file.path-directory'='data'.
+ test("Paimon procedure: remove orphan files with data file path directory") {
+ sql(s"""
+ |CREATE TABLE T (id STRING, name STRING)
+ |USING PAIMON
+ |TBLPROPERTIES ('primary-key'='id',
'data-file.path-directory'='data')
+ |""".stripMargin)
+
+ sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')")
+
+ val table = loadTable("T")
+ // Write a stray file directly under the path returned by dataFilePath(), bypassing any commit.
+ val orphanFile = new Path(table.store().pathFactory().dataFilePath(),
ORPHAN_FILE_1)
+ table.fileIO().tryToWriteAtomic(orphanFile, "b")
+
+ // NOTE(review): presumably the sleep makes the stray file strictly older than
+ // `older_than` computed below — confirm.
+ Thread.sleep(1000)
+ val older_than = DateTimeUtils.formatLocalDateTime(
+ DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
+ 3)
+ checkAnswer(
+ sql(s"CALL sys.remove_orphan_files(table => 'T', older_than =>
'$older_than')"),
+ Row(1, 1) :: Nil)
+ }
}