This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4ac05e4b6e [core] External Path in DataFileMeta should be the file path (#4766)
4ac05e4b6e is described below
commit 4ac05e4b6e730d976a51e92a518cfdbdeb10b51b
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 24 16:07:44 2024 +0800
[core] External Path in DataFileMeta should be the file path (#4766)
---
.../java/org/apache/paimon/io/DataFileMeta.java | 16 +++++--
.../apache/paimon/io/DataFileMetaSerializer.java | 2 +-
.../org/apache/paimon/io/DataFilePathFactory.java | 54 ++++++++++------------
.../org/apache/paimon/io/FileIndexEvaluator.java | 2 +-
.../paimon/io/KeyValueFileReaderFactory.java | 40 ++++------------
.../paimon/io/KeyValueFileWriterFactory.java | 15 +++---
.../java/org/apache/paimon/manifest/FileEntry.java | 5 +-
.../org/apache/paimon/manifest/ManifestEntry.java | 7 ++-
.../apache/paimon/manifest/SimpleFileEntry.java | 1 +
.../apache/paimon/mergetree/MergeTreeWriter.java | 16 +++----
.../org/apache/paimon/table/source/DataSplit.java | 4 +-
.../org/apache/paimon/table/system/FilesTable.java | 53 +++++++++------------
.../apache/paimon/append/AppendOnlyWriterTest.java | 7 ++-
.../apache/paimon/io/DataFilePathFactoryTest.java | 12 +++--
.../paimon/io/KeyValueFileReadWriteTest.java | 14 +++---
.../paimon/mergetree/ContainsLevelsTest.java | 9 +---
.../apache/paimon/mergetree/LookupLevelsTest.java | 9 +---
.../apache/paimon/mergetree/MergeTreeTestBase.java | 8 ++--
.../paimon/utils/FileStorePathFactoryTest.java | 12 +++--
.../apache/paimon/flink/clone/CloneFileInfo.java | 15 ++++--
.../paimon/flink/clone/CopyFileOperator.java | 2 +-
.../flink/clone/PickFilesForCloneOperator.java | 6 ++-
.../apache/paimon/flink/clone/PickFilesUtil.java | 2 +-
.../compact/changelog/ChangelogCompactTask.java | 10 ++--
.../paimon/flink/sink/RewriteFileIndexSink.java | 5 +-
.../flink/action/RewriteFileIndexActionITCase.java | 2 +-
.../procedure/RewriteFileIndexProcedureITCase.java | 2 +-
...nlySingleTableCompactionWorkerOperatorTest.java | 7 +--
.../apache/paimon/spark/SparkFileIndexITCase.java | 3 +-
29 files changed, 157 insertions(+), 183 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 459cd788de..b164b60fe5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -368,9 +368,14 @@ public class DataFileMeta {
return split[split.length - 1];
}
- @Nullable
- public String externalPath() {
- return externalPath;
+ public Optional<String> externalPath() {
+ return Optional.ofNullable(externalPath);
+ }
+
+ public Optional<String> externalPathDir() {
+ return Optional.ofNullable(externalPath)
+ .map(Path::new)
+ .map(p -> p.getParent().toUri().toString());
}
public Optional<FileSource> fileSource() {
@@ -405,7 +410,8 @@ public class DataFileMeta {
externalPath);
}
- public DataFileMeta rename(String newExternalPath, String newFileName) {
+ public DataFileMeta rename(String newFileName) {
+ String newExternalPath = externalPathDir().map(dir -> dir + "/" +
newFileName).orElse(null);
return new DataFileMeta(
newFileName,
fileSize,
@@ -452,7 +458,7 @@ public class DataFileMeta {
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(this));
- extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this,
f)));
+ extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this)));
return paths;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index c8a5e326b0..a316f897ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -59,7 +59,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
toStringArrayData(meta.valueStatsCols()),
- BinaryString.fromString(meta.externalPath()));
+
meta.externalPath().map(BinaryString::fromString).orElse(null));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index daeb9f52ea..19525ab6cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -20,9 +20,11 @@ package org.apache.paimon.io;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileEntry;
import javax.annotation.concurrent.ThreadSafe;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -67,47 +69,36 @@ public class DataFilePathFactory {
return newPath(changelogFilePrefix);
}
- private Path newPath(String prefix) {
+ public String newChangelogFileName() {
+ return newFileName(changelogFilePrefix);
+ }
+
+ public Path newPath(String prefix) {
+ return new Path(parent, newFileName(prefix));
+ }
+
+ private String newFileName(String prefix) {
String extension;
if (fileSuffixIncludeCompression) {
extension = "." + fileCompression + "." + formatIdentifier;
} else {
extension = "." + formatIdentifier;
}
- String name = prefix + uuid + "-" + pathCount.getAndIncrement() +
extension;
- return new Path(parent, name);
+ return prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
}
- @VisibleForTesting
- public Path toPath(String fileName) {
- return new Path(parent + "/" + fileName);
- }
-
- /**
- * for read purpose.
- *
- * @param fileName the file name
- * @param externalPath the external path, if null, it will use the parent
path
- * @return the file's path
- */
- public Path toPath(String fileName, String externalPath) {
- return new Path((externalPath == null ? parent : externalPath) + "/" +
fileName);
+ public Path toPath(DataFileMeta file) {
+ return file.externalPath().map(Path::new).orElse(new Path(parent,
file.fileName()));
}
- public Path toPath(DataFileMeta dataFileMeta) {
- String externalPath = dataFileMeta.externalPath();
- String fileName = dataFileMeta.fileName();
- return new Path((externalPath == null ? parent : externalPath) + "/" +
fileName);
+ public Path toPath(FileEntry file) {
+ return Optional.ofNullable(file.externalPath())
+ .map(Path::new)
+ .orElse(new Path(parent, file.fileName()));
}
- public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) {
- String externalPath = dataFileMeta.externalPath();
- return new Path((externalPath == null ? parent : externalPath) + "/" +
extraFile);
- }
-
- @VisibleForTesting
- public String uuid() {
- return uuid;
+ public Path toAlignedPath(String fileName, DataFileMeta aligned) {
+ return new
Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName);
}
public static Path dataFileToFileIndexPath(Path dataFilePath) {
@@ -141,4 +132,9 @@ public class DataFilePathFactory {
return fileName.substring(index + 1);
}
+
+ @VisibleForTesting
+ String uuid() {
+ return uuid;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index 9055097d37..3ed4c278d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -62,7 +62,7 @@ public class FileIndexEvaluator {
// go to file index check
try (FileIndexPredicate predicate =
new FileIndexPredicate(
- dataFilePathFactory.toExtraFilePath(file,
indexFiles.get(0)),
+
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
fileIO,
dataSchema.logicalRowType())) {
return predicate.evaluate(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 9d65a54113..14221d50be 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -20,7 +20,6 @@ package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
@@ -98,37 +97,17 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
@Override
public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws
IOException {
- return createRecordReader(
- file.schemaId(),
- file.fileName(),
- file.fileSize(),
- file.level(),
- file.externalPath());
- }
-
- @VisibleForTesting
- public RecordReader<KeyValue> createRecordReader(
- long schemaId, String fileName, long fileSize, int level, String
externalPath)
- throws IOException {
- if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
- return new AsyncRecordReader<>(
- () ->
- createRecordReader(
- schemaId, fileName, level, false, 2,
fileSize, externalPath));
+ if (file.fileSize() >= asyncThreshold &&
file.fileName().endsWith(".orc")) {
+ return new AsyncRecordReader<>(() -> createRecordReader(file,
false, 2));
}
- return createRecordReader(schemaId, fileName, level, true, null,
fileSize, externalPath);
+ return createRecordReader(file, true, null);
}
private FileRecordReader<KeyValue> createRecordReader(
- long schemaId,
- String fileName,
- int level,
- boolean reuseFormat,
- @Nullable Integer orcPoolSize,
- long fileSize,
- String externalPath)
+ DataFileMeta file, boolean reuseFormat, @Nullable Integer
orcPoolSize)
throws IOException {
- String formatIdentifier =
DataFilePathFactory.formatIdentifier(fileName);
+ String formatIdentifier =
DataFilePathFactory.formatIdentifier(file.fileName());
+ long schemaId = file.schemaId();
Supplier<FormatReaderMapping> formatSupplier =
() ->
@@ -143,8 +122,9 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
- Path filePath = pathFactory.toPath(fileName, externalPath);
+ Path filePath = pathFactory.toPath(file);
+ long fileSize = file.fileSize();
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
formatReaderMapping.getReaderFactory(),
@@ -156,13 +136,13 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
formatReaderMapping.getCastMapping(),
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));
- Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
+ Optional<DeletionVector> deletionVector =
dvFactory.create(file.fileName());
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
fileRecordReader =
new ApplyDeletionVectorReader(fileRecordReader,
deletionVector.get());
}
- return new KeyValueDataFileRecordReader(fileRecordReader, keyType,
valueType, level);
+ return new KeyValueDataFileRecordReader(fileRecordReader, keyType,
valueType, file.level());
}
public static Builder builder(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 500320c249..7b6f4f0e3c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -142,14 +142,13 @@ public class KeyValueFileWriterFactory {
fileIndexOptions);
}
- public void deleteFile(DataFileMeta meta, int level) {
- fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta));
+ public void deleteFile(DataFileMeta file) {
+
fileIO.deleteQuietly(formatContext.pathFactory(file.level()).toPath(file));
}
- public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int
level)
- throws IOException {
- Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta);
- Path targetPath = formatContext.pathFactory(level).toPath(targetMeta);
+ public void copyFile(DataFileMeta sourceFile, DataFileMeta targetFile)
throws IOException {
+ Path sourcePath =
formatContext.pathFactory(sourceFile.level()).toPath(sourceFile);
+ Path targetPath =
formatContext.pathFactory(targetFile.level()).toPath(targetFile);
fileIO.copyFile(sourcePath, targetPath, true);
}
@@ -157,8 +156,8 @@ public class KeyValueFileWriterFactory {
return fileIO;
}
- public Path newChangelogPath(int level) {
- return formatContext.pathFactory(level).newChangelogPath();
+ public String newChangelogFileName(int level) {
+ return formatContext.pathFactory(level).newChangelogFileName();
}
public static Builder builder(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 738776438b..dd77759de1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -54,6 +54,7 @@ public interface FileEntry {
String fileName();
+ @Nullable
String externalPath();
Identifier identifier();
@@ -161,7 +162,9 @@ public interface FileEntry {
+ ", extraFiles "
+ extraFiles
+ ", embeddedIndex "
- + Arrays.toString(embeddedIndex);
+ + Arrays.toString(embeddedIndex)
+ + ", externalPath "
+ + externalPath;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index d4748451d8..3cb5733a38 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -26,6 +26,8 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -92,9 +94,10 @@ public class ManifestEntry implements FileEntry {
return file.fileName();
}
+ @Nullable
@Override
public String externalPath() {
- return file.externalPath();
+ return file.externalPath().orElse(null);
}
@Override
@@ -129,7 +132,7 @@ public class ManifestEntry implements FileEntry {
file.fileName(),
file.extraFiles(),
file.embeddedIndex(),
- file.externalPath());
+ externalPath());
}
public ManifestEntry copyWithoutStats() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index f86bded52d..c8708db0b8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -106,6 +106,7 @@ public class SimpleFileEntry implements FileEntry {
return fileName;
}
+ @Nullable
@Override
public String externalPath() {
return externalPath;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index df48559223..1c805e764a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -27,7 +27,6 @@ import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
@@ -242,10 +241,9 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
} else if (changelogProducer == ChangelogProducer.INPUT &&
isInsertOnly) {
List<DataFileMeta> changelogMetas = new ArrayList<>();
for (DataFileMeta dataMeta : dataMetas) {
- Path newPath = writerFactory.newChangelogPath(0);
- DataFileMeta changelogMeta =
- dataMeta.rename(newPath.getParent().getName(),
newPath.getName());
- writerFactory.copyFile(dataMeta, changelogMeta, 0);
+ String newFileName = writerFactory.newChangelogFileName(0);
+ DataFileMeta changelogMeta = dataMeta.rename(newFileName);
+ writerFactory.copyFile(dataMeta, changelogMeta);
changelogMetas.add(changelogMeta);
}
newFilesChangelog.addAll(changelogMetas);
@@ -343,7 +341,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
// 2. This file is not the input of upgraded.
if (!compactBefore.containsKey(file.fileName())
&& !afterFiles.contains(file.fileName())) {
- writerFactory.deleteFile(file, file.level());
+ writerFactory.deleteFile(file);
}
} else {
compactBefore.put(file.fileName(), file);
@@ -377,7 +375,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
deletedFiles.clear();
for (DataFileMeta file : newFilesChangelog) {
- writerFactory.deleteFile(file, file.level());
+ writerFactory.deleteFile(file);
}
newFilesChangelog.clear();
@@ -392,12 +390,12 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
compactAfter.clear();
for (DataFileMeta file : compactChangelog) {
- writerFactory.deleteFile(file, file.level());
+ writerFactory.deleteFile(file);
}
compactChangelog.clear();
for (DataFileMeta file : delete) {
- writerFactory.deleteFile(file, file.level());
+ writerFactory.deleteFile(file);
}
if (compactDeletionFile != null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 9178d25a91..39f9269f41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -180,10 +180,8 @@ public class DataSplit implements Split {
}
private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) {
- String path = file.externalPath() != null ? file.externalPath() :
bucketPath;
- path += "/" + file.fileName();
return new RawFile(
- path,
+ file.externalPath().orElse(bucketPath + "/" + file.fileName()),
file.fileSize(),
0,
file.fileSize(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 3107ebe150..5c7ccd4809 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -370,9 +370,9 @@ public class FilesTable implements ReadonlyTable {
DataSplit dataSplit,
RowDataToObjectArrayConverter partitionConverter,
Function<Long, RowDataToObjectArrayConverter> keyConverters,
- DataFileMeta dataFileMeta,
+ DataFileMeta file,
SimpleStatsEvolutions simpleStatsEvolutions) {
- StatsLazyGetter statsGetter = new StatsLazyGetter(dataFileMeta,
simpleStatsEvolutions);
+ StatsLazyGetter statsGetter = new StatsLazyGetter(file,
simpleStatsEvolutions);
@SuppressWarnings("unchecked")
Supplier<Object>[] fields =
new Supplier[] {
@@ -385,51 +385,44 @@ public class FilesTable implements ReadonlyTable {
dataSplit.partition()))),
dataSplit::bucket,
() ->
- dataFileMeta.externalPath() == null
- ? BinaryString.fromString(
- dataSplit.bucketPath()
- + "/"
- +
dataFileMeta.fileName())
- : BinaryString.fromString(
- dataFileMeta.externalPath()
- + "/"
- +
dataFileMeta.fileName()),
+ BinaryString.fromString(
+ file.externalPath()
+ .orElse(
+ dataSplit.bucketPath()
+ + "/"
+ +
file.fileName())),
() ->
BinaryString.fromString(
- DataFilePathFactory.formatIdentifier(
- dataFileMeta.fileName())),
- dataFileMeta::schemaId,
- dataFileMeta::level,
- dataFileMeta::rowCount,
- dataFileMeta::fileSize,
+
DataFilePathFactory.formatIdentifier(file.fileName())),
+ file::schemaId,
+ file::level,
+ file::rowCount,
+ file::fileSize,
() ->
- dataFileMeta.minKey().getFieldCount() <= 0
+ file.minKey().getFieldCount() <= 0
? null
: BinaryString.fromString(
Arrays.toString(
keyConverters
-
.apply(dataFileMeta.schemaId())
-
.convert(dataFileMeta.minKey()))),
+
.apply(file.schemaId())
+
.convert(file.minKey()))),
() ->
- dataFileMeta.maxKey().getFieldCount() <= 0
+ file.maxKey().getFieldCount() <= 0
? null
: BinaryString.fromString(
Arrays.toString(
keyConverters
-
.apply(dataFileMeta.schemaId())
-
.convert(dataFileMeta.maxKey()))),
+
.apply(file.schemaId())
+
.convert(file.maxKey()))),
() ->
BinaryString.fromString(statsGetter.nullValueCounts().toString()),
() ->
BinaryString.fromString(statsGetter.lowerValueBounds().toString()),
() ->
BinaryString.fromString(statsGetter.upperValueBounds().toString()),
- dataFileMeta::minSequenceNumber,
- dataFileMeta::maxSequenceNumber,
- dataFileMeta::creationTime,
+ file::minSequenceNumber,
+ file::maxSequenceNumber,
+ file::creationTime,
() ->
BinaryString.fromString(
- dataFileMeta
- .fileSource()
- .map(FileSource::toString)
- .orElse(null))
+
file.fileSource().map(FileSource::toString).orElse(null))
};
return new LazyGenericRow(fields);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 3f752be13e..7757020532 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -66,7 +66,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -646,10 +645,10 @@ public class AppendOnlyWriterTest {
int size = toCompact.size();
long minSeq = toCompact.get(0).minSequenceNumber();
long maxSeq = toCompact.get(size - 1).maxSequenceNumber();
- String fileName = "compact-" + UUID.randomUUID();
- LocalFileIO.create().newOutputStream(pathFactory.toPath(fileName),
false).close();
+ Path path = pathFactory.newPath("compact-");
+ LocalFileIO.create().newOutputStream(path, false).close();
return DataFileMeta.forAppend(
- fileName,
+ path.getName(),
toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(),
toCompact.stream().mapToLong(DataFileMeta::rowCount).sum(),
STATS_SERIALIZER.toBinaryAllMode(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
index d36966c55a..109f33c3dc 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
@@ -55,8 +55,9 @@ public class DataFilePathFactoryTest {
+ "."
+
CoreOptions.FILE_FORMAT.defaultValue()));
}
- assertThat(pathFactory.toPath("my-data-file-name"))
- .isEqualTo(new Path(tempDir.toString() +
"/bucket-123/my-data-file-name"));
+ assertThat(pathFactory.newPath("my-data-file-name").toString())
+ .startsWith(
+ new Path(tempDir.toString() +
"/bucket-123/my-data-file-name").toString());
}
@Test
@@ -83,8 +84,9 @@ public class DataFilePathFactoryTest {
+ "."
+
CoreOptions.FILE_FORMAT.defaultValue()));
}
- assertThat(pathFactory.toPath("my-data-file-name"))
- .isEqualTo(
- new Path(tempDir.toString() +
"/dt=20211224/bucket-123/my-data-file-name"));
+ assertThat(pathFactory.newPath("my-data-file-name").toString())
+ .startsWith(
+ new Path(tempDir.toString() +
"/dt=20211224/bucket-123/my-data-file-name")
+ .toString());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index e817562689..8f2c815404 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -61,6 +61,7 @@ import java.util.function.Function;
import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
import static org.apache.paimon.TestKeyValueGenerator.KEY_TYPE;
import static org.apache.paimon.TestKeyValueGenerator.createTestSchemaManager;
+import static org.apache.paimon.io.DataFileTestUtils.newFile;
import static
org.apache.paimon.stats.StatsTestUtils.convertWithoutSchemaEvolution;
import static
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
import static org.assertj.core.api.Assertions.assertThat;
@@ -78,7 +79,10 @@ public class KeyValueFileReadWriteTest {
public void testReadNonExistentFile() {
KeyValueFileReaderFactory readerFactory =
createReaderFactory(tempDir.toString(), "avro", null, null);
- assertThatThrownBy(() -> readerFactory.createRecordReader(0,
"dummy_file.avro", 1, 0, null))
+ assertThatThrownBy(
+ () ->
+ readerFactory.createRecordReader(
+ newFile("non_avro_file.avro", 0, 0, 1,
0)))
.hasMessageContaining(
"you can configure 'snapshot.time-retained' option
with a larger value.");
}
@@ -307,13 +311,7 @@ public class KeyValueFileReadWriteTest {
for (DataFileMeta meta : actualMetas) {
// check the contents of data file
CloseableIterator<KeyValue> actualKvsIterator =
- new RecordReaderIterator<>(
- readerFactory.createRecordReader(
- meta.schemaId(),
- meta.fileName(),
- meta.fileSize(),
- meta.level(),
- meta.externalPath()));
+ new
RecordReaderIterator<>(readerFactory.createRecordReader(meta));
while (actualKvsIterator.hasNext()) {
assertThat(expectedIterator.hasNext()).isTrue();
KeyValue actualKv = actualKvsIterator.next();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index fa96765a42..fa9628b4c1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -195,14 +195,7 @@ public class ContainsLevelsTest {
comparator,
keyType,
new LookupLevels.ContainsValueProcessor(),
- file ->
- createReaderFactory()
- .createRecordReader(
- 0,
- file.fileName(),
- file.fileSize(),
- file.level(),
- file.externalPath()),
+ file -> createReaderFactory().createRecordReader(file),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 56c45cfdc4..b68a82935b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -272,14 +272,7 @@ public class LookupLevelsTest {
comparator,
keyType,
new LookupLevels.KeyValueProcessor(rowType),
- file ->
- createReaderFactory()
- .createRecordReader(
- 0,
- file.fileName(),
- file.fileSize(),
- file.level(),
- file.externalPath()),
+ file -> createReaderFactory().createRecordReader(file),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 47d12ce47c..e987e2ee99 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -114,7 +114,7 @@ public abstract class MergeTreeTestBase {
pathFactory = createNonPartFactory(path);
comparator = Comparator.comparingInt(o -> o.getInt(0));
recreateMergeTree(1024 * 1024);
- Path bucketDir =
writerFactory.pathFactory(0).toPath("ignore").getParent();
+ Path bucketDir = writerFactory.pathFactory(0).newPath().getParent();
LocalFileIO.create().mkdirs(bucketDir);
}
@@ -418,7 +418,7 @@ public abstract class MergeTreeTestBase {
writer.close();
- Path bucketDir =
writerFactory.pathFactory(0).toPath("ignore").getParent();
+ Path bucketDir = writerFactory.pathFactory(0).newPath().getParent();
Set<String> files =
Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
.map(FileStatus::getPath)
@@ -475,7 +475,7 @@ public abstract class MergeTreeTestBase {
writer.close();
- Path bucketDir =
writerFactory.pathFactory(0).toPath("ignore").getParent();
+ Path bucketDir = writerFactory.pathFactory(0).newPath().getParent();
Set<String> files =
Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
.map(FileStatus::getPath)
@@ -592,7 +592,7 @@ public abstract class MergeTreeTestBase {
assertThat(remove).isTrue();
// See MergeTreeWriter.updateCompactResult
if (!newFileNames.contains(file.fileName()) &&
!afterFiles.contains(file.fileName())) {
- compactWriterFactory.deleteFile(file, file.level());
+ compactWriterFactory.deleteFile(file);
}
}
compactedFiles.addAll(increment.compactIncrement().compactAfter());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
index 6ca15cf150..c5cda2286d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
@@ -72,8 +72,9 @@ public class FileStorePathFactoryTest {
FileStorePathFactory pathFactory = createNonPartFactory(new
Path(tempDir.toString()));
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(new BinaryRow(0), 123);
- assertThat(dataFilePathFactory.toPath("my-data-file-name"))
- .isEqualTo(new Path(tempDir.toString() +
"/bucket-123/my-data-file-name"));
+ assertThat(dataFilePathFactory.newPath("my-data-file-name").toString())
+ .startsWith(
+ new Path(tempDir.toString() +
"/bucket-123/my-data-file-name").toString());
}
@Test
@@ -116,9 +117,10 @@ public class FileStorePathFactoryTest {
writer.complete();
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(partition, 123);
- assertThat(dataFilePathFactory.toPath("my-data-file-name"))
- .isEqualTo(
- new Path(tempDir.toString() + expected +
"/bucket-123/my-data-file-name"));
+ assertThat(dataFilePathFactory.newPath("my-data-file-name").toString())
+ .startsWith(
+ new Path(tempDir.toString() + expected +
"/bucket-123/my-data-file-name")
+ .toString());
}
public static FileStorePathFactory createNonPartFactory(Path root) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
index d916958412..5c0ac75e16 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
@@ -21,17 +21,26 @@ package org.apache.paimon.flink.clone;
/** The information of copy file. */
public class CloneFileInfo {
+ private final String sourceFilePath;
private final String filePathExcludeTableRoot;
private final String sourceIdentifier;
private final String targetIdentifier;
public CloneFileInfo(
- String filePathExcludeTableRoot, String sourceIdentifier, String targetIdentifier) {
+ String sourceFilePath,
+ String filePathExcludeTableRoot,
+ String sourceIdentifier,
+ String targetIdentifier) {
+ this.sourceFilePath = sourceFilePath;
this.filePathExcludeTableRoot = filePathExcludeTableRoot;
this.sourceIdentifier = sourceIdentifier;
this.targetIdentifier = targetIdentifier;
}
+ public String getSourceFilePath() {
+ return sourceFilePath;
+ }
+
public String getFilePathExcludeTableRoot() {
return filePathExcludeTableRoot;
}
@@ -47,7 +56,7 @@ public class CloneFileInfo {
@Override
public String toString() {
return String.format(
- "{ filePath: %s, sourceIdentifier: %s, targetIdentifier: %s }",
- filePathExcludeTableRoot, sourceIdentifier, targetIdentifier);
+ "{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s }",
+ sourceFilePath, filePathExcludeTableRoot, sourceIdentifier, targetIdentifier);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index 8bcaa2a207..e7002cce1e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -99,7 +99,7 @@ public class CopyFileOperator extends
AbstractStreamOperator<CloneFileInfo>
});
String filePathExcludeTableRoot =
cloneFileInfo.getFilePathExcludeTableRoot();
- Path sourcePath = new Path(sourceTableRootPath +
filePathExcludeTableRoot);
+ Path sourcePath = new Path(cloneFileInfo.getSourceFilePath());
Path targetPath = new Path(targetTableRootPath +
filePathExcludeTableRoot);
if (targetTableFileIO.exists(targetPath)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
index 67eecbc6f2..f58d3acafd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
@@ -123,7 +123,11 @@ public class PickFilesForCloneOperator extends
AbstractStreamOperator<CloneFileI
for (Path file : files) {
Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
result.add(
- new CloneFileInfo(relativePath.toString(),
sourceIdentifier, targetIdentifier));
+ new CloneFileInfo(
+ file.toUri().toString(),
+ relativePath.toString(),
+ sourceIdentifier,
+ targetIdentifier));
}
return result;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
index b7e1e60878..9de974d047 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
@@ -108,7 +108,7 @@ public class PickFilesUtil {
pathFactory
.createDataFilePathFactory(
simpleFileEntry.partition(),
simpleFileEntry.bucket())
- .toPath(simpleFileEntry.fileName(),
simpleFileEntry.externalPath());
+ .toPath(simpleFileEntry);
dataFiles.add(dataFilePath);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 39860e418c..7f3f730280 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -167,12 +167,12 @@ public class ChangelogCompactTask implements Serializable {
table.fileIO()
.rename(
changelogTempPath,
- dataFilePathFactory.toExtraFilePath(
- baseResult.meta,
+ dataFilePathFactory.toAlignedPath(
realName
+ "."
+
CompactedChangelogReadOnlyFormat.getIdentifier(
- baseResult.meta.fileFormat())));
+ baseResult.meta.fileFormat()),
+ baseResult.meta));
List<Committable> newCommittables = new ArrayList<>();
@@ -194,9 +194,9 @@ public class ChangelogCompactTask implements Serializable {
+
CompactedChangelogReadOnlyFormat.getIdentifier(
result.meta.fileFormat());
if (result.isCompactResult) {
- compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name));
+ compactChangelog.add(result.meta.rename(name));
} else {
- newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name));
+ newFilesChangelog.add(result.meta.rename(name));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index d35cb09cb7..99061d4b82 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -199,14 +199,13 @@ public class RewriteFileIndexSink extends
FlinkWriteSink<ManifestEntry> {
try (FileIndexFormat.Reader indexReader =
FileIndexFormat.createReader(
fileIO.newInputStream(
- dataFilePathFactory.toExtraFilePath(
- dataFileMeta, indexFile)),
+ dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta)),
schemaInfo.fileSchema)) {
maintainers = indexReader.readAll();
}
newIndexPath =
createNewFileIndexFilePath(
- dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile));
+ dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta));
} else {
maintainers = new HashMap<>();
newIndexPath =
dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
index 242a751416..dc6e5523b0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
@@ -102,7 +102,7 @@ public class RewriteFileIndexActionITCase extends
ActionITCaseBase {
table.store()
.pathFactory()
.createDataFilePathFactory(entry.partition(),
entry.bucket())
- .toPath(file);
+ .toAlignedPath(file, entry.file());
try (FileIndexFormat.Reader reader =
FileIndexFormat.createReader(
table.fileIO().newInputStream(indexFilePath),
table.rowType())) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
index 1abfe355a5..23102f2a3d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
@@ -199,7 +199,7 @@ public class RewriteFileIndexProcedureITCase extends
CatalogITCaseBase {
table.store()
.pathFactory()
.createDataFilePathFactory(entry.partition(),
entry.bucket())
- .toPath(file);
+ .toAlignedPath(file, entry.file());
try (FileIndexFormat.Reader reader =
FileIndexFormat.createReader(
table.fileIO().newInputStream(indexFilePath),
table.rowType())) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
index 6238a9cbf3..aa33e4fe75 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
@@ -160,8 +160,7 @@ public class
AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
List<DataFileMeta> fileMetas =
((CommitMessageImpl)
commitMessage).compactIncrement().compactAfter();
for (DataFileMeta fileMeta : fileMetas) {
- Assertions.assertThat(
- localFileIO.exists(dataFilePathFactory.toPath(fileMeta.fileName())))
+ Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta)))
.isTrue();
}
if (i++ > 2) {
@@ -188,9 +187,7 @@ public class
AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
List<DataFileMeta> fileMetas =
((CommitMessageImpl)
commitMessage).compactIncrement().compactAfter();
for (DataFileMeta fileMeta : fileMetas) {
- Assertions.assertThat(
- localFileIO.exists(
- dataFilePathFactory.toPath(fileMeta.fileName())))
+ Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta)))
.isFalse();
}
} catch (Exception e) {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 0360def685..99e95cf40e 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -185,7 +185,8 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
try (FileIndexFormat.Reader reader =
FileIndexFormat.createReader(
fileIO.newInputStream(
- dataFilePathFactory.toPath(indexFiles.get(0))),
+ dataFilePathFactory.toAlignedPath(
+ indexFiles.get(0), dataFileMeta)),
tableSchema.logicalRowType())) {
Optional<FileIndexReader> fileIndexReader =
reader.readColumnIndex("a").stream().findFirst();