This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0ae0c02130 [core] Support read external path in DataFileMeta (#4761)
0ae0c02130 is described below
commit 0ae0c021302d2fcd7263644a6c7d7d93ec4e489b
Author: Houliang Qi <[email protected]>
AuthorDate: Tue Dec 24 13:45:06 2024 +0800
[core] Support read external path in DataFileMeta (#4761)
---
.../org/apache/paimon/append/AppendOnlyWriter.java | 4 +-
.../java/org/apache/paimon/io/DataFileMeta.java | 18 +++---
.../paimon/io/DataFileMeta10LegacySerializer.java | 2 +-
.../org/apache/paimon/io/DataFilePathFactory.java | 23 ++++++++
.../org/apache/paimon/io/FileIndexEvaluator.java | 2 +-
.../apache/paimon/io/KeyValueDataFileWriter.java | 3 +-
.../paimon/io/KeyValueFileReaderFactory.java | 23 ++++++--
.../paimon/io/KeyValueFileWriterFactory.java | 10 ++--
.../org/apache/paimon/io/RowDataFileWriter.java | 3 +-
.../apache/paimon/manifest/ExpireFileEntry.java | 18 +++++-
.../java/org/apache/paimon/manifest/FileEntry.java | 15 ++++-
.../org/apache/paimon/manifest/ManifestEntry.java | 8 ++-
.../apache/paimon/manifest/SimpleFileEntry.java | 24 ++++++--
.../paimon/manifest/SimpleFileEntrySerializer.java | 68 ----------------------
.../apache/paimon/mergetree/MergeTreeWriter.java | 14 +++--
.../org/apache/paimon/migrate/FileMetaUtils.java | 1 +
.../paimon/operation/FileStoreCommitImpl.java | 2 +-
.../apache/paimon/operation/RawFileSplitRead.java | 5 +-
.../apache/paimon/table/query/LocalTableQuery.java | 7 +--
.../org/apache/paimon/table/source/DataSplit.java | 4 +-
.../org/apache/paimon/table/system/FilesTable.java | 11 +++-
.../apache/paimon/append/AppendOnlyWriterTest.java | 7 ++-
.../paimon/io/KeyValueFileReadWriteTest.java | 5 +-
.../paimon/mergetree/ContainsLevelsTest.java | 6 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 6 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 2 +-
.../paimon/table/AppendOnlyFileStoreTableTest.java | 2 +-
.../org/apache/paimon/table/source/SplitTest.java | 1 +
.../apache/paimon/flink/clone/PickFilesUtil.java | 2 +-
.../compact/changelog/ChangelogCompactTask.java | 11 ++--
.../paimon/flink/sink/RewriteFileIndexSink.java | 12 ++--
.../org/apache/paimon/spark/ScanHelperTest.scala | 2 +
32 files changed, 176 insertions(+), 145 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index a3087e3628..4c313dd655 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -244,7 +244,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
for (DataFileMeta file : compactAfter) {
// appendOnlyCompactManager will rewrite the file and no file
upgrade will occur, so we
// can directly delete the file in compactAfter.
- fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+ fileIO.deleteQuietly(pathFactory.toPath(file));
}
sinkWriter.close();
@@ -271,7 +271,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
} finally {
// remove small files
for (DataFileMeta file : files) {
- fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+ fileIO.deleteQuietly(pathFactory.toPath(file));
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 3be09ea6c2..459cd788de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -135,7 +135,8 @@ public class DataFileMeta {
List<String> extraFiles,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
- @Nullable List<String> valueStatsCols) {
+ @Nullable List<String> valueStatsCols,
+ @Nullable String externalPath) {
return new DataFileMeta(
fileName,
fileSize,
@@ -154,7 +155,7 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- null);
+ externalPath);
}
public DataFileMeta(
@@ -173,7 +174,8 @@ public class DataFileMeta {
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
- @Nullable List<String> valueStatsCols) {
+ @Nullable List<String> valueStatsCols,
+ @Nullable String externalPath) {
this(
fileName,
fileSize,
@@ -192,7 +194,7 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- null);
+ externalPath);
}
public DataFileMeta(
@@ -403,7 +405,7 @@ public class DataFileMeta {
externalPath);
}
- public DataFileMeta rename(String newFileName) {
+ public DataFileMeta rename(String newExternalPath, String newFileName) {
return new DataFileMeta(
newFileName,
fileSize,
@@ -422,7 +424,7 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- externalPath);
+ newExternalPath);
}
public DataFileMeta copyWithoutStats() {
@@ -449,8 +451,8 @@ public class DataFileMeta {
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
- paths.add(pathFactory.toPath(fileName));
- extraFiles.forEach(f -> paths.add(pathFactory.toPath(f)));
+ paths.add(pathFactory.toPath(this));
+ extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this,
f)));
return paths;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
index 68ccba6ea3..518db7c658 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
@@ -46,7 +46,7 @@ import static
org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
-/** Serializer for {@link DataFileMeta} with 0.9 version. */
+/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */
public class DataFileMeta10LegacySerializer implements Serializable {
private static final long serialVersionUID = 1L;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index b632d44c94..daeb9f52ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -78,10 +78,33 @@ public class DataFilePathFactory {
return new Path(parent, name);
}
+ @VisibleForTesting
public Path toPath(String fileName) {
return new Path(parent + "/" + fileName);
}
+ /**
+ * for read purpose.
+ *
+ * @param fileName the file name
+ * @param externalPath the external path, if null, it will use the parent
path
+ * @return the file's path
+ */
+ public Path toPath(String fileName, String externalPath) {
+ return new Path((externalPath == null ? parent : externalPath) + "/" +
fileName);
+ }
+
+ public Path toPath(DataFileMeta dataFileMeta) {
+ String externalPath = dataFileMeta.externalPath();
+ String fileName = dataFileMeta.fileName();
+ return new Path((externalPath == null ? parent : externalPath) + "/" +
fileName);
+ }
+
+ public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) {
+ String externalPath = dataFileMeta.externalPath();
+ return new Path((externalPath == null ? parent : externalPath) + "/" +
extraFile);
+ }
+
@VisibleForTesting
public String uuid() {
return uuid;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index 530b871653..9055097d37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -62,7 +62,7 @@ public class FileIndexEvaluator {
// go to file index check
try (FileIndexPredicate predicate =
new FileIndexPredicate(
- dataFilePathFactory.toPath(indexFiles.get(0)),
+ dataFilePathFactory.toExtraFilePath(file,
indexFiles.get(0)),
fileIO,
dataSchema.logicalRowType())) {
return predicate.evaluate(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 651c6a6f7b..f78d755648 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -195,7 +195,8 @@ public abstract class KeyValueDataFileWriter
deleteRecordCount,
indexResult.embeddedIndexBytes(),
fileSource,
- valueStatsPair.getKey());
+ valueStatsPair.getKey(),
+ null);
}
abstract Pair<SimpleColStats[], SimpleColStats[]>
fetchKeyValueStats(SimpleColStats[] rowStats);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 7e272fc97c..9d65a54113 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
@@ -97,16 +98,25 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
@Override
public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws
IOException {
- return createRecordReader(file.schemaId(), file.fileName(),
file.fileSize(), file.level());
+ return createRecordReader(
+ file.schemaId(),
+ file.fileName(),
+ file.fileSize(),
+ file.level(),
+ file.externalPath());
}
+ @VisibleForTesting
public RecordReader<KeyValue> createRecordReader(
- long schemaId, String fileName, long fileSize, int level) throws
IOException {
+ long schemaId, String fileName, long fileSize, int level, String
externalPath)
+ throws IOException {
if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
return new AsyncRecordReader<>(
- () -> createRecordReader(schemaId, fileName, level, false,
2, fileSize));
+ () ->
+ createRecordReader(
+ schemaId, fileName, level, false, 2,
fileSize, externalPath));
}
- return createRecordReader(schemaId, fileName, level, true, null,
fileSize);
+ return createRecordReader(schemaId, fileName, level, true, null,
fileSize, externalPath);
}
private FileRecordReader<KeyValue> createRecordReader(
@@ -115,7 +125,8 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
int level,
boolean reuseFormat,
@Nullable Integer orcPoolSize,
- long fileSize)
+ long fileSize,
+ String externalPath)
throws IOException {
String formatIdentifier =
DataFilePathFactory.formatIdentifier(fileName);
@@ -132,7 +143,7 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
- Path filePath = pathFactory.toPath(fileName);
+ Path filePath = pathFactory.toPath(fileName, externalPath);
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index a6aae3985b..500320c249 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -142,14 +142,14 @@ public class KeyValueFileWriterFactory {
fileIndexOptions);
}
- public void deleteFile(String filename, int level) {
-
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename));
+ public void deleteFile(DataFileMeta meta, int level) {
+ fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta));
}
- public void copyFile(String sourceFileName, String targetFileName, int
level)
+ public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int
level)
throws IOException {
- Path sourcePath =
formatContext.pathFactory(level).toPath(sourceFileName);
- Path targetPath =
formatContext.pathFactory(level).toPath(targetFileName);
+ Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta);
+ Path targetPath = formatContext.pathFactory(level).toPath(targetMeta);
fileIO.copyFile(sourcePath, targetPath, true);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 8c2e8ec949..cd46d67e3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -124,6 +124,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
:
Collections.singletonList(indexResult.independentIndexFile()),
indexResult.embeddedIndexBytes(),
fileSource,
- statsPair.getKey());
+ statsPair.getKey(),
+ null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
index 060360623c..5d6d68144e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
@@ -41,8 +41,19 @@ public class ExpireFileEntry extends SimpleFileEntry {
@Nullable byte[] embeddedIndex,
BinaryRow minKey,
BinaryRow maxKey,
- @Nullable FileSource fileSource) {
- super(kind, partition, bucket, level, fileName, extraFiles,
embeddedIndex, minKey, maxKey);
+ @Nullable FileSource fileSource,
+ @Nullable String externalPath) {
+ super(
+ kind,
+ partition,
+ bucket,
+ level,
+ fileName,
+ extraFiles,
+ embeddedIndex,
+ minKey,
+ maxKey,
+ externalPath);
this.fileSource = fileSource;
}
@@ -61,7 +72,8 @@ public class ExpireFileEntry extends SimpleFileEntry {
entry.file().embeddedIndex(),
entry.minKey(),
entry.maxKey(),
- entry.file().fileSource().orElse(null));
+ entry.file().fileSource().orElse(null),
+ entry.externalPath());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index a2569beac6..738776438b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -54,6 +54,8 @@ public interface FileEntry {
String fileName();
+ String externalPath();
+
Identifier identifier();
BinaryRow minKey();
@@ -73,6 +75,7 @@ public interface FileEntry {
public final String fileName;
public final List<String> extraFiles;
@Nullable private final byte[] embeddedIndex;
+ @Nullable public final String externalPath;
/* Cache the hash code for the string */
private Integer hash;
@@ -83,13 +86,15 @@ public interface FileEntry {
int level,
String fileName,
List<String> extraFiles,
- @Nullable byte[] embeddedIndex) {
+ @Nullable byte[] embeddedIndex,
+ @Nullable String externalPath) {
this.partition = partition;
this.bucket = bucket;
this.level = level;
this.fileName = fileName;
this.extraFiles = extraFiles;
this.embeddedIndex = embeddedIndex;
+ this.externalPath = externalPath;
}
@Override
@@ -106,7 +111,8 @@ public interface FileEntry {
&& Objects.equals(partition, that.partition)
&& Objects.equals(fileName, that.fileName)
&& Objects.equals(extraFiles, that.extraFiles)
- && Objects.deepEquals(embeddedIndex, that.embeddedIndex);
+ && Objects.deepEquals(embeddedIndex, that.embeddedIndex)
+ && Objects.deepEquals(externalPath, that.externalPath);
}
@Override
@@ -119,7 +125,8 @@ public interface FileEntry {
level,
fileName,
extraFiles,
- Arrays.hashCode(embeddedIndex));
+ Arrays.hashCode(embeddedIndex),
+ externalPath);
}
return hash;
}
@@ -138,6 +145,8 @@ public interface FileEntry {
+ extraFiles
+ ", embeddedIndex="
+ Arrays.toString(embeddedIndex)
+ + ", externalPath="
+ + externalPath
+ '}';
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 626e0a5d46..d4748451d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -92,6 +92,11 @@ public class ManifestEntry implements FileEntry {
return file.fileName();
}
+ @Override
+ public String externalPath() {
+ return file.externalPath();
+ }
+
@Override
public BinaryRow minKey() {
return file.minKey();
@@ -123,7 +128,8 @@ public class ManifestEntry implements FileEntry {
file.level(),
file.fileName(),
file.extraFiles(),
- file.embeddedIndex());
+ file.embeddedIndex(),
+ file.externalPath());
}
public ManifestEntry copyWithoutStats() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index fdaed2b85a..f86bded52d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -38,6 +38,7 @@ public class SimpleFileEntry implements FileEntry {
@Nullable private final byte[] embeddedIndex;
private final BinaryRow minKey;
private final BinaryRow maxKey;
+ @Nullable private final String externalPath;
public SimpleFileEntry(
FileKind kind,
@@ -48,7 +49,8 @@ public class SimpleFileEntry implements FileEntry {
List<String> extraFiles,
@Nullable byte[] embeddedIndex,
BinaryRow minKey,
- BinaryRow maxKey) {
+ BinaryRow maxKey,
+ @Nullable String externalPath) {
this.kind = kind;
this.partition = partition;
this.bucket = bucket;
@@ -58,6 +60,7 @@ public class SimpleFileEntry implements FileEntry {
this.embeddedIndex = embeddedIndex;
this.minKey = minKey;
this.maxKey = maxKey;
+ this.externalPath = externalPath;
}
public static SimpleFileEntry from(ManifestEntry entry) {
@@ -70,7 +73,8 @@ public class SimpleFileEntry implements FileEntry {
entry.file().extraFiles(),
entry.file().embeddedIndex(),
entry.minKey(),
- entry.maxKey());
+ entry.maxKey(),
+ entry.externalPath());
}
public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
@@ -102,9 +106,15 @@ public class SimpleFileEntry implements FileEntry {
return fileName;
}
+ @Override
+ public String externalPath() {
+ return externalPath;
+ }
+
@Override
public Identifier identifier() {
- return new Identifier(partition, bucket, level, fileName, extraFiles,
embeddedIndex);
+ return new Identifier(
+ partition, bucket, level, fileName, extraFiles, embeddedIndex,
externalPath);
}
@Override
@@ -138,12 +148,14 @@ public class SimpleFileEntry implements FileEntry {
&& Objects.equals(fileName, that.fileName)
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.equals(minKey, that.minKey)
- && Objects.equals(maxKey, that.maxKey);
+ && Objects.equals(maxKey, that.maxKey)
+ && Objects.equals(externalPath, that.externalPath);
}
@Override
public int hashCode() {
- return Objects.hash(kind, partition, bucket, level, fileName,
extraFiles, minKey, maxKey);
+ return Objects.hash(
+ kind, partition, bucket, level, fileName, extraFiles, minKey,
maxKey, externalPath);
}
@Override
@@ -165,6 +177,8 @@ public class SimpleFileEntry implements FileEntry {
+ minKey
+ ", maxKey="
+ maxKey
+ + ", externalPath="
+ + externalPath
+ '}';
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
deleted file mode 100644
index bdc89b8d4c..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.utils.VersionedObjectSerializer;
-
-import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-
-/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only
supports reading. */
-public class SimpleFileEntrySerializer extends
VersionedObjectSerializer<SimpleFileEntry> {
-
- private static final long serialVersionUID = 1L;
-
- private final int version;
-
- public SimpleFileEntrySerializer() {
- super(ManifestEntry.SCHEMA);
- this.version = new ManifestEntrySerializer().getVersion();
- }
-
- @Override
- public int getVersion() {
- return version;
- }
-
- @Override
- public InternalRow convertTo(SimpleFileEntry meta) {
- throw new UnsupportedOperationException("Only supports convert from
row.");
- }
-
- @Override
- public SimpleFileEntry convertFrom(int version, InternalRow row) {
- if (this.version != version) {
- throw new IllegalArgumentException("Unsupported version: " +
version);
- }
-
- InternalRow file = row.getRow(4, DataFileMeta.SCHEMA.getFieldCount());
- return new SimpleFileEntry(
- FileKind.fromByteValue(row.getByte(0)),
- deserializeBinaryRow(row.getBinary(1)),
- row.getInt(2),
- file.getInt(10),
- file.getString(0).toString(),
- fromStringArrayData(file.getArray(11)),
- file.isNullAt(14) ? null : file.getBinary(14),
- deserializeBinaryRow(file.getBinary(3)),
- deserializeBinaryRow(file.getBinary(4)));
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index f2a964bae1..df48559223 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -27,6 +27,7 @@ import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
@@ -241,9 +242,10 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
} else if (changelogProducer == ChangelogProducer.INPUT &&
isInsertOnly) {
List<DataFileMeta> changelogMetas = new ArrayList<>();
for (DataFileMeta dataMeta : dataMetas) {
+ Path newPath = writerFactory.newChangelogPath(0);
DataFileMeta changelogMeta =
-
dataMeta.rename(writerFactory.newChangelogPath(0).getName());
- writerFactory.copyFile(dataMeta.fileName(),
changelogMeta.fileName(), 0);
+ dataMeta.rename(newPath.getParent().getName(),
newPath.getName());
+ writerFactory.copyFile(dataMeta, changelogMeta, 0);
changelogMetas.add(changelogMeta);
}
newFilesChangelog.addAll(changelogMetas);
@@ -341,7 +343,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
// 2. This file is not the input of upgraded.
if (!compactBefore.containsKey(file.fileName())
&& !afterFiles.contains(file.fileName())) {
- writerFactory.deleteFile(file.fileName(), file.level());
+ writerFactory.deleteFile(file, file.level());
}
} else {
compactBefore.put(file.fileName(), file);
@@ -375,7 +377,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
deletedFiles.clear();
for (DataFileMeta file : newFilesChangelog) {
- writerFactory.deleteFile(file.fileName(), file.level());
+ writerFactory.deleteFile(file, file.level());
}
newFilesChangelog.clear();
@@ -390,12 +392,12 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
compactAfter.clear();
for (DataFileMeta file : compactChangelog) {
- writerFactory.deleteFile(file.fileName(), file.level());
+ writerFactory.deleteFile(file, file.level());
}
compactChangelog.clear();
for (DataFileMeta file : delete) {
- writerFactory.deleteFile(file.fileName(), file.level());
+ writerFactory.deleteFile(file, file.level());
}
if (compactDeletionFile != null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 391c5f9bb6..51a0b5e2a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -169,6 +169,7 @@ public class FileMetaUtils {
Collections.emptyList(),
null,
FileSource.APPEND,
+ null,
null);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 001132e167..0b4783b165 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -593,7 +593,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
toDelete.addAll(commitMessage.compactIncrement().changelogFiles());
for (DataFileMeta file : toDelete) {
- fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+ fileIO.deleteQuietly(pathFactory.toPath(file));
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 4fda82f4e8..d0f3275b5a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -210,10 +210,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
FormatReaderContext formatReaderContext =
new FormatReaderContext(
- fileIO,
- dataFilePathFactory.toPath(file.fileName()),
- file.fileSize(),
- fileIndexResult);
+ fileIO, dataFilePathFactory.toPath(file),
file.fileSize(), fileIndexResult);
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
formatReaderMapping.getReaderFactory(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 8ff5ce7a65..d474d4e2d0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -161,12 +161,7 @@ public class LocalTableQuery implements TableQuery {
readerFactoryBuilder.keyType(),
new
LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()),
file -> {
- RecordReader<KeyValue> reader =
- factory.createRecordReader(
- file.schemaId(),
- file.fileName(),
- file.fileSize(),
- file.level());
+ RecordReader<KeyValue> reader =
factory.createRecordReader(file);
if (cacheRowFilter != null) {
reader =
reader.filter(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index bf60234214..9178d25a91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -180,8 +180,10 @@ public class DataSplit implements Split {
}
private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) {
+ String path = file.externalPath() != null ? file.externalPath() :
bucketPath;
+ path += "/" + file.fileName();
return new RawFile(
- bucketPath + "/" + file.fileName(),
+ path,
file.fileSize(),
0,
file.fileSize(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 6dcbb322d6..3107ebe150 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -385,8 +385,15 @@ public class FilesTable implements ReadonlyTable {
dataSplit.partition()))),
dataSplit::bucket,
() ->
- BinaryString.fromString(
- dataSplit.bucketPath() + "/" +
dataFileMeta.fileName()),
+ dataFileMeta.externalPath() == null
+ ? BinaryString.fromString(
+ dataSplit.bucketPath()
+ + "/"
+ +
dataFileMeta.fileName())
+ : BinaryString.fromString(
+ dataFileMeta.externalPath()
+ + "/"
+ +
dataFileMeta.fileName()),
() ->
BinaryString.fromString(
DataFilePathFactory.formatIdentifier(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index a9012ed89b..3f752be13e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -125,7 +125,7 @@ public class AppendOnlyWriterTest {
DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
assertThat(meta).isNotNull();
- Path path = pathFactory.toPath(meta.fileName());
+ Path path = pathFactory.toPath(meta);
assertThat(LocalFileIO.create().exists(path)).isTrue();
assertThat(meta.rowCount()).isEqualTo(1L);
@@ -186,7 +186,7 @@ public class AppendOnlyWriterTest {
assertThat(inc.newFilesIncrement().newFiles().size()).isEqualTo(1);
DataFileMeta meta = inc.newFilesIncrement().newFiles().get(0);
- Path path = pathFactory.toPath(meta.fileName());
+ Path path = pathFactory.toPath(meta);
assertThat(LocalFileIO.create().exists(path)).isTrue();
assertThat(meta.rowCount()).isEqualTo(100L);
@@ -227,7 +227,7 @@ public class AppendOnlyWriterTest {
int id = 0;
for (DataFileMeta meta : firstInc.newFilesIncrement().newFiles()) {
- Path path = pathFactory.toPath(meta.fileName());
+ Path path = pathFactory.toPath(meta);
assertThat(LocalFileIO.create().exists(path)).isTrue();
assertThat(meta.rowCount()).isEqualTo(1000L);
@@ -680,6 +680,7 @@ public class AppendOnlyWriterTest {
Collections.emptyList(),
null,
FileSource.APPEND,
+ null,
null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index e43cd898db..e817562689 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -78,7 +78,7 @@ public class KeyValueFileReadWriteTest {
public void testReadNonExistentFile() {
KeyValueFileReaderFactory readerFactory =
createReaderFactory(tempDir.toString(), "avro", null, null);
- assertThatThrownBy(() -> readerFactory.createRecordReader(0,
"dummy_file.avro", 1, 0))
+ assertThatThrownBy(() -> readerFactory.createRecordReader(0,
"dummy_file.avro", 1, 0, null))
.hasMessageContaining(
"you can configure 'snapshot.time-retained' option
with a larger value.");
}
@@ -312,7 +312,8 @@ public class KeyValueFileReadWriteTest {
meta.schemaId(),
meta.fileName(),
meta.fileSize(),
- meta.level()));
+ meta.level(),
+ meta.externalPath()));
while (actualKvsIterator.hasNext()) {
assertThat(expectedIterator.hasNext()).isTrue();
KeyValue actualKv = actualKvsIterator.next();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index be49311427..fa96765a42 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -198,7 +198,11 @@ public class ContainsLevelsTest {
file ->
createReaderFactory()
.createRecordReader(
- 0, file.fileName(), file.fileSize(), file.level()),
+ 0,
+ file.fileName(),
+ file.fileSize(),
+ file.level(),
+ file.externalPath()),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index a678534042..56c45cfdc4 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -275,7 +275,11 @@ public class LookupLevelsTest {
file ->
createReaderFactory()
.createRecordReader(
- 0, file.fileName(), file.fileSize(), file.level()),
+ 0,
+ file.fileName(),
+ file.fileSize(),
+ file.level(),
+ file.externalPath()),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index f2a9c44dd7..47d12ce47c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -592,7 +592,7 @@ public abstract class MergeTreeTestBase {
assertThat(remove).isTrue();
// See MergeTreeWriter.updateCompactResult
if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) {
- compactWriterFactory.deleteFile(file.fileName(), file.level());
+ compactWriterFactory.deleteFile(file, file.level());
}
}
compactedFiles.addAll(increment.compactIncrement().compactAfter());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 01d4e89af9..471b60d3cf 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -112,7 +112,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
table.store()
.pathFactory()
.createDataFilePathFactory(split.partition(),
split.bucket())
- .toPath(split.dataFiles().get(0).fileName());
+ .toPath(split.dataFiles().get(0));
table.fileIO().deleteQuietly(path);
// read
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index a4e581b701..a088f40dab 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -447,6 +447,7 @@ public class SplitTest {
Collections.emptyList(),
null,
null,
+ null,
null);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
index f83b5cf8f9..b7e1e60878 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
@@ -108,7 +108,7 @@ public class PickFilesUtil {
pathFactory
.createDataFilePathFactory(
simpleFileEntry.partition(),
simpleFileEntry.bucket())
- .toPath(simpleFileEntry.fileName());
+ .toPath(simpleFileEntry.fileName(), simpleFileEntry.externalPath());
dataFiles.add(dataFilePath);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 6b95e36907..39860e418c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -96,7 +96,7 @@ public class ChangelogCompactTask implements Serializable {
outputStream,
results,
table,
- dataFilePathFactory.toPath(meta.fileName()),
+ dataFilePathFactory.toPath(meta),
bucket,
false,
meta);
@@ -111,7 +111,7 @@ public class ChangelogCompactTask implements Serializable {
outputStream,
results,
table,
- dataFilePathFactory.toPath(meta.fileName()),
+ dataFilePathFactory.toPath(meta),
bucket,
true,
meta);
@@ -167,7 +167,8 @@ public class ChangelogCompactTask implements Serializable {
table.fileIO()
.rename(
changelogTempPath,
- dataFilePathFactory.toPath(
+ dataFilePathFactory.toExtraFilePath(
+ baseResult.meta,
realName
+ "."
+
CompactedChangelogReadOnlyFormat.getIdentifier(
@@ -193,9 +194,9 @@ public class ChangelogCompactTask implements Serializable {
+
CompactedChangelogReadOnlyFormat.getIdentifier(
result.meta.fileFormat());
if (result.isCompactResult) {
- compactChangelog.add(result.meta.rename(name));
+ compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name));
} else {
- newFilesChangelog.add(result.meta.rename(name));
+ newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index d9f863c6b9..d35cb09cb7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -198,16 +198,18 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
String indexFile = indexFiles.get(0);
try (FileIndexFormat.Reader indexReader =
FileIndexFormat.createReader(
- fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)),
+ fileIO.newInputStream(
+ dataFilePathFactory.toExtraFilePath(
+ dataFileMeta, indexFile)),
schemaInfo.fileSchema)) {
maintainers = indexReader.readAll();
}
- newIndexPath = createNewFileIndexFilePath(dataFilePathFactory.toPath(indexFile));
+ newIndexPath =
+ createNewFileIndexFilePath(
+ dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile));
} else {
maintainers = new HashMap<>();
- newIndexPath =
- dataFileToFileIndexPath(
- dataFilePathFactory.toPath(dataFileMeta.fileName()));
+ newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta));
}
// remove unnecessary
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index a3223446f6..4997f65eaf 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -53,6 +53,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
new java.util.ArrayList[String](),
null,
FileSource.APPEND,
+ null,
null)
}
@@ -89,6 +90,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
new java.util.ArrayList[String](),
null,
FileSource.APPEND,
+ null,
null)
).asJava