This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit ea176b9dcdaeef67cc25705835edbe3043171dad Author: Houliang Qi <[email protected]> AuthorDate: Tue Dec 24 13:45:06 2024 +0800 [core] Support read external path in DataFileMeta (#4761) --- .../org/apache/paimon/append/AppendOnlyWriter.java | 4 +- .../java/org/apache/paimon/io/DataFileMeta.java | 18 +++--- .../paimon/io/DataFileMeta10LegacySerializer.java | 2 +- .../org/apache/paimon/io/DataFilePathFactory.java | 23 ++++++++ .../org/apache/paimon/io/FileIndexEvaluator.java | 2 +- .../apache/paimon/io/KeyValueDataFileWriter.java | 3 +- .../paimon/io/KeyValueFileReaderFactory.java | 23 ++++++-- .../paimon/io/KeyValueFileWriterFactory.java | 10 ++-- .../org/apache/paimon/io/RowDataFileWriter.java | 3 +- .../apache/paimon/manifest/ExpireFileEntry.java | 18 +++++- .../java/org/apache/paimon/manifest/FileEntry.java | 15 ++++- .../org/apache/paimon/manifest/ManifestEntry.java | 8 ++- .../apache/paimon/manifest/SimpleFileEntry.java | 24 ++++++-- .../paimon/manifest/SimpleFileEntrySerializer.java | 68 ---------------------- .../apache/paimon/mergetree/MergeTreeWriter.java | 14 +++-- .../org/apache/paimon/migrate/FileMetaUtils.java | 1 + .../paimon/operation/FileStoreCommitImpl.java | 2 +- .../apache/paimon/operation/RawFileSplitRead.java | 5 +- .../apache/paimon/table/query/LocalTableQuery.java | 7 +-- .../org/apache/paimon/table/source/DataSplit.java | 4 +- .../org/apache/paimon/table/system/FilesTable.java | 11 +++- .../apache/paimon/append/AppendOnlyWriterTest.java | 7 ++- .../paimon/io/KeyValueFileReadWriteTest.java | 5 +- .../paimon/mergetree/ContainsLevelsTest.java | 6 +- .../apache/paimon/mergetree/LookupLevelsTest.java | 6 +- .../apache/paimon/mergetree/MergeTreeTestBase.java | 2 +- .../paimon/table/AppendOnlyFileStoreTableTest.java | 2 +- .../org/apache/paimon/table/source/SplitTest.java | 1 + .../apache/paimon/flink/clone/PickFilesUtil.java | 2 +- .../compact/changelog/ChangelogCompactTask.java | 11 ++-- .../paimon/flink/sink/RewriteFileIndexSink.java | 12 ++-- .../org/apache/paimon/spark/ScanHelperTest.scala | 2 + 32 files changed, 176 insertions(+), 145 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index a3087e3628..4c313dd655 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -244,7 +244,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { for (DataFileMeta file : compactAfter) { // appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we // can directly delete the file in compactAfter. - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } sinkWriter.close(); @@ -271,7 +271,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { } finally { // remove small files for (DataFileMeta file : files) { - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 3be09ea6c2..459cd788de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -135,7 +135,8 @@ public class DataFileMeta { List<String> extraFiles, @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, - @Nullable List<String> valueStatsCols) { + @Nullable List<String> valueStatsCols, + @Nullable String externalPath) { return new DataFileMeta( fileName, fileSize, @@ -154,7 +155,7 @@ public class DataFileMeta { embeddedIndex, fileSource, valueStatsCols, - null); + externalPath); } public DataFileMeta( @@ -173,7 +174,8 @@ public class DataFileMeta { @Nullable Long deleteRowCount, @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, - @Nullable List<String> valueStatsCols) { + @Nullable List<String> valueStatsCols, + @Nullable String externalPath) { this( fileName, fileSize, @@ -192,7 +194,7 @@ public class DataFileMeta { embeddedIndex, fileSource, valueStatsCols, - null); + externalPath); } public DataFileMeta( @@ -403,7 +405,7 @@ public class DataFileMeta { externalPath); } - public DataFileMeta rename(String newFileName) { + public DataFileMeta rename(String newExternalPath, String newFileName) { return new DataFileMeta( newFileName, fileSize, @@ -422,7 +424,7 @@ public class DataFileMeta { embeddedIndex, fileSource, valueStatsCols, - externalPath); + newExternalPath); } public DataFileMeta copyWithoutStats() { @@ -449,8 +451,8 @@ public class DataFileMeta { public List<Path> collectFiles(DataFilePathFactory pathFactory) { List<Path> paths = new ArrayList<>(); - paths.add(pathFactory.toPath(fileName)); - extraFiles.forEach(f -> paths.add(pathFactory.toPath(f))); + paths.add(pathFactory.toPath(this)); + extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f))); return paths; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java index 68ccba6ea3..518db7c658 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java @@ -46,7 +46,7 @@ import static org.apache.paimon.utils.SerializationUtils.newBytesType; import static org.apache.paimon.utils.SerializationUtils.newStringType; import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; -/** Serializer for {@link DataFileMeta} with 0.9 version. */ +/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */ public class DataFileMeta10LegacySerializer implements Serializable { private static final long serialVersionUID = 1L; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index b632d44c94..daeb9f52ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -78,10 +78,33 @@ public class DataFilePathFactory { return new Path(parent, name); } + @VisibleForTesting public Path toPath(String fileName) { return new Path(parent + "/" + fileName); } + /** + * for read purpose. + * + * @param fileName the file name + * @param externalPath the external path, if null, it will use the parent path + * @return the file's path + */ + public Path toPath(String fileName, String externalPath) { + return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); + } + + public Path toPath(DataFileMeta dataFileMeta) { + String externalPath = dataFileMeta.externalPath(); + String fileName = dataFileMeta.fileName(); + return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); + } + + public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) { + String externalPath = dataFileMeta.externalPath(); + return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile); + } + @VisibleForTesting public String uuid() { return uuid; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java index 530b871653..9055097d37 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java @@ -62,7 +62,7 @@ public class FileIndexEvaluator { // go to file index check try (FileIndexPredicate predicate = new FileIndexPredicate( - dataFilePathFactory.toPath(indexFiles.get(0)), + dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)), fileIO, dataSchema.logicalRowType())) { return predicate.evaluate( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 651c6a6f7b..f78d755648 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -195,7 +195,8 @@ public abstract class KeyValueDataFileWriter deleteRecordCount, indexResult.embeddedIndexBytes(), fileSource, - valueStatsPair.getKey()); + valueStatsPair.getKey(), + null); } abstract Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 7e272fc97c..9d65a54113 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -20,6 +20,7 @@ package org.apache.paimon.io; import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; @@ -97,16 +98,25 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> { @Override public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException { - return createRecordReader(file.schemaId(), file.fileName(), file.fileSize(), file.level()); + return createRecordReader( + file.schemaId(), + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()); } + @VisibleForTesting public RecordReader<KeyValue> createRecordReader( - long schemaId, String fileName, long fileSize, int level) throws IOException { + long schemaId, String fileName, long fileSize, int level, String externalPath) + throws IOException { if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) { return new AsyncRecordReader<>( - () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize)); + () -> + createRecordReader( + schemaId, fileName, level, false, 2, fileSize, externalPath)); } - return createRecordReader(schemaId, fileName, level, true, null, fileSize); + return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath); } private FileRecordReader<KeyValue> createRecordReader( @@ -115,7 +125,8 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> { int level, boolean reuseFormat, @Nullable Integer orcPoolSize, - long fileSize) + long fileSize, + String externalPath) throws IOException { String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName); @@ -132,7 +143,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> { new FormatKey(schemaId, formatIdentifier), key -> formatSupplier.get()) : formatSupplier.get(); - Path filePath = pathFactory.toPath(fileName); + Path filePath = pathFactory.toPath(fileName, externalPath); FileRecordReader<InternalRow> fileRecordReader = new DataFileRecordReader( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index a6aae3985b..500320c249 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -142,14 +142,14 @@ public class KeyValueFileWriterFactory { fileIndexOptions); } - public void deleteFile(String filename, int level) { - fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename)); + public void deleteFile(DataFileMeta meta, int level) { + fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta)); } - public void copyFile(String sourceFileName, String targetFileName, int level) + public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level) throws IOException { - Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName); - Path targetPath = formatContext.pathFactory(level).toPath(targetFileName); + Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta); + Path targetPath = formatContext.pathFactory(level).toPath(targetMeta); fileIO.copyFile(sourcePath, targetPath, true); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 8c2e8ec949..cd46d67e3b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -124,6 +124,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR : Collections.singletonList(indexResult.independentIndexFile()), indexResult.embeddedIndexBytes(), fileSource, - statsPair.getKey()); + statsPair.getKey(), + null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java index 060360623c..5d6d68144e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java @@ -41,8 +41,19 @@ public class ExpireFileEntry extends SimpleFileEntry { @Nullable byte[] embeddedIndex, BinaryRow minKey, BinaryRow maxKey, - @Nullable FileSource fileSource) { - super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey); + @Nullable FileSource fileSource, + @Nullable String externalPath) { + super( + kind, + partition, + bucket, + level, + fileName, + extraFiles, + embeddedIndex, + minKey, + maxKey, + externalPath); this.fileSource = fileSource; } @@ -61,7 +72,8 @@ public class ExpireFileEntry extends SimpleFileEntry { entry.file().embeddedIndex(), entry.minKey(), entry.maxKey(), - entry.file().fileSource().orElse(null)); + entry.file().fileSource().orElse(null), + entry.externalPath()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index a2569beac6..738776438b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -54,6 +54,8 @@ public interface FileEntry { String fileName(); + String externalPath(); + Identifier identifier(); BinaryRow minKey(); @@ -73,6 +75,7 @@ public interface FileEntry { public final String fileName; public final List<String> extraFiles; @Nullable private final byte[] embeddedIndex; + @Nullable public final String externalPath; /* Cache the hash code for the string */ private Integer hash; @@ -83,13 +86,15 @@ public interface FileEntry { int level, String fileName, List<String> extraFiles, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable String externalPath) { this.partition = partition; this.bucket = bucket; this.level = level; this.fileName = fileName; this.extraFiles = extraFiles; this.embeddedIndex = embeddedIndex; + this.externalPath = externalPath; } @Override @@ -106,7 +111,8 @@ public interface FileEntry { && Objects.equals(partition, that.partition) && Objects.equals(fileName, that.fileName) && Objects.equals(extraFiles, that.extraFiles) - && Objects.deepEquals(embeddedIndex, that.embeddedIndex); + && Objects.deepEquals(embeddedIndex, that.embeddedIndex) + && Objects.deepEquals(externalPath, that.externalPath); } @Override @@ -119,7 +125,8 @@ public interface FileEntry { level, fileName, extraFiles, - Arrays.hashCode(embeddedIndex)); + Arrays.hashCode(embeddedIndex), + externalPath); } return hash; } @@ -138,6 +145,8 @@ public interface FileEntry { + extraFiles + ", embeddedIndex=" + Arrays.toString(embeddedIndex) + + ", externalPath=" + + externalPath + '}'; } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index 626e0a5d46..d4748451d8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -92,6 +92,11 @@ public class ManifestEntry implements FileEntry { return file.fileName(); } + @Override + public String externalPath() { + return file.externalPath(); + } + @Override public BinaryRow minKey() { return file.minKey(); @@ -123,7 +128,8 @@ public class ManifestEntry implements FileEntry { file.level(), file.fileName(), file.extraFiles(), - file.embeddedIndex()); + file.embeddedIndex(), + file.externalPath()); } public ManifestEntry copyWithoutStats() { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index fdaed2b85a..f86bded52d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -38,6 +38,7 @@ public class SimpleFileEntry implements FileEntry { @Nullable private final byte[] embeddedIndex; private final BinaryRow minKey; private final BinaryRow maxKey; + @Nullable private final String externalPath; public SimpleFileEntry( FileKind kind, @@ -48,7 +49,8 @@ public class SimpleFileEntry implements FileEntry { List<String> extraFiles, @Nullable byte[] embeddedIndex, BinaryRow minKey, - BinaryRow maxKey) { + BinaryRow maxKey, + @Nullable String externalPath) { this.kind = kind; this.partition = partition; this.bucket = bucket; @@ -58,6 +60,7 @@ public class SimpleFileEntry implements FileEntry { this.embeddedIndex = embeddedIndex; this.minKey = minKey; this.maxKey = maxKey; + this.externalPath = externalPath; } public static SimpleFileEntry from(ManifestEntry entry) { @@ -70,7 +73,8 @@ public class SimpleFileEntry implements FileEntry { entry.file().extraFiles(), entry.file().embeddedIndex(), entry.minKey(), - entry.maxKey()); + entry.maxKey(), + entry.externalPath()); } public static List<SimpleFileEntry> from(List<ManifestEntry> entries) { @@ -102,9 +106,15 @@ public class SimpleFileEntry implements FileEntry { return fileName; } + @Override + public String externalPath() { + return externalPath; + } + @Override public Identifier identifier() { - return new Identifier(partition, bucket, level, fileName, extraFiles, embeddedIndex); + return new Identifier( + partition, bucket, level, fileName, extraFiles, embeddedIndex, externalPath); } @Override @@ -138,12 +148,14 @@ public class SimpleFileEntry implements FileEntry { && Objects.equals(fileName, that.fileName) && Objects.equals(extraFiles, that.extraFiles) && Objects.equals(minKey, that.minKey) - && Objects.equals(maxKey, that.maxKey); + && Objects.equals(maxKey, that.maxKey) + && Objects.equals(externalPath, that.externalPath); } @Override public int hashCode() { - return Objects.hash(kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey); + return Objects.hash( + kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey, externalPath); } @Override @@ -165,6 +177,8 @@ public class SimpleFileEntry implements FileEntry { + minKey + ", maxKey=" + maxKey + + ", externalPath=" + + externalPath + '}'; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java deleted file mode 100644 index bdc89b8d4c..0000000000 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.manifest; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.utils.VersionedObjectSerializer; - -import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; -import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; - -/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only supports reading. */ -public class SimpleFileEntrySerializer extends VersionedObjectSerializer<SimpleFileEntry> { - - private static final long serialVersionUID = 1L; - - private final int version; - - public SimpleFileEntrySerializer() { - super(ManifestEntry.SCHEMA); - this.version = new ManifestEntrySerializer().getVersion(); - } - - @Override - public int getVersion() { - return version; - } - - @Override - public InternalRow convertTo(SimpleFileEntry meta) { - throw new UnsupportedOperationException("Only supports convert from row."); - } - - @Override - public SimpleFileEntry convertFrom(int version, InternalRow row) { - if (this.version != version) { - throw new IllegalArgumentException("Unsupported version: " + version); - } - - InternalRow file = row.getRow(4, DataFileMeta.SCHEMA.getFieldCount()); - return new SimpleFileEntry( - FileKind.fromByteValue(row.getByte(0)), - deserializeBinaryRow(row.getBinary(1)), - row.getInt(2), - file.getInt(10), - file.getString(0).toString(), - fromStringArrayData(file.getArray(11)), - file.isNullAt(14) ? null : file.getBinary(14), - deserializeBinaryRow(file.getBinary(3)), - deserializeBinaryRow(file.getBinary(4))); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index f2a964bae1..df48559223 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -27,6 +27,7 @@ import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.Path; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -241,9 +242,10 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { } else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) { List<DataFileMeta> changelogMetas = new ArrayList<>(); for (DataFileMeta dataMeta : dataMetas) { + Path newPath = writerFactory.newChangelogPath(0); DataFileMeta changelogMeta = - dataMeta.rename(writerFactory.newChangelogPath(0).getName()); - writerFactory.copyFile(dataMeta.fileName(), changelogMeta.fileName(), 0); + dataMeta.rename(newPath.getParent().getName(), newPath.getName()); + writerFactory.copyFile(dataMeta, changelogMeta, 0); changelogMetas.add(changelogMeta); } newFilesChangelog.addAll(changelogMetas); @@ -341,7 +343,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { // 2. This file is not the input of upgraded. if (!compactBefore.containsKey(file.fileName()) && !afterFiles.contains(file.fileName())) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file, file.level()); } } else { compactBefore.put(file.fileName(), file); @@ -375,7 +377,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { deletedFiles.clear(); for (DataFileMeta file : newFilesChangelog) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file, file.level()); } newFilesChangelog.clear(); @@ -390,12 +392,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { compactAfter.clear(); for (DataFileMeta file : compactChangelog) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file, file.level()); } compactChangelog.clear(); for (DataFileMeta file : delete) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file, file.level()); } if (compactDeletionFile != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 391c5f9bb6..51a0b5e2a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -169,6 +169,7 @@ public class FileMetaUtils { Collections.emptyList(), null, FileSource.APPEND, + null, null); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 001132e167..0b4783b165 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -593,7 +593,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { toDelete.addAll(commitMessage.compactIncrement().changelogFiles()); for (DataFileMeta file : toDelete) { - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 4fda82f4e8..d0f3275b5a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -210,10 +210,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { FormatReaderContext formatReaderContext = new FormatReaderContext( - fileIO, - dataFilePathFactory.toPath(file.fileName()), - file.fileSize(), - fileIndexResult); + fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult); FileRecordReader<InternalRow> fileRecordReader = new DataFileRecordReader( formatReaderMapping.getReaderFactory(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 8ff5ce7a65..d474d4e2d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -161,12 +161,7 @@ public class LocalTableQuery implements TableQuery { readerFactoryBuilder.keyType(), new LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()), file -> { - RecordReader<KeyValue> reader = - factory.createRecordReader( - file.schemaId(), - file.fileName(), - file.fileSize(), - file.level()); + RecordReader<KeyValue> reader = factory.createRecordReader(file); if (cacheRowFilter != null) { reader = reader.filter( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index bf60234214..9178d25a91 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -180,8 +180,10 @@ public class DataSplit implements Split { } private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) { + String path = file.externalPath() != null ? file.externalPath() : bucketPath; + path += "/" + file.fileName(); return new RawFile( - bucketPath + "/" + file.fileName(), + path, file.fileSize(), 0, file.fileSize(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 6dcbb322d6..3107ebe150 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -385,8 +385,15 @@ public class FilesTable implements ReadonlyTable { dataSplit.partition()))), dataSplit::bucket, () -> - BinaryString.fromString( - dataSplit.bucketPath() + "/" + dataFileMeta.fileName()), + dataFileMeta.externalPath() == null + ? BinaryString.fromString( + dataSplit.bucketPath() + + "/" + + dataFileMeta.fileName()) + : BinaryString.fromString( + dataFileMeta.externalPath() + + "/" + + dataFileMeta.fileName()), () -> BinaryString.fromString( DataFilePathFactory.formatIdentifier( diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index a9012ed89b..3f752be13e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -125,7 +125,7 @@ public class AppendOnlyWriterTest { DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0); assertThat(meta).isNotNull(); - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(1L); @@ -186,7 +186,7 @@ public class AppendOnlyWriterTest { assertThat(inc.newFilesIncrement().newFiles().size()).isEqualTo(1); DataFileMeta meta = inc.newFilesIncrement().newFiles().get(0); - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(100L); @@ -227,7 +227,7 @@ public class AppendOnlyWriterTest { int id = 0; for (DataFileMeta meta : firstInc.newFilesIncrement().newFiles()) { - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(1000L); @@ -680,6 +680,7 @@ public class AppendOnlyWriterTest { Collections.emptyList(), null, FileSource.APPEND, + null, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index e43cd898db..e817562689 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -78,7 +78,7 @@ public class KeyValueFileReadWriteTest { public void testReadNonExistentFile() { KeyValueFileReaderFactory readerFactory = createReaderFactory(tempDir.toString(), "avro", null, null); - assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0)) + assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0, null)) .hasMessageContaining( "you can configure 'snapshot.time-retained' option with a larger value."); } @@ -312,7 +312,8 @@ public class KeyValueFileReadWriteTest { meta.schemaId(), meta.fileName(), meta.fileSize(), - meta.level())); + meta.level(), + meta.externalPath())); while (actualKvsIterator.hasNext()) { assertThat(expectedIterator.hasNext()).isTrue(); KeyValue actualKv = actualKvsIterator.next(); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index be49311427..fa96765a42 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -198,7 +198,11 @@ public class ContainsLevelsTest { file -> createReaderFactory() .createRecordReader( - 0, file.fileName(), file.fileSize(), file.level()), + 0, + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index a678534042..56c45cfdc4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -275,7 +275,11 @@ public class LookupLevelsTest { file -> createReaderFactory() .createRecordReader( - 0, file.fileName(), file.fileSize(), file.level()), + 0, + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index f2a9c44dd7..47d12ce47c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -592,7 +592,7 @@ public abstract class MergeTreeTestBase { assertThat(remove).isTrue(); // See MergeTreeWriter.updateCompactResult if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) { - compactWriterFactory.deleteFile(file.fileName(), file.level()); + compactWriterFactory.deleteFile(file, file.level()); } } compactedFiles.addAll(increment.compactIncrement().compactAfter()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 01d4e89af9..471b60d3cf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -112,7 +112,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase { table.store() .pathFactory() .createDataFilePathFactory(split.partition(), split.bucket()) - .toPath(split.dataFiles().get(0).fileName()); + .toPath(split.dataFiles().get(0)); table.fileIO().deleteQuietly(path); // read diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index a4e581b701..a088f40dab 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -447,6 +447,7 @@ public class SplitTest { Collections.emptyList(), null, null, + null, null); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java index f83b5cf8f9..b7e1e60878 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java @@ -108,7 +108,7 @@ public class PickFilesUtil { pathFactory .createDataFilePathFactory( simpleFileEntry.partition(), simpleFileEntry.bucket()) - .toPath(simpleFileEntry.fileName()); + .toPath(simpleFileEntry.fileName(), simpleFileEntry.externalPath()); dataFiles.add(dataFilePath); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 6b95e36907..39860e418c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -96,7 +96,7 @@ public class ChangelogCompactTask implements Serializable { outputStream, results, table, - dataFilePathFactory.toPath(meta.fileName()), + dataFilePathFactory.toPath(meta), bucket, false, meta); @@ -111,7 +111,7 @@ public class ChangelogCompactTask implements Serializable { outputStream, results, table, - dataFilePathFactory.toPath(meta.fileName()), + dataFilePathFactory.toPath(meta), bucket, true, meta); @@ -167,7 +167,8 @@ public class ChangelogCompactTask implements Serializable { table.fileIO() .rename( changelogTempPath, - dataFilePathFactory.toPath( + dataFilePathFactory.toExtraFilePath( + baseResult.meta, realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier( @@ -193,9 +194,9 @@ public class ChangelogCompactTask implements Serializable { + CompactedChangelogReadOnlyFormat.getIdentifier( result.meta.fileFormat()); if (result.isCompactResult) { - compactChangelog.add(result.meta.rename(name)); + compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); } else { - newFilesChangelog.add(result.meta.rename(name)); + newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index d9f863c6b9..d35cb09cb7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -198,16 +198,18 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> { String indexFile = indexFiles.get(0); try (FileIndexFormat.Reader indexReader = FileIndexFormat.createReader( - fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)), + fileIO.newInputStream( + dataFilePathFactory.toExtraFilePath( + dataFileMeta, indexFile)), schemaInfo.fileSchema)) { maintainers = indexReader.readAll(); } - newIndexPath = createNewFileIndexFilePath(dataFilePathFactory.toPath(indexFile)); + newIndexPath = + createNewFileIndexFilePath( + dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile)); } else { maintainers = new HashMap<>(); - newIndexPath = - dataFileToFileIndexPath( - dataFilePathFactory.toPath(dataFileMeta.fileName())); + newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta)); } // remove unnecessary diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala index a3223446f6..4997f65eaf 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala @@ -53,6 +53,7 @@ class ScanHelperTest extends PaimonSparkTestBase { new java.util.ArrayList[String](), null, FileSource.APPEND, + null, null) } @@ -89,6 +90,7 @@ class ScanHelperTest extends PaimonSparkTestBase { new java.util.ArrayList[String](), null, FileSource.APPEND, + null, null) ).asJava
