This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit dbc0a6609587dc515171f64f67cdb868e6e94969 Author: Zouxxyy <[email protected]> AuthorDate: Thu Jan 9 19:47:24 2025 +0800 [core] Fix remove orphan files with data file path directory (#4871) --- .../apache/paimon/operation/OrphanFilesClean.java | 11 ++-- .../apache/paimon/utils/FileStorePathFactory.java | 70 +++++++++++++--------- .../procedure/RemoveOrphanFilesProcedureTest.scala | 21 +++++++ 3 files changed, 71 insertions(+), 31 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index c2b9be4c27..54e0820918 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -33,6 +33,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SerializableConsumer; @@ -252,12 +253,14 @@ public abstract class OrphanFilesClean implements Serializable { /** List directories that contains data files and manifest files. 
*/ protected List<Path> listPaimonFileDirs() { + FileStorePathFactory pathFactory = table.store().pathFactory(); + List<Path> paimonFileDirs = new ArrayList<>(); - paimonFileDirs.add(new Path(location, "manifest")); - paimonFileDirs.add(new Path(location, "index")); - paimonFileDirs.add(new Path(location, "statistics")); - paimonFileDirs.addAll(listFileDirs(location, partitionKeysNum)); + paimonFileDirs.add(pathFactory.manifestPath()); + paimonFileDirs.add(pathFactory.indexPath()); + paimonFileDirs.add(pathFactory.statisticsPath()); + paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), partitionKeysNum)); return paimonFileDirs; } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java index f255762cfd..5eaa6fd89d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java @@ -36,6 +36,17 @@ import java.util.stream.Collectors; @ThreadSafe public class FileStorePathFactory { + public static final String MANIFEST_PATH = "manifest"; + public static final String MANIFEST_PREFIX = "manifest-"; + public static final String MANIFEST_LIST_PREFIX = "manifest-list-"; + public static final String INDEX_MANIFEST_PREFIX = "index-manifest-"; + + public static final String INDEX_PATH = "index"; + public static final String INDEX_PREFIX = "index-"; + + public static final String STATISTICS_PATH = "statistics"; + public static final String STATISTICS_PREFIX = "stat-"; + public static final String BUCKET_PATH_PREFIX = "bucket-"; private final Path root; @@ -89,6 +100,25 @@ public class FileStorePathFactory { return root; } + public Path manifestPath() { + return new Path(root, MANIFEST_PATH); + } + + public Path indexPath() { + return new Path(root, INDEX_PATH); + } + + public Path statisticsPath() { + return new Path(root, STATISTICS_PATH); + } + + 
public Path dataFilePath() { + if (dataFilePathDirectory != null) { + return new Path(root, dataFilePathDirectory); + } + return root; + } + @VisibleForTesting public static InternalRowPartitionComputer getPartitionComputer( RowType partitionType, String defaultPartValue, boolean legacyPartitionName) { @@ -98,25 +128,21 @@ public class FileStorePathFactory { } public Path newManifestFile() { - return new Path( - root + "/manifest/manifest-" + uuid + "-" + manifestFileCount.getAndIncrement()); + return toManifestFilePath( + MANIFEST_PREFIX + uuid + "-" + manifestFileCount.getAndIncrement()); } public Path newManifestList() { - return new Path( - root - + "/manifest/manifest-list-" - + uuid - + "-" - + manifestListCount.getAndIncrement()); + return toManifestListPath( + MANIFEST_LIST_PREFIX + uuid + "-" + manifestListCount.getAndIncrement()); } public Path toManifestFilePath(String manifestFileName) { - return new Path(root + "/manifest/" + manifestFileName); + return new Path(manifestPath(), manifestFileName); } public Path toManifestListPath(String manifestListName) { - return new Path(root + "/manifest/" + manifestListName); + return new Path(manifestPath(), manifestListName); } public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) { @@ -201,17 +227,13 @@ public class FileStorePathFactory { return new PathFactory() { @Override public Path newPath() { - return new Path( - root - + "/manifest/index-manifest-" - + uuid - + "-" - + indexManifestCount.getAndIncrement()); + return toPath( + INDEX_MANIFEST_PREFIX + uuid + "-" + indexManifestCount.getAndIncrement()); } @Override public Path toPath(String fileName) { - return new Path(root + "/manifest/" + fileName); + return new Path(manifestPath(), fileName); } }; } @@ -220,13 +242,12 @@ public class FileStorePathFactory { return new PathFactory() { @Override public Path newPath() { - return new Path( - root + "/index/index-" + uuid + "-" + indexFileCount.getAndIncrement()); + return 
toPath(INDEX_PREFIX + uuid + "-" + indexFileCount.getAndIncrement()); } @Override public Path toPath(String fileName) { - return new Path(root + "/index/" + fileName); + return new Path(indexPath(), fileName); } }; } @@ -235,17 +256,12 @@ public class FileStorePathFactory { return new PathFactory() { @Override public Path newPath() { - return new Path( - root - + "/statistics/stats-" - + uuid - + "-" - + statsFileCount.getAndIncrement()); + return toPath(STATISTICS_PREFIX + uuid + "-" + statsFileCount.getAndIncrement()); } @Override public Path toPath(String fileName) { - return new Path(root + "/statistics/" + fileName); + return new Path(statisticsPath(), fileName); } }; } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala index b1bb3124e3..b680e5b702 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala @@ -224,4 +224,25 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil) } + test("Paimon procedure: remove orphan files with data file path directory") { + sql(s""" + |CREATE TABLE T (id STRING, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id', 'data-file.path-directory'='data') + |""".stripMargin) + + sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')") + + val table = loadTable("T") + val orphanFile = new Path(table.store().pathFactory().dataFilePath(), ORPHAN_FILE_1) + table.fileIO().tryToWriteAtomic(orphanFile, "b") + + Thread.sleep(1000) + val older_than = DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), + 3) + 
checkAnswer( + sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than')"), + Row(1, 1) :: Nil) + } }
