This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 16246781efb0c307c6d3b753c9aa37ec977ae84e Author: Jingsong Lee <[email protected]> AuthorDate: Tue Dec 24 16:07:44 2024 +0800 [core] External Path in DataFileMeta should be the file path (#4766) --- .../java/org/apache/paimon/io/DataFileMeta.java | 16 +++++-- .../apache/paimon/io/DataFileMetaSerializer.java | 2 +- .../org/apache/paimon/io/DataFilePathFactory.java | 54 ++++++++++------------ .../org/apache/paimon/io/FileIndexEvaluator.java | 2 +- .../paimon/io/KeyValueFileReaderFactory.java | 40 ++++------------ .../paimon/io/KeyValueFileWriterFactory.java | 15 +++--- .../java/org/apache/paimon/manifest/FileEntry.java | 5 +- .../org/apache/paimon/manifest/ManifestEntry.java | 7 ++- .../apache/paimon/manifest/SimpleFileEntry.java | 1 + .../apache/paimon/mergetree/MergeTreeWriter.java | 16 +++---- .../org/apache/paimon/table/source/DataSplit.java | 4 +- .../org/apache/paimon/table/system/FilesTable.java | 53 +++++++++------------ .../apache/paimon/append/AppendOnlyWriterTest.java | 7 ++- .../apache/paimon/io/DataFilePathFactoryTest.java | 12 +++-- .../paimon/io/KeyValueFileReadWriteTest.java | 14 +++--- .../paimon/mergetree/ContainsLevelsTest.java | 9 +--- .../apache/paimon/mergetree/LookupLevelsTest.java | 9 +--- .../apache/paimon/mergetree/MergeTreeTestBase.java | 8 ++-- .../paimon/utils/FileStorePathFactoryTest.java | 12 +++-- .../apache/paimon/flink/clone/CloneFileInfo.java | 15 ++++-- .../paimon/flink/clone/CopyFileOperator.java | 2 +- .../flink/clone/PickFilesForCloneOperator.java | 6 ++- .../apache/paimon/flink/clone/PickFilesUtil.java | 2 +- .../compact/changelog/ChangelogCompactTask.java | 10 ++-- .../paimon/flink/sink/RewriteFileIndexSink.java | 5 +- .../flink/action/RewriteFileIndexActionITCase.java | 2 +- .../procedure/RewriteFileIndexProcedureITCase.java | 2 +- ...nlySingleTableCompactionWorkerOperatorTest.java | 7 +-- .../apache/paimon/spark/SparkFileIndexITCase.java | 3 +- 29 files changed, 157 insertions(+), 183 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 459cd788de..b164b60fe5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -368,9 +368,14 @@ public class DataFileMeta { return split[split.length - 1]; } - @Nullable - public String externalPath() { - return externalPath; + public Optional<String> externalPath() { + return Optional.ofNullable(externalPath); + } + + public Optional<String> externalPathDir() { + return Optional.ofNullable(externalPath) + .map(Path::new) + .map(p -> p.getParent().toUri().toString()); } public Optional<FileSource> fileSource() { @@ -405,7 +410,8 @@ public class DataFileMeta { externalPath); } - public DataFileMeta rename(String newExternalPath, String newFileName) { + public DataFileMeta rename(String newFileName) { + String newExternalPath = externalPathDir().map(dir -> dir + "/" + newFileName).orElse(null); return new DataFileMeta( newFileName, fileSize, @@ -452,7 +458,7 @@ public class DataFileMeta { public List<Path> collectFiles(DataFilePathFactory pathFactory) { List<Path> paths = new ArrayList<>(); paths.add(pathFactory.toPath(this)); - extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f))); + extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this))); return paths; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index c8a5e326b0..a316f897ff 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -59,7 +59,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> { meta.embeddedIndex(), meta.fileSource().map(FileSource::toByteValue).orElse(null), toStringArrayData(meta.valueStatsCols()), - BinaryString.fromString(meta.externalPath())); + meta.externalPath().map(BinaryString::fromString).orElse(null)); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index daeb9f52ea..19525ab6cd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -20,9 +20,11 @@ package org.apache.paimon.io; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileEntry; import javax.annotation.concurrent.ThreadSafe; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -67,47 +69,36 @@ public class DataFilePathFactory { return newPath(changelogFilePrefix); } - private Path newPath(String prefix) { + public String newChangelogFileName() { + return newFileName(changelogFilePrefix); + } + + public Path newPath(String prefix) { + return new Path(parent, newFileName(prefix)); + } + + private String newFileName(String prefix) { String extension; if (fileSuffixIncludeCompression) { extension = "." + fileCompression + "." + formatIdentifier; } else { extension = "." + formatIdentifier; } - String name = prefix + uuid + "-" + pathCount.getAndIncrement() + extension; - return new Path(parent, name); + return prefix + uuid + "-" + pathCount.getAndIncrement() + extension; } - @VisibleForTesting - public Path toPath(String fileName) { - return new Path(parent + "/" + fileName); - } - - /** - * for read purpose. - * - * @param fileName the file name - * @param externalPath the external path, if null, it will use the parent path - * @return the file's path - */ - public Path toPath(String fileName, String externalPath) { - return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); + public Path toPath(DataFileMeta file) { + return file.externalPath().map(Path::new).orElse(new Path(parent, file.fileName())); } - public Path toPath(DataFileMeta dataFileMeta) { - String externalPath = dataFileMeta.externalPath(); - String fileName = dataFileMeta.fileName(); - return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); + public Path toPath(FileEntry file) { + return Optional.ofNullable(file.externalPath()) + .map(Path::new) + .orElse(new Path(parent, file.fileName())); } - public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) { - String externalPath = dataFileMeta.externalPath(); - return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile); - } - - @VisibleForTesting - public String uuid() { - return uuid; + public Path toAlignedPath(String fileName, DataFileMeta aligned) { + return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName); } public static Path dataFileToFileIndexPath(Path dataFilePath) { @@ -141,4 +132,9 @@ public class DataFilePathFactory { return fileName.substring(index + 1); } + + @VisibleForTesting + String uuid() { + return uuid; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java index 9055097d37..3ed4c278d9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java @@ -62,7 +62,7 @@ public class FileIndexEvaluator { // go to file index check try (FileIndexPredicate predicate = new FileIndexPredicate( - dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)), + dataFilePathFactory.toAlignedPath(indexFiles.get(0), file), fileIO, dataSchema.logicalRowType())) { return predicate.evaluate( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 9d65a54113..14221d50be 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -20,7 +20,6 @@ package org.apache.paimon.io; import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; -import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; @@ -98,37 +97,17 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> { @Override public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException { - return createRecordReader( - file.schemaId(), - file.fileName(), - file.fileSize(), - file.level(), - file.externalPath()); - } - - @VisibleForTesting - public RecordReader<KeyValue> createRecordReader( - long schemaId, String fileName, long fileSize, int level, String externalPath) - throws IOException { - if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) { - return new AsyncRecordReader<>( - () -> - createRecordReader( - schemaId, fileName, level, false, 2, fileSize, externalPath)); + if (file.fileSize() >= asyncThreshold && file.fileName().endsWith(".orc")) { + return new AsyncRecordReader<>(() -> createRecordReader(file, false, 2)); } - return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath); + return createRecordReader(file, true, null); } private FileRecordReader<KeyValue> createRecordReader( - long schemaId, - String fileName, - int level, - boolean reuseFormat, - @Nullable Integer orcPoolSize, - long fileSize, - String externalPath) + DataFileMeta file, boolean reuseFormat, @Nullable Integer orcPoolSize) throws IOException { - String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName); + String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName()); + long schemaId = file.schemaId(); Supplier<FormatReaderMapping> formatSupplier = () -> @@ -143,8 +122,9 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> { new FormatKey(schemaId, formatIdentifier), key -> formatSupplier.get()) : formatSupplier.get(); - Path filePath = pathFactory.toPath(fileName, externalPath); + Path filePath = pathFactory.toPath(file); + long fileSize = file.fileSize(); FileRecordReader<InternalRow> fileRecordReader = new DataFileRecordReader( formatReaderMapping.getReaderFactory(), @@ -156,13 +136,13 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> { formatReaderMapping.getCastMapping(), PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition)); - Optional<DeletionVector> deletionVector = dvFactory.create(fileName); + Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName()); if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { fileRecordReader = new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get()); } - return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level); + return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level()); } public static Builder builder( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 500320c249..7b6f4f0e3c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -142,14 +142,13 @@ public class KeyValueFileWriterFactory { fileIndexOptions); } - public void deleteFile(DataFileMeta meta, int level) { - fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta)); + public void deleteFile(DataFileMeta file) { + fileIO.deleteQuietly(formatContext.pathFactory(file.level()).toPath(file)); } - public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level) - throws IOException { - Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta); - Path targetPath = formatContext.pathFactory(level).toPath(targetMeta); + public void copyFile(DataFileMeta sourceFile, DataFileMeta targetFile) throws IOException { + Path sourcePath = formatContext.pathFactory(sourceFile.level()).toPath(sourceFile); + Path targetPath = formatContext.pathFactory(targetFile.level()).toPath(targetFile); fileIO.copyFile(sourcePath, targetPath, true); } @@ -157,8 +156,8 @@ public class KeyValueFileWriterFactory { return fileIO; } - public Path newChangelogPath(int level) { - return formatContext.pathFactory(level).newChangelogPath(); + public String newChangelogFileName(int level) { + return formatContext.pathFactory(level).newChangelogFileName(); } public static Builder builder( diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index 738776438b..dd77759de1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -54,6 +54,7 @@ public interface FileEntry { String fileName(); + @Nullable String externalPath(); Identifier identifier(); @@ -161,7 +162,9 @@ public interface FileEntry { + ", extraFiles " + extraFiles + ", embeddedIndex " - + Arrays.toString(embeddedIndex); + + Arrays.toString(embeddedIndex) + + ", externalPath " + + externalPath; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index d4748451d8..3cb5733a38 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -26,6 +26,8 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TinyIntType; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -92,9 +94,10 @@ public class ManifestEntry implements FileEntry { return file.fileName(); } + @Nullable @Override public String externalPath() { - return file.externalPath(); + return file.externalPath().orElse(null); } @Override @@ -129,7 +132,7 @@ public class ManifestEntry implements FileEntry { file.fileName(), file.extraFiles(), file.embeddedIndex(), - file.externalPath()); + externalPath()); } public ManifestEntry copyWithoutStats() { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index f86bded52d..c8708db0b8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -106,6 +106,7 @@ public class SimpleFileEntry implements FileEntry { return fileName; } + @Nullable @Override public String externalPath() { return externalPath; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index df48559223..1c805e764a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -27,7 +27,6 @@ import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.fs.Path; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -242,10 +241,9 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { } else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) { List<DataFileMeta> changelogMetas = new ArrayList<>(); for (DataFileMeta dataMeta : dataMetas) { - Path newPath = writerFactory.newChangelogPath(0); - DataFileMeta changelogMeta = - dataMeta.rename(newPath.getParent().getName(), newPath.getName()); - writerFactory.copyFile(dataMeta, changelogMeta, 0); + String newFileName = writerFactory.newChangelogFileName(0); + DataFileMeta changelogMeta = dataMeta.rename(newFileName); + writerFactory.copyFile(dataMeta, changelogMeta); changelogMetas.add(changelogMeta); } newFilesChangelog.addAll(changelogMetas); @@ -343,7 +341,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { // 2. This file is not the input of upgraded. if (!compactBefore.containsKey(file.fileName()) && !afterFiles.contains(file.fileName())) { - writerFactory.deleteFile(file, file.level()); + writerFactory.deleteFile(file); } } else { compactBefore.put(file.fileName(), file); @@ -377,7 +375,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { deletedFiles.clear(); for (DataFileMeta file : newFilesChangelog) { - writerFactory.deleteFile(file, file.level()); + writerFactory.deleteFile(file); } newFilesChangelog.clear(); @@ -392,12 +390,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { compactAfter.clear(); for (DataFileMeta file : compactChangelog) { - writerFactory.deleteFile(file, file.level()); + writerFactory.deleteFile(file); } compactChangelog.clear(); for (DataFileMeta file : delete) { - writerFactory.deleteFile(file, file.level()); + writerFactory.deleteFile(file); } if (compactDeletionFile != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 9178d25a91..39f9269f41 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -180,10 +180,8 @@ public class DataSplit implements Split { } private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) { - String path = file.externalPath() != null ? file.externalPath() : bucketPath; - path += "/" + file.fileName(); return new RawFile( - path, + file.externalPath().orElse(bucketPath + "/" + file.fileName()), file.fileSize(), 0, file.fileSize(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 3107ebe150..5c7ccd4809 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -370,9 +370,9 @@ public class FilesTable implements ReadonlyTable { DataSplit dataSplit, RowDataToObjectArrayConverter partitionConverter, Function<Long, RowDataToObjectArrayConverter> keyConverters, - DataFileMeta dataFileMeta, + DataFileMeta file, SimpleStatsEvolutions simpleStatsEvolutions) { - StatsLazyGetter statsGetter = new StatsLazyGetter(dataFileMeta, simpleStatsEvolutions); + StatsLazyGetter statsGetter = new StatsLazyGetter(file, simpleStatsEvolutions); @SuppressWarnings("unchecked") Supplier<Object>[] fields = new Supplier[] { @@ -385,51 +385,44 @@ public class FilesTable implements ReadonlyTable { dataSplit.partition()))), dataSplit::bucket, () -> - dataFileMeta.externalPath() == null - ? BinaryString.fromString( - dataSplit.bucketPath() - + "/" - + dataFileMeta.fileName()) - : BinaryString.fromString( - dataFileMeta.externalPath() - + "/" - + dataFileMeta.fileName()), + BinaryString.fromString( + file.externalPath() + .orElse( + dataSplit.bucketPath() + + "/" + + file.fileName())), () -> BinaryString.fromString( - DataFilePathFactory.formatIdentifier( - dataFileMeta.fileName())), - dataFileMeta::schemaId, - dataFileMeta::level, - dataFileMeta::rowCount, - dataFileMeta::fileSize, + DataFilePathFactory.formatIdentifier(file.fileName())), + file::schemaId, + file::level, + file::rowCount, + file::fileSize, () -> - dataFileMeta.minKey().getFieldCount() <= 0 + file.minKey().getFieldCount() <= 0 ? null : BinaryString.fromString( Arrays.toString( keyConverters - .apply(dataFileMeta.schemaId()) - .convert(dataFileMeta.minKey()))), + .apply(file.schemaId()) + .convert(file.minKey()))), () -> - dataFileMeta.maxKey().getFieldCount() <= 0 + file.maxKey().getFieldCount() <= 0 ? null : BinaryString.fromString( Arrays.toString( keyConverters - .apply(dataFileMeta.schemaId()) - .convert(dataFileMeta.maxKey()))), + .apply(file.schemaId()) + .convert(file.maxKey()))), () -> BinaryString.fromString(statsGetter.nullValueCounts().toString()), () -> BinaryString.fromString(statsGetter.lowerValueBounds().toString()), () -> BinaryString.fromString(statsGetter.upperValueBounds().toString()), - dataFileMeta::minSequenceNumber, - dataFileMeta::maxSequenceNumber, - dataFileMeta::creationTime, + file::minSequenceNumber, + file::maxSequenceNumber, + file::creationTime, () -> BinaryString.fromString( - dataFileMeta - .fileSource() - .map(FileSource::toString) - .orElse(null)) + file.fileSource().map(FileSource::toString).orElse(null)) }; return new LazyGenericRow(fields); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 3f752be13e..7757020532 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -66,7 +66,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -646,10 +645,10 @@ public class AppendOnlyWriterTest { int size = toCompact.size(); long minSeq = toCompact.get(0).minSequenceNumber(); long maxSeq = toCompact.get(size - 1).maxSequenceNumber(); - String fileName = "compact-" + UUID.randomUUID(); - LocalFileIO.create().newOutputStream(pathFactory.toPath(fileName), false).close(); + Path path = pathFactory.newPath("compact-"); + LocalFileIO.create().newOutputStream(path, false).close(); return DataFileMeta.forAppend( - fileName, + path.getName(), toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(), toCompact.stream().mapToLong(DataFileMeta::rowCount).sum(), STATS_SERIALIZER.toBinaryAllMode( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java index d36966c55a..109f33c3dc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java @@ -55,8 +55,9 @@ public class DataFilePathFactoryTest { + "." + CoreOptions.FILE_FORMAT.defaultValue())); } - assertThat(pathFactory.toPath("my-data-file-name")) - .isEqualTo(new Path(tempDir.toString() + "/bucket-123/my-data-file-name")); + assertThat(pathFactory.newPath("my-data-file-name").toString()) + .startsWith( + new Path(tempDir.toString() + "/bucket-123/my-data-file-name").toString()); } @Test @@ -83,8 +84,9 @@ public class DataFilePathFactoryTest { + "." + CoreOptions.FILE_FORMAT.defaultValue())); } - assertThat(pathFactory.toPath("my-data-file-name")) - .isEqualTo( - new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-data-file-name")); + assertThat(pathFactory.newPath("my-data-file-name").toString()) + .startsWith( + new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-data-file-name") + .toString()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index e817562689..8f2c815404 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -61,6 +61,7 @@ import java.util.function.Function; import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE; import static org.apache.paimon.TestKeyValueGenerator.KEY_TYPE; import static org.apache.paimon.TestKeyValueGenerator.createTestSchemaManager; +import static org.apache.paimon.io.DataFileTestUtils.newFile; import static org.apache.paimon.stats.StatsTestUtils.convertWithoutSchemaEvolution; import static org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory; import static org.assertj.core.api.Assertions.assertThat; @@ -78,7 +79,10 @@ public class KeyValueFileReadWriteTest { public void testReadNonExistentFile() { KeyValueFileReaderFactory readerFactory = createReaderFactory(tempDir.toString(), "avro", null, null); - assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0, null)) + assertThatThrownBy( + () -> + readerFactory.createRecordReader( + newFile("non_avro_file.avro", 0, 0, 1, 0))) .hasMessageContaining( "you can configure 'snapshot.time-retained' option with a larger value."); } @@ -307,13 +311,7 @@ public class KeyValueFileReadWriteTest { for (DataFileMeta meta : actualMetas) { // check the contents of data file CloseableIterator<KeyValue> actualKvsIterator = - new RecordReaderIterator<>( - readerFactory.createRecordReader( - meta.schemaId(), - meta.fileName(), - meta.fileSize(), - meta.level(), - meta.externalPath())); + new RecordReaderIterator<>(readerFactory.createRecordReader(meta)); while (actualKvsIterator.hasNext()) { assertThat(expectedIterator.hasNext()).isTrue(); KeyValue actualKv = actualKvsIterator.next(); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index fa96765a42..fa9628b4c1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -195,14 +195,7 @@ public class ContainsLevelsTest { comparator, keyType, new LookupLevels.ContainsValueProcessor(), - file -> - createReaderFactory() - .createRecordReader( - 0, - file.fileName(), - file.fileSize(), - file.level(), - file.externalPath()), + file -> createReaderFactory().createRecordReader(file), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 56c45cfdc4..b68a82935b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -272,14 +272,7 @@ public class LookupLevelsTest { comparator, keyType, new LookupLevels.KeyValueProcessor(rowType), - file -> - createReaderFactory() - .createRecordReader( - 0, - file.fileName(), - file.fileSize(), - file.level(), - file.externalPath()), + file -> createReaderFactory().createRecordReader(file), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 47d12ce47c..e987e2ee99 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -114,7 +114,7 @@ public abstract class MergeTreeTestBase { pathFactory = createNonPartFactory(path); comparator = Comparator.comparingInt(o -> o.getInt(0)); recreateMergeTree(1024 * 1024); - Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Path bucketDir = writerFactory.pathFactory(0).newPath().getParent(); LocalFileIO.create().mkdirs(bucketDir); } @@ -418,7 +418,7 @@ public abstract class MergeTreeTestBase { writer.close(); - Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Path bucketDir = writerFactory.pathFactory(0).newPath().getParent(); Set<String> files = Arrays.stream(LocalFileIO.create().listStatus(bucketDir)) .map(FileStatus::getPath) @@ -475,7 +475,7 @@ public abstract class MergeTreeTestBase { writer.close(); - Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Path bucketDir = writerFactory.pathFactory(0).newPath().getParent(); Set<String> files = Arrays.stream(LocalFileIO.create().listStatus(bucketDir)) .map(FileStatus::getPath) @@ -592,7 +592,7 @@ public abstract class MergeTreeTestBase { assertThat(remove).isTrue(); // See MergeTreeWriter.updateCompactResult if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) { - compactWriterFactory.deleteFile(file, file.level()); + compactWriterFactory.deleteFile(file); } } compactedFiles.addAll(increment.compactIncrement().compactAfter()); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java index 6ca15cf150..c5cda2286d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java @@ -72,8 +72,9 @@ public class FileStorePathFactoryTest { FileStorePathFactory pathFactory = createNonPartFactory(new Path(tempDir.toString())); DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(new BinaryRow(0), 123); - assertThat(dataFilePathFactory.toPath("my-data-file-name")) - .isEqualTo(new Path(tempDir.toString() + "/bucket-123/my-data-file-name")); + assertThat(dataFilePathFactory.newPath("my-data-file-name").toString()) + .startsWith( + new Path(tempDir.toString() + "/bucket-123/my-data-file-name").toString()); } @Test @@ -116,9 +117,10 @@ public class FileStorePathFactoryTest { writer.complete(); DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(partition, 123); - assertThat(dataFilePathFactory.toPath("my-data-file-name")) - .isEqualTo( - new Path(tempDir.toString() + expected + "/bucket-123/my-data-file-name")); + assertThat(dataFilePathFactory.newPath("my-data-file-name").toString()) + .startsWith( + new Path(tempDir.toString() + expected + "/bucket-123/my-data-file-name") + .toString()); } public static FileStorePathFactory createNonPartFactory(Path root) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java index d916958412..5c0ac75e16 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java @@ -21,17 +21,26 @@ package org.apache.paimon.flink.clone; /** The information of copy file. */ public class CloneFileInfo { + private final String sourceFilePath; private final String filePathExcludeTableRoot; private final String sourceIdentifier; private final String targetIdentifier; public CloneFileInfo( - String filePathExcludeTableRoot, String sourceIdentifier, String targetIdentifier) { + String sourceFilePath, + String filePathExcludeTableRoot, + String sourceIdentifier, + String targetIdentifier) { + this.sourceFilePath = sourceFilePath; this.filePathExcludeTableRoot = filePathExcludeTableRoot; this.sourceIdentifier = sourceIdentifier; this.targetIdentifier = targetIdentifier; } + public String getSourceFilePath() { + return sourceFilePath; + } + public String getFilePathExcludeTableRoot() { return filePathExcludeTableRoot; } @@ -47,7 +56,7 @@ public class CloneFileInfo { @Override public String toString() { return String.format( - "{ filePath: %s, sourceIdentifier: %s, targetIdentifier: %s }", - filePathExcludeTableRoot, sourceIdentifier, targetIdentifier); + "{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s }", + sourceFilePath, filePathExcludeTableRoot, sourceIdentifier, targetIdentifier); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java index 8bcaa2a207..e7002cce1e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java @@ -99,7 +99,7 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo> }); String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot(); - Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot); + Path sourcePath = new Path(cloneFileInfo.getSourceFilePath()); Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot); if (targetTableFileIO.exists(targetPath) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java index 67eecbc6f2..f58d3acafd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java @@ -123,7 +123,11 @@ public class PickFilesForCloneOperator extends AbstractStreamOperator<CloneFileI for (Path file : files) { Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot); result.add( - new CloneFileInfo(relativePath.toString(), sourceIdentifier, targetIdentifier)); + new CloneFileInfo( + file.toUri().toString(), + relativePath.toString(), + sourceIdentifier, + targetIdentifier)); } return result; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java index b7e1e60878..9de974d047 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java @@ -108,7 +108,7 @@ public class PickFilesUtil { pathFactory .createDataFilePathFactory( simpleFileEntry.partition(), simpleFileEntry.bucket()) - .toPath(simpleFileEntry.fileName(), simpleFileEntry.externalPath()); + .toPath(simpleFileEntry); dataFiles.add(dataFilePath); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 39860e418c..7f3f730280 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -167,12 +167,12 @@ public class ChangelogCompactTask implements Serializable { table.fileIO() .rename( changelogTempPath, - dataFilePathFactory.toExtraFilePath( - baseResult.meta, + dataFilePathFactory.toAlignedPath( realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier( - baseResult.meta.fileFormat()))); + baseResult.meta.fileFormat()), + baseResult.meta)); List<Committable> newCommittables = new ArrayList<>(); @@ -194,9 +194,9 @@ public class ChangelogCompactTask implements Serializable { + CompactedChangelogReadOnlyFormat.getIdentifier( result.meta.fileFormat()); if (result.isCompactResult) { - compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); + compactChangelog.add(result.meta.rename(name)); } else { - newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); + newFilesChangelog.add(result.meta.rename(name)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index d35cb09cb7..99061d4b82 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -199,14 +199,13 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> { try (FileIndexFormat.Reader indexReader = FileIndexFormat.createReader( fileIO.newInputStream( - dataFilePathFactory.toExtraFilePath( - dataFileMeta, indexFile)), + dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta)), schemaInfo.fileSchema)) { maintainers = indexReader.readAll(); } newIndexPath = createNewFileIndexFilePath( - dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile)); + dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta)); } else { maintainers = new HashMap<>(); newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java index 242a751416..dc6e5523b0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java @@ -102,7 +102,7 @@ public class RewriteFileIndexActionITCase extends ActionITCaseBase { table.store() .pathFactory() .createDataFilePathFactory(entry.partition(), entry.bucket()) - .toPath(file); + .toAlignedPath(file, entry.file()); try (FileIndexFormat.Reader reader = FileIndexFormat.createReader( table.fileIO().newInputStream(indexFilePath), table.rowType())) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java index 1abfe355a5..23102f2a3d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java @@ -199,7 +199,7 @@ public class RewriteFileIndexProcedureITCase extends CatalogITCaseBase { table.store() .pathFactory() .createDataFilePathFactory(entry.partition(), entry.bucket()) - .toPath(file); + .toAlignedPath(file, entry.file()); try (FileIndexFormat.Reader reader = FileIndexFormat.createReader( table.fileIO().newInputStream(indexFilePath), table.rowType())) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java index 6238a9cbf3..aa33e4fe75 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java @@ -160,8 +160,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest List<DataFileMeta> fileMetas = ((CommitMessageImpl) commitMessage).compactIncrement().compactAfter(); for (DataFileMeta fileMeta : fileMetas) { - Assertions.assertThat( - localFileIO.exists(dataFilePathFactory.toPath(fileMeta.fileName()))) + Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta))) .isTrue(); } if (i++ > 2) { @@ -188,9 +187,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest List<DataFileMeta> fileMetas = ((CommitMessageImpl) commitMessage).compactIncrement().compactAfter(); for (DataFileMeta fileMeta : fileMetas) { - Assertions.assertThat( - localFileIO.exists( - dataFilePathFactory.toPath(fileMeta.fileName()))) + Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta))) .isFalse(); } } catch (Exception e) { diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java index 0360def685..99e95cf40e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java @@ -185,7 +185,8 @@ public class SparkFileIndexITCase extends SparkWriteITCase { try (FileIndexFormat.Reader reader = FileIndexFormat.createReader( fileIO.newInputStream( - dataFilePathFactory.toPath(indexFiles.get(0))), + dataFilePathFactory.toAlignedPath( + indexFiles.get(0), dataFileMeta)), tableSchema.logicalRowType())) { Optional<FileIndexReader> fileIndexReader = reader.readColumnIndex("a").stream().findFirst();
