This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e1dcc85ed2 [core] Refactor FieldBunch in DataEvolutionSplitRead
e1dcc85ed2 is described below
commit e1dcc85ed2a81a37fde6904a6ae8da3a611e2aa8
Author: JingsongLi <[email protected]>
AuthorDate: Mon Sep 29 16:38:28 2025 +0800
[core] Refactor FieldBunch in DataEvolutionSplitRead
---
.../java/org/apache/paimon/io/DataFileMeta.java | 5 -
.../paimon/operation/DataEvolutionSplitRead.java | 156 ++++++++-------------
.../paimon/operation/FileStoreCommitImpl.java | 3 +-
.../table/source/DataEvolutionSplitGenerator.java | 5 +-
.../org/apache/paimon/append/BlobTableTest.java | 12 +-
.../paimon/operation/DataEvolutionReadTest.java | 37 ++---
.../apache/paimon/format/blob/BlobFileFormat.java | 4 +
7 files changed, 89 insertions(+), 133 deletions(-)
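For orientation before the diff: this commit replaces the Either-based FieldBunch class with a two-method interface plus two implementations. A minimal compilable sketch of that shape, condensed from the hunks below (DataFileMeta here is a hypothetical stand-in for org.apache.paimon.io.DataFileMeta, trimmed to the one accessor the sketch needs; not part of the commit itself):

    import java.util.Collections;
    import java.util.List;

    // Hypothetical stand-in for org.apache.paimon.io.DataFileMeta, trimmed to
    // the single accessor this sketch uses.
    interface DataFileMeta {
        long rowCount();
    }

    /** Files for partial field: the shape this commit refactors to. */
    interface FieldBunch {
        long rowCount();

        List<DataFileMeta> files();
    }

    /** A single non-blob data file forms its own bunch. */
    final class DataBunch implements FieldBunch {
        private final DataFileMeta dataFile;

        DataBunch(DataFileMeta dataFile) {
            this.dataFile = dataFile;
        }

        @Override
        public long rowCount() {
            return dataFile.rowCount();
        }

        @Override
        public List<DataFileMeta> files() {
            return Collections.singletonList(dataFile);
        }
    }

Attributes such as firstRowId, schemaId and writeCols are no longer dispatched through Either helpers but read off files().get(0), since every bunch produced by splitFieldBunches holds at least one file.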
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 7f266d4554..e7bb8b9571 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -21,7 +21,6 @@ package org.apache.paimon.io;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.format.blob.BlobFileFormatFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
@@ -257,10 +256,6 @@ public interface DataFileMeta {
String fileName();
- default boolean isBlobFile() {
- return fileName().endsWith("." + BlobFileFormatFactory.IDENTIFIER);
- }
-
long fileSize();
long rowCount();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index ead9fa18e6..f2f15dcbbd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -44,7 +44,6 @@ import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Either;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.Builder;
@@ -62,6 +61,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -172,11 +172,12 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
Builder formatBuilder)
throws IOException {
List<FieldBunch> fieldsFiles =
- splitFieldBunch(
+ splitFieldBunches(
needMergeFiles,
file -> {
checkArgument(
-                                    file.isBlobFile(), "Only blob file need to call this method.");
+                                    isBlobFile(file.fileName()),
+                                    "Only blob file need to call this method.");
return schemaFetcher
.apply(file.schemaId())
.logicalRowType()
@@ -185,14 +186,14 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
});
long rowCount = fieldsFiles.get(0).rowCount();
- long firstRowId = fieldsFiles.get(0).firstRowId();
+ long firstRowId = fieldsFiles.get(0).files().get(0).firstRowId();
- for (FieldBunch files : fieldsFiles) {
+ for (FieldBunch bunch : fieldsFiles) {
checkArgument(
- files.rowCount() == rowCount,
+ bunch.rowCount() == rowCount,
"All files in a field merge split should have the same row
count.");
checkArgument(
- files.firstRowId() == firstRowId,
+ bunch.files().get(0).firstRowId() == firstRowId,
"All files in a field merge split should have the same
first row id and could not be null.");
}
@@ -208,10 +209,11 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
Arrays.fill(fieldOffsets, -1);
for (int i = 0; i < fieldsFiles.size(); i++) {
-            FieldBunch file = fieldsFiles.get(i);
-            String formatIdentifier = file.formatIdentifier();
-            long schemaId = file.schemaId();
-            TableSchema dataSchema = schemaFetcher.apply(schemaId).project(file.writeCols());
+            FieldBunch bunch = fieldsFiles.get(i);
+            DataFileMeta firstFile = bunch.files().get(0);
+            String formatIdentifier = DataFilePathFactory.formatIdentifier(firstFile.fileName());
+            long schemaId = firstFile.schemaId();
+            TableSchema dataSchema = schemaFetcher.apply(schemaId).project(firstFile.writeCols());
int[] fieldIds =
SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields()
.stream()
@@ -245,7 +247,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
readFields.stream().map(DataField::name).collect(Collectors.toList());
FormatReaderMapping formatReaderMapping =
formatReaderMappings.computeIfAbsent(
-                            new FormatKey(file.schemaId(), formatIdentifier, readFieldNames),
+                            new FormatKey(schemaId, formatIdentifier, readFieldNames),
key ->
formatBuilder.build(
formatIdentifier,
@@ -256,7 +258,10 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
fileRecordReaders[i] =
new ForceSingleBatchReader(
createFileReader(
-                                    partition, file, dataFilePathFactory, formatReaderMapping));
+ partition,
+ bunch,
+ dataFilePathFactory,
+ formatReaderMapping));
}
}
@@ -296,16 +301,16 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
private RecordReader<InternalRow> createFileReader(
BinaryRow partition,
- FieldBunch files,
+ FieldBunch bunch,
DataFilePathFactory dataFilePathFactory,
FormatReaderMapping formatReaderMapping)
throws IOException {
- if (files.size() == 1) {
+ if (bunch.files().size() == 1) {
return createFileReader(
-                    partition, files.getFirstFile(), dataFilePathFactory, formatReaderMapping);
+                    partition, bunch.files().get(0), dataFilePathFactory, formatReaderMapping);
}
List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
- for (DataFileMeta file : files.files()) {
+ for (DataFileMeta file : bunch.files()) {
FormatReaderContext formatReaderContext =
new FormatReaderContext(
                            fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
@@ -350,13 +355,13 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
}
@VisibleForTesting
- public static List<FieldBunch> splitFieldBunch(
+ public static List<FieldBunch> splitFieldBunches(
            List<DataFileMeta> needMergeFiles, Function<DataFileMeta, Integer> blobFileToFieldId) {
List<FieldBunch> fieldsFiles = new ArrayList<>();
Map<Integer, BlobBunch> blobBunchMap = new HashMap<>();
long rowCount = -1;
for (DataFileMeta file : needMergeFiles) {
- if (file.isBlobFile()) {
+ if (isBlobFile(file.fileName())) {
int fieldId = blobFileToFieldId.apply(file);
final long expectedRowCount = rowCount;
blobBunchMap
@@ -364,83 +369,50 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
.add(file);
} else {
// Normal file, just add it to the current merge split
- fieldsFiles.add(FieldBunch.file(file));
+ fieldsFiles.add(new DataBunch(file));
rowCount = file.rowCount();
}
}
-        blobBunchMap.values().forEach(blobBunch -> fieldsFiles.add(FieldBunch.blob(blobBunch)));
+ fieldsFiles.addAll(blobBunchMap.values());
return fieldsFiles;
}
- /** Files for one field. */
- public static class FieldBunch {
- final Either<DataFileMeta, BlobBunch> fileOrBlob;
-
- FieldBunch(Either<DataFileMeta, BlobBunch> fileOrBlob) {
- this.fileOrBlob = fileOrBlob;
- }
-
- static FieldBunch file(DataFileMeta file) {
- return new FieldBunch(Either.left(file));
- }
-
- static FieldBunch blob(BlobBunch blob) {
- return new FieldBunch(Either.right(blob));
- }
+ /** Files for partial field. */
+ public interface FieldBunch {
- long rowCount() {
- return fileOrBlob.isLeft()
- ? fileOrBlob.getLeft().rowCount()
- : fileOrBlob.getRight().rowCount();
- }
+ long rowCount();
- long firstRowId() {
- return fileOrBlob.isLeft()
- ? fileOrBlob.getLeft().firstRowId()
- : fileOrBlob.getRight().firstRowId();
- }
+ List<DataFileMeta> files();
+ }
- List<String> writeCols() {
- return fileOrBlob.isLeft()
- ? fileOrBlob.getLeft().writeCols()
- : fileOrBlob.getRight().writeCols();
- }
+ private static class DataBunch implements FieldBunch {
- String formatIdentifier() {
- return fileOrBlob.isLeft()
-                ? DataFilePathFactory.formatIdentifier(fileOrBlob.getLeft().fileName())
- : "blob";
- }
+ private final DataFileMeta dataFile;
- long schemaId() {
- return fileOrBlob.isLeft()
- ? fileOrBlob.getLeft().schemaId()
- : fileOrBlob.getRight().schemaId();
+ private DataBunch(DataFileMeta dataFile) {
+ this.dataFile = dataFile;
}
- @VisibleForTesting
- public int size() {
-        return fileOrBlob.isLeft() ? 1 : fileOrBlob.getRight().files.size();
+ @Override
+ public long rowCount() {
+ return dataFile.rowCount();
}
- DataFileMeta getFirstFile() {
-        return fileOrBlob.isLeft() ? fileOrBlob.getLeft() : fileOrBlob.getRight().files.get(0);
- }
-
- List<DataFileMeta> files() {
- return fileOrBlob.isLeft()
- ? Collections.singletonList(fileOrBlob.getLeft())
- : fileOrBlob.getRight().files;
+ @Override
+ public List<DataFileMeta> files() {
+ return Collections.singletonList(dataFile);
}
}
@VisibleForTesting
- static class BlobBunch {
+ static class BlobBunch implements FieldBunch {
+
+ final List<DataFileMeta> files;
final long expectedRowCount;
- List<DataFileMeta> files;
+
long latestFistRowId = -1;
long expectedNextFirstRowId = -1;
- long lastestMaxSequenceNumber = -1;
+ long latestMaxSequenceNumber = -1;
long rowCount;
BlobBunch(long expectedRowCount) {
@@ -450,12 +422,12 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
}
void add(DataFileMeta file) {
- if (!file.isBlobFile()) {
+ if (!isBlobFile(file.fileName())) {
                throw new IllegalArgumentException("Only blob file can be added to a blob bunch.");
}
if (file.firstRowId() == latestFistRowId) {
- if (file.maxSequenceNumber() >= lastestMaxSequenceNumber) {
+ if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
throw new IllegalArgumentException(
"Blob file with same first row id should have
decreasing sequence number.");
}
@@ -465,7 +437,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
long firstRowId = file.firstRowId();
if (firstRowId < expectedNextFirstRowId) {
checkArgument(
-                        file.maxSequenceNumber() < lastestMaxSequenceNumber,
+                        file.maxSequenceNumber() < latestMaxSequenceNumber,
                        "Blob file with overlapping row id should have decreasing sequence number.");
return;
} else if (firstRowId > expectedNextFirstRowId) {
@@ -487,37 +459,19 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
checkArgument(
rowCount <= expectedRowCount,
"Blob files row count exceed the expect " +
expectedRowCount);
- this.lastestMaxSequenceNumber = file.maxSequenceNumber();
+ this.latestMaxSequenceNumber = file.maxSequenceNumber();
this.latestFistRowId = file.firstRowId();
this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
}
- long rowCount() {
+ @Override
+ public long rowCount() {
return rowCount;
}
- long firstRowId() {
- if (files.isEmpty()) {
- return -1;
- } else {
- return files.get(0).firstRowId();
- }
- }
-
- List<String> writeCols() {
- if (files.isEmpty()) {
- return new ArrayList<>();
- } else {
- return files.get(0).writeCols();
- }
- }
-
- long schemaId() {
- if (files.isEmpty()) {
- return -1;
- } else {
- return files.get(0).schemaId();
- }
+ @Override
+ public List<DataFileMeta> files() {
+ return files;
}
}
}
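A rough usage sketch of the renamed entry point, mirroring the tests further down; the input list and the constant field-id mapping are placeholders, while the types and method come from the hunks above:

    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.operation.DataEvolutionSplitRead;
    import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;

    import java.util.List;

    class SplitFieldBunchesExample {

        // needMergeFiles stands for the data + blob files of one split.
        static void inspect(List<DataFileMeta> needMergeFiles) {
            // Plain data files become single-file bunches; blob files are
            // grouped per field id. The constant field id mirrors BlobTableTest.
            List<FieldBunch> bunches =
                    DataEvolutionSplitRead.splitFieldBunches(needMergeFiles, file -> 0);
            for (FieldBunch bunch : bunches) {
                long rows = bunch.rowCount();
                // The first file now carries firstRowId/schemaId/writeCols for the bunch.
                DataFileMeta first = bunch.files().get(0);
            }
        }
    }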
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index e311aa0b2e..f41de87aa9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -88,6 +88,7 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
@@ -1217,7 +1218,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
"This is a bug, file source field for row-tracking table
must present.");
if (entry.file().fileSource().get().equals(FileSource.APPEND)
&& entry.file().firstRowId() == null) {
- if (entry.file().isBlobFile()) {
+ if (isBlobFile(entry.file().fileName())) {
if (blobStart >= start) {
throw new IllegalStateException(
String.format(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index 18ae25592a..8b758166b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -30,6 +30,7 @@ import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Append data evolution table split generator, which implementation of {@link SplitGenerator}. */
@@ -86,7 +87,7 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
value.firstRowId() == null
? Long.MIN_VALUE
: value.firstRowId())
- .thenComparingInt(f -> f.isBlobFile() ? 1 : 0)
+                        .thenComparingInt(f -> isBlobFile(f.fileName()) ? 1 : 0)
.thenComparing(
(f1, f2) -> {
                                    // If firstRowId is the same, we should read the file with
@@ -106,7 +107,7 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
splitByRowId.add(Collections.singletonList(file));
continue;
}
- if (!file.isBlobFile() && firstRowId != lastRowId) {
+ if (!isBlobFile(file.fileName()) && firstRowId != lastRowId) {
if (!currentSplit.isEmpty()) {
splitByRowId.add(currentSplit);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 31e19d4f5c..c6567577bf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -57,11 +57,11 @@ public class BlobTableTest extends TableTestBase {
.collect(Collectors.toList());
List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
- DataEvolutionSplitRead.splitFieldBunch(filesMetas, key -> 0);
+ DataEvolutionSplitRead.splitFieldBunches(filesMetas, key -> 0);
assertThat(fieldGroups.size()).isEqualTo(2);
- assertThat(fieldGroups.get(0).size()).isEqualTo(1);
- assertThat(fieldGroups.get(1).size()).isEqualTo(8);
+ assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
readDefault(
row -> {
@@ -91,10 +91,10 @@ public class BlobTableTest extends TableTestBase {
assertThat(batches.size()).isEqualTo(2);
for (List<DataFileMeta> batch : batches) {
List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
- DataEvolutionSplitRead.splitFieldBunch(batch, file -> 0);
+ DataEvolutionSplitRead.splitFieldBunches(batch, file -> 0);
assertThat(fieldGroups.size()).isEqualTo(2);
- assertThat(fieldGroups.get(0).size()).isEqualTo(1);
- assertThat(fieldGroups.get(1).size()).isEqualTo(8);
+ assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
}
readDefault(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index ce38d6e4b5..87fcb732ed 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -21,6 +21,8 @@ package org.apache.paimon.operation;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.DataEvolutionSplitRead.BlobBunch;
+import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
@@ -32,17 +34,18 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link DataEvolutionSplitRead.BlobBunch}. */
+/** Tests for {@link BlobBunch}. */
public class DataEvolutionReadTest {
- private DataEvolutionSplitRead.BlobBunch blobBunch;
+ private BlobBunch blobBunch;
@BeforeEach
public void setUp() {
- blobBunch = new DataEvolutionSplitRead.BlobBunch(Long.MAX_VALUE);
+ blobBunch = new BlobBunch(Long.MAX_VALUE);
}
@Test
@@ -54,8 +57,8 @@ public class DataEvolutionReadTest {
assertThat(blobBunch.files).hasSize(1);
assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
assertThat(blobBunch.rowCount()).isEqualTo(100);
- assertThat(blobBunch.firstRowId()).isEqualTo(0);
- assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
+        assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
}
@Test
@@ -70,9 +73,9 @@ public class DataEvolutionReadTest {
assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
assertThat(blobBunch.files.get(1)).isEqualTo(blobTail);
assertThat(blobBunch.rowCount()).isEqualTo(300);
- assertThat(blobBunch.firstRowId()).isEqualTo(0);
- assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
- assertThat(blobBunch.schemaId()).isEqualTo(0L);
+ assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
+        assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
}
@Test
@@ -176,8 +179,8 @@ public class DataEvolutionReadTest {
assertThat(blobBunch.files).hasSize(4);
assertThat(blobBunch.rowCount()).isEqualTo(1000);
- assertThat(blobBunch.firstRowId()).isEqualTo(0);
- assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
+        assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
}
@Test
@@ -211,11 +214,10 @@ public class DataEvolutionReadTest {
assertThat(batch.get(8).fileName()).contains("blob4"); // skip
assertThat(batch.get(9).fileName()).contains("blob8"); // pick
- List<DataEvolutionSplitRead.FieldBunch> fieldBunches =
- DataEvolutionSplitRead.splitFieldBunch(batch, file -> 0);
+ List<FieldBunch> fieldBunches = splitFieldBunches(batch, file -> 0);
assertThat(fieldBunches.size()).isEqualTo(2);
-        DataEvolutionSplitRead.BlobBunch blobBunch = fieldBunches.get(1).fileOrBlob.getRight();
+ BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1);
assertThat(blobBunch.files).hasSize(4);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
@@ -262,19 +264,18 @@ public class DataEvolutionReadTest {
List<DataFileMeta> batch = batches.get(0);
- List<DataEvolutionSplitRead.FieldBunch> fieldBunches =
- DataEvolutionSplitRead.splitFieldBunch(
- batch, file -> file.writeCols().get(0).hashCode());
+ List<FieldBunch> fieldBunches =
+                splitFieldBunches(batch, file -> file.writeCols().get(0).hashCode());
assertThat(fieldBunches.size()).isEqualTo(3);
-        DataEvolutionSplitRead.BlobBunch blobBunch = fieldBunches.get(1).fileOrBlob.getRight();
+ BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1);
assertThat(blobBunch.files).hasSize(4);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
- blobBunch = fieldBunches.get(2).fileOrBlob.getRight();
+ blobBunch = (BlobBunch) fieldBunches.get(2);
assertThat(blobBunch.files).hasSize(4);
assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index f6b5345ebf..29f5df13df 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -47,6 +47,10 @@ public class BlobFileFormat extends FileFormat {
super(BlobFileFormatFactory.IDENTIFIER);
}
+ public static boolean isBlobFile(String fileName) {
+ return fileName.endsWith("." + BlobFileFormatFactory.IDENTIFIER);
+ }
+
@Override
public FormatReaderFactory createReaderFactory(
RowType dataSchemaRowType,
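For completeness, a sketch of the new helper's call-site semantics; the file names are made-up examples, and the check assumes BlobFileFormatFactory.IDENTIFIER is "blob", as implied by the default method removed from DataFileMeta above:

    import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;

    class BlobFileNameExample {
        static void demo() {
            // Pure suffix check on the file name, so no DataFileMeta is
            // needed; the names below are made-up examples.
            boolean blob = isBlobFile("data-0.blob"); // true if IDENTIFIER is "blob"
            boolean orc = isBlobFile("data-0.orc");   // false
        }
    }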