This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4a2c3ce1f8 [core] Support blob read and write (#6344)
4a2c3ce1f8 is described below
commit 4a2c3ce1f838228237f9bbe52c07444b1a535a29
Author: YeJunHao <[email protected]>
AuthorDate: Mon Sep 29 13:50:51 2025 +0800
[core] Support blob read and write (#6344)
---
.../java/org/apache/paimon/types/BlobType.java | 10 +
.../main/java/org/apache/paimon/utils/Either.java | 217 +++++++++++++
.../org/apache/paimon/append/AppendOnlyWriter.java | 30 +-
.../apache/paimon/append/MergeAllBatchReader.java | 110 +++++++
.../java/org/apache/paimon/io/DataFileMeta.java | 4 +
.../paimon/operation/BaseAppendFileStoreWrite.java | 3 +-
.../paimon/operation/DataEvolutionSplitRead.java | 258 ++++++++++++++-
.../paimon/operation/FileStoreCommitImpl.java | 20 +-
.../org/apache/paimon/schema/SchemaValidation.java | 12 +-
.../table/source/DataEvolutionSplitGenerator.java | 3 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 9 +-
.../org/apache/paimon/append/BlobTableTest.java | 130 ++++++++
.../apache/paimon/format/FileFormatSuffixTest.java | 8 +-
.../paimon/operation/DataEvolutionReadTest.java | 351 +++++++++++++++++++++
.../org/apache/paimon/table/TableTestBase.java | 9 +
.../org.apache.paimon.format.FileFormatFactory | 1 +
16 files changed, 1139 insertions(+), 36 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index 7f3fa010c4..b5ead703f0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -82,4 +82,14 @@ public final class BlobType extends DataType {
return Pair.of(new RowType(normalFields), new RowType(blobFields));
}
+
+ public static boolean containsBlobType(RowType rowType) {
+ for (DataField field : rowType.getFields()) {
+ DataTypeRoot type = field.type().getTypeRoot();
+ if (type == DataTypeRoot.BLOB) {
+ return true;
+ }
+ }
+ return false;
+ }
}
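A minimal usage sketch for the new helper above (not part of the commit; it assumes the RowType and DataTypes factories already used elsewhere in this patch):

    import org.apache.paimon.types.BlobType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    // Build a row type that carries a BLOB column and test for it; callers such as
    // the writer factory later in this patch branch on this flag to pick a blob-aware writer.
    RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.BLOB());
    boolean hasBlob = BlobType.containsBlobType(rowType); // true for this row type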
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Either.java b/paimon-common/src/main/java/org/apache/paimon/utils/Either.java
new file mode 100644
index 0000000000..1b7435290f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Either.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+/**
+ * A functional programming utility class that represents a value of one of two possible types. This
+ * is a discriminated union type that can hold either a Left value (typically representing an error
+ * or failure case) or a Right value (typically representing a success or correct result).
+ *
+ * <p>The Either type is commonly used in functional programming to handle operations that can
+ * either succeed or fail, providing a type-safe alternative to exceptions or null values.
+ *
+ * <p>Usage examples:
+ *
+ * <pre>{@code
+ * // Create a Left (error case)
+ * Either<String, Integer> error = Either.left("Invalid input");
+ *
+ * // Create a Right (success case)
+ * Either<String, Integer> success = Either.right(42);
+ *
+ * // Check and handle the result
+ * if (result.isRight()) {
+ * Integer value = result.getRight();
+ * // Process successful result
+ * } else {
+ * String error = result.getLeft();
+ * // Handle error case
+ * }
+ * }</pre>
+ *
+ * @param <L> the type of the Left value (typically error/failure)
+ * @param <R> the type of the Right value (typically success/result)
+ */
+public abstract class Either<L, R> {
+
+ /** Private constructor to prevent direct instantiation. */
+ private Either() {}
+
+ /**
+ * Static factory method to create a Left-sided Either instance. Typically represents a failure
+ * or error.
+ *
+ * @param value The left-side value.
+ * @param <L> The type of the left value.
+ * @param <R> The type of the right value.
+ * @return A new Left instance.
+ */
+ public static <L, R> Either<L, R> left(L value) {
+ return new Left<>(value);
+ }
+
+ /**
+ * Static factory method to create a Right-sided Either instance. Typically represents a
+ * successful or correct result.
+ *
+ * @param value The right-side value.
+ * @param <L> The type of the left value.
+ * @param <R> The type of the right value.
+ * @return A new Right instance.
+ */
+ public static <L, R> Either<L, R> right(R value) {
+ return new Right<>(value);
+ }
+
+ /**
+ * Checks if this instance is a Left.
+ *
+ * @return true if Left, false otherwise.
+ */
+ public abstract boolean isLeft();
+
+ /**
+ * Checks if this instance is a Right.
+ *
+ * @return true if Right, false otherwise.
+ */
+ public abstract boolean isRight();
+
+ /**
+ * Returns the left value if this is a Left instance.
+ *
+ * @return The left value.
+ * @throws NoSuchElementException if this is a Right instance.
+ */
+ public abstract L getLeft();
+
+ /**
+ * Returns the right value if this is a Right instance.
+ *
+ * @return The right value.
+ * @throws NoSuchElementException if this is a Left instance.
+ */
+ public abstract R getRight();
+
+ /**
+ * If this is a Left, performs the given action on its value.
+ *
+ * @param action The consumer function to execute.
+ */
+ public abstract void ifLeft(Consumer<L> action);
+
+ /**
+ * If this is a Right, performs the given action on its value.
+ *
+ * @param action The consumer function to execute.
+ */
+ public abstract void ifRight(Consumer<R> action);
+
+ /** Private static inner class representing the Left state of Either. */
+ private static final class Left<L, R> extends Either<L, R> {
+ private final L value;
+
+ private Left(L value) {
+ this.value = Objects.requireNonNull(value, "Left value cannot be
null");
+ }
+
+ @Override
+ public boolean isLeft() {
+ return true;
+ }
+
+ @Override
+ public boolean isRight() {
+ return false;
+ }
+
+ @Override
+ public L getLeft() {
+ return value;
+ }
+
+ @Override
+ public R getRight() {
+ throw new NoSuchElementException("Cannot call getRight() on a Left
instance");
+ }
+
+ @Override
+ public void ifLeft(Consumer<L> action) {
+ action.accept(value);
+ }
+
+ @Override
+ public void ifRight(Consumer<R> action) {
+ // Do nothing
+ }
+
+ @Override
+ public String toString() {
+ return "Left(" + value + ")";
+ }
+ }
+
+ /** Private static inner class representing the Right state of Either. */
+ private static final class Right<L, R> extends Either<L, R> {
+ private final R value;
+
+ private Right(R value) {
+ this.value = Objects.requireNonNull(value, "Right value cannot be
null");
+ }
+
+ @Override
+ public boolean isLeft() {
+ return false;
+ }
+
+ @Override
+ public boolean isRight() {
+ return true;
+ }
+
+ @Override
+ public L getLeft() {
+ throw new NoSuchElementException("Cannot call getLeft() on a Right
instance");
+ }
+
+ @Override
+ public R getRight() {
+ return value;
+ }
+
+ @Override
+ public void ifLeft(Consumer<L> action) {
+ // Do nothing
+ }
+
+ @Override
+ public void ifRight(Consumer<R> action) {
+ action.accept(value);
+ }
+
+ @Override
+ public String toString() {
+ return "Right(" + value + ")";
+ }
+ }
+}
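In addition to the accessor-based example in the class Javadoc, the consumer-style callbacks can be used as below; this is a small illustrative sketch, not part of the commit:

    Either<String, Integer> parsed = Either.right(42);
    parsed.ifRight(v -> System.out.println("value = " + v));    // runs for a Right
    parsed.ifLeft(err -> System.err.println("error = " + err)); // no-op for a Right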
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index f8a074cab8..4ad04b2610 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -33,13 +33,14 @@ import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BatchRecordWriter;
import org.apache.paimon.utils.CommitIncrement;
@@ -50,6 +51,7 @@ import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SinkWriter;
import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
+import org.apache.paimon.utils.StatsCollectorFactories;
import javax.annotation.Nullable;
@@ -84,7 +86,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final LongCounter seqNumCounter;
private final String fileCompression;
private final CompressOptions spillCompression;
- private final SimpleColStatsCollector.Factory[] statsCollectors;
+ private final StatsCollectorFactories statsCollectorFactories;
@Nullable private final IOManager ioManager;
private final FileIndexOptions fileIndexOptions;
private final MemorySize maxDiskSize;
@@ -111,7 +113,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
boolean spillable,
String fileCompression,
CompressOptions spillCompression,
- SimpleColStatsCollector.Factory[] statsCollectors,
+ StatsCollectorFactories statsCollectorFactories,
MemorySize maxDiskSize,
FileIndexOptions fileIndexOptions,
boolean asyncFileWrite,
@@ -136,7 +138,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
this.fileCompression = fileCompression;
this.spillCompression = spillCompression;
this.ioManager = ioManager;
- this.statsCollectors = statsCollectors;
+ this.statsCollectorFactories = statsCollectorFactories;
this.maxDiskSize = maxDiskSize;
this.fileIndexOptions = fileIndexOptions;
@@ -289,7 +291,23 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
}
}
- private RowDataRollingFileWriter createRollingRowWriter() {
+ private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
+ if (BlobType.containsBlobType(writeSchema)) {
+ return new RollingBlobFileWriter(
+ fileIO,
+ schemaId,
+ fileFormat,
+ targetFileSize,
+ writeSchema,
+ pathFactory,
+ seqNumCounter,
+ fileCompression,
+ statsCollectorFactories,
+ fileIndexOptions,
+ FileSource.APPEND,
+ asyncFileWrite,
+ statsDenseStore);
+ }
return new RowDataRollingFileWriter(
fileIO,
schemaId,
@@ -299,7 +317,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
pathFactory,
seqNumCounter,
fileCompression,
- statsCollectors,
+ statsCollectorFactories.statsCollectors(writeSchema.getFieldNames()),
fileIndexOptions,
FileSource.APPEND,
asyncFileWrite,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java b/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
new file mode 100644
index 0000000000..911eafd3b4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * A record reader that merges all batches from a multi-batch reader into a single concatenated
+ * batch. This reader wraps another RecordReader that produces multiple batches and presents them as
+ * a single continuous stream of records.
+ *
+ * <p>The MergeAllBatchReader is particularly useful in scenarios where you need to process multiple
+ * batches as a unified data stream, such as when reading from multiple files or partitions that
+ * should be treated as a single logical dataset.
+ *
+ * <p>Key features:
+ *
+ * <ul>
+ * <li>Concatenates all batches from the underlying reader into one continuous batch
+ * <li>Automatically handles batch transitions and resource cleanup
+ * <li>Provides a single readBatch() call that returns all data
+ * <li>Properly manages memory by releasing batches after consumption
+ * </ul>
+ *
+ * <p>This reader is commonly used in data evolution scenarios where multiple file formats or
+ * schemas need to be read as a unified stream.
+ */
+public class MergeAllBatchReader implements RecordReader<InternalRow> {
+
+ private final RecordReader<InternalRow> multiBatchReader;
+ private ConcatBatch batch;
+
+ public MergeAllBatchReader(RecordReader<InternalRow> multiBatchReader) {
+ this.multiBatchReader = multiBatchReader;
+ this.batch = new ConcatBatch(multiBatchReader);
+ }
+
+ @Override
+ @Nullable
+ public RecordIterator<InternalRow> readBatch() throws IOException {
+ RecordIterator<InternalRow> returned = batch;
+ batch = null;
+ return returned;
+ }
+
+ @Override
+ public void close() throws IOException {
+ multiBatchReader.close();
+ }
+
+ private static class ConcatBatch implements RecordIterator<InternalRow> {
+
+ private final RecordReader<InternalRow> reader;
+ private RecordIterator<InternalRow> currentBatch;
+
+ private ConcatBatch(RecordReader<InternalRow> reader) {
+ this.reader = reader;
+ }
+
+ @Nullable
+ @Override
+ public InternalRow next() throws IOException {
+ if (currentBatch == null) {
+ currentBatch = reader.readBatch();
+ if (currentBatch == null) {
+ return null;
+ }
+ }
+
+ InternalRow next = currentBatch.next();
+
+ if (next == null) {
+ currentBatch.releaseBatch();
+ currentBatch = null;
+ return next();
+ }
+
+ return next;
+ }
+
+ @Override
+ public void releaseBatch() {
+ if (currentBatch != null) {
+ currentBatch.releaseBatch();
+ currentBatch = null;
+ }
+ }
+ }
+}
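A consumption sketch (not part of the commit): readBatch() returns the single concatenated batch once and then null, so a caller drains it with one iterator. Here `underlyingReader` stands in for any multi-batch RecordReader<InternalRow>:

    MergeAllBatchReader merged = new MergeAllBatchReader(underlyingReader);
    RecordReader.RecordIterator<InternalRow> all = merged.readBatch(); // every row, one batch
    InternalRow row;
    while ((row = all.next()) != null) {
        // process row
    }
    all.releaseBatch();
    merged.close();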
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index e7bb8b9571..68ca8b20a0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -256,6 +256,10 @@ public interface DataFileMeta {
String fileName();
+ default boolean isBlob() {
+ return fileName().endsWith(".blob");
+ }
+
long fileSize();
long rowCount();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 3bd8264831..98d699cb9d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -43,6 +43,7 @@ import org.apache.paimon.utils.IOExceptionSupplier;
import org.apache.paimon.utils.LongCounter;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsCollectorFactories;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,7 +132,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
options.writeBufferSpillable() || forceBufferSpill,
options.fileCompression(),
options.spillCompressOptions(),
- statsCollectors(),
+ new StatsCollectorFactories(options),
options.writeBufferSpillDiskSize(),
fileIndexOptions,
options.asyncFileWrite(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index f4c426e86f..dd2809da7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -18,6 +18,8 @@
package org.apache.paimon.operation;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.append.MergeAllBatchReader;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
@@ -42,6 +44,7 @@ import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Either;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.Builder;
@@ -51,9 +54,11 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
@@ -68,11 +73,11 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
private final FileIO fileIO;
- private final SchemaManager schemaManager;
private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
+ private final Function<Long, TableSchema> schemaFetcher;
protected RowType readRowType;
@@ -84,7 +89,9 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory) {
this.fileIO = fileIO;
- this.schemaManager = schemaManager;
+ final Map<Long, TableSchema> cache = new HashMap<>();
+ this.schemaFetcher =
+ schemaId -> cache.computeIfAbsent(schemaId, key -> schemaManager.schema(schemaId));
this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
@@ -110,6 +117,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
@Override
public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
+ // TODO: Support File index push down (all conditions) and Predicate push down (only if no
+ // column merge)
return this;
}
@@ -162,20 +171,34 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
DataFilePathFactory dataFilePathFactory,
Builder formatBuilder)
throws IOException {
- long rowCount = needMergeFiles.get(0).rowCount();
- long firstRowId = needMergeFiles.get(0).firstRowId();
- for (DataFileMeta file : needMergeFiles) {
+ List<FieldBunch> fieldsFiles =
+ splitFieldBunch(
+ needMergeFiles,
+ file -> {
+ checkArgument(
+ file.isBlob(), "Only blob file need to call this method.");
+ return schemaFetcher
+ .apply(file.schemaId())
+ .logicalRowType()
+ .getField(file.writeCols().get(0))
+ .id();
+ });
+
+ long rowCount = fieldsFiles.get(0).rowCount();
+ long firstRowId = fieldsFiles.get(0).firstRowId();
+
+ for (FieldBunch files : fieldsFiles) {
checkArgument(
- file.rowCount() == rowCount,
+ files.rowCount() == rowCount,
"All files in a field merge split should have the same row
count.");
checkArgument(
- file.firstRowId() == firstRowId,
+ files.firstRowId() == firstRowId,
"All files in a field merge split should have the same
first row id and could not be null.");
}
// Init all we need to create a compound reader
List<DataField> allReadFields = readRowType.getFields();
- RecordReader<InternalRow>[] fileRecordReaders = new RecordReader[needMergeFiles.size()];
+ RecordReader<InternalRow>[] fileRecordReaders = new RecordReader[fieldsFiles.size()];
int[] readFieldIndex = allReadFields.stream().mapToInt(DataField::id).toArray();
// which row the read field index belongs to
int[] rowOffsets = new int[allReadFields.size()];
@@ -184,11 +207,11 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
Arrays.fill(rowOffsets, -1);
Arrays.fill(fieldOffsets, -1);
- for (int i = 0; i < needMergeFiles.size(); i++) {
- DataFileMeta file = needMergeFiles.get(i);
- String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
+ for (int i = 0; i < fieldsFiles.size(); i++) {
+ FieldBunch file = fieldsFiles.get(i);
+ String formatIdentifier = file.formatIdentifier();
long schemaId = file.schemaId();
- TableSchema dataSchema = schemaManager.schema(schemaId).project(file.writeCols());
+ TableSchema dataSchema = schemaFetcher.apply(schemaId).project(file.writeCols());
int[] fieldIds =
SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields()
.stream()
@@ -231,7 +254,9 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
readFields,
false));
fileRecordReaders[i] =
- createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
+ new MergeAllBatchReader(
+ createFileReader(
+ partition, file, dataFilePathFactory, formatReaderMapping));
}
}
@@ -265,10 +290,43 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
schema,
schemaId == schema.id()
? schema
- : schemaManager.schema(schemaId)));
+ : schemaFetcher.apply(schemaId)));
return createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
}
+ private RecordReader<InternalRow> createFileReader(
+ BinaryRow partition,
+ FieldBunch files,
+ DataFilePathFactory dataFilePathFactory,
+ FormatReaderMapping formatReaderMapping)
+ throws IOException {
+ if (files.size() == 1) {
+ return createFileReader(
+ partition, files.getFirstFile(), dataFilePathFactory, formatReaderMapping);
+ }
+ List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
+ for (DataFileMeta file : files.files()) {
+ FormatReaderContext formatReaderContext =
+ new FormatReaderContext(
+ fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
+ readerSuppliers.add(
+ () ->
+ new DataFileRecordReader(
+ schema.logicalRowType(),
+ formatReaderMapping.getReaderFactory(),
+ formatReaderContext,
+ formatReaderMapping.getIndexMapping(),
+ formatReaderMapping.getCastMapping(),
+ PartitionUtils.create(
+ formatReaderMapping.getPartitionPair(), partition),
+ true,
+ file.firstRowId(),
+ file.maxSequenceNumber(),
+ formatReaderMapping.getSystemFields()));
+ }
+ return ConcatRecordReader.create(readerSuppliers);
+ }
+
private FileRecordReader<InternalRow> createFileReader(
BinaryRow partition,
DataFileMeta file,
@@ -290,4 +348,176 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
file.maxSequenceNumber(),
formatReaderMapping.getSystemFields());
}
+
+ @VisibleForTesting
+ public static List<FieldBunch> splitFieldBunch(
+ List<DataFileMeta> needMergeFiles, Function<DataFileMeta, Integer> blobFileToFieldId) {
+ List<FieldBunch> fieldsFiles = new ArrayList<>();
+ Map<Integer, BlobBunch> blobBunchMap = new HashMap<>();
+ long rowCount = -1;
+ for (DataFileMeta file : needMergeFiles) {
+ if (file.isBlob()) {
+ int fieldId = blobFileToFieldId.apply(file);
+ final long expectedRowCount = rowCount;
+ blobBunchMap
+ .computeIfAbsent(fieldId, key -> new BlobBunch(expectedRowCount))
+ .add(file);
+ } else {
+ // Normal file, just add it to the current merge split
+ fieldsFiles.add(FieldBunch.file(file));
+ rowCount = file.rowCount();
+ }
+ }
+ blobBunchMap.values().forEach(blobBunch -> fieldsFiles.add(FieldBunch.blob(blobBunch)));
+ return fieldsFiles;
+ }
+
+ /** Files for one field. */
+ public static class FieldBunch {
+ final Either<DataFileMeta, BlobBunch> fileOrBlob;
+
+ FieldBunch(Either<DataFileMeta, BlobBunch> fileOrBlob) {
+ this.fileOrBlob = fileOrBlob;
+ }
+
+ static FieldBunch file(DataFileMeta file) {
+ return new FieldBunch(Either.left(file));
+ }
+
+ static FieldBunch blob(BlobBunch blob) {
+ return new FieldBunch(Either.right(blob));
+ }
+
+ long rowCount() {
+ return fileOrBlob.isLeft()
+ ? fileOrBlob.getLeft().rowCount()
+ : fileOrBlob.getRight().rowCount();
+ }
+
+ long firstRowId() {
+ return fileOrBlob.isLeft()
+ ? fileOrBlob.getLeft().firstRowId()
+ : fileOrBlob.getRight().firstRowId();
+ }
+
+ List<String> writeCols() {
+ return fileOrBlob.isLeft()
+ ? fileOrBlob.getLeft().writeCols()
+ : fileOrBlob.getRight().writeCols();
+ }
+
+ String formatIdentifier() {
+ return fileOrBlob.isLeft()
+ ? DataFilePathFactory.formatIdentifier(fileOrBlob.getLeft().fileName())
+ : "blob";
+ }
+
+ long schemaId() {
+ return fileOrBlob.isLeft()
+ ? fileOrBlob.getLeft().schemaId()
+ : fileOrBlob.getRight().schemaId();
+ }
+
+ @VisibleForTesting
+ public int size() {
+ return fileOrBlob.isLeft() ? 1 : fileOrBlob.getRight().files.size();
+ }
+
+ DataFileMeta getFirstFile() {
+ return fileOrBlob.isLeft() ? fileOrBlob.getLeft() : fileOrBlob.getRight().files.get(0);
+ }
+
+ List<DataFileMeta> files() {
+ return fileOrBlob.isLeft()
+ ? Collections.singletonList(fileOrBlob.getLeft())
+ : fileOrBlob.getRight().files;
+ }
+ }
+
+ @VisibleForTesting
+ static class BlobBunch {
+ final long expectedRowCount;
+ List<DataFileMeta> files;
+ long latestFistRowId = -1;
+ long expectedNextFirstRowId = -1;
+ long lastestMaxSequenceNumber = -1;
+ long rowCount;
+
+ BlobBunch(long expectedRowCount) {
+ this.files = new ArrayList<>();
+ this.rowCount = 0;
+ this.expectedRowCount = expectedRowCount;
+ }
+
+ void add(DataFileMeta file) {
+ if (!file.isBlob()) {
+ throw new IllegalArgumentException("Only blob file can be
added to a blob bunch.");
+ }
+
+ if (file.firstRowId() == latestFistRowId) {
+ if (file.maxSequenceNumber() >= lastestMaxSequenceNumber) {
+ throw new IllegalArgumentException(
+ "Blob file with same first row id should have
decreasing sequence number.");
+ }
+ return;
+ }
+ if (!files.isEmpty()) {
+ long firstRowId = file.firstRowId();
+ if (firstRowId < expectedNextFirstRowId) {
+ checkArgument(
+ file.maxSequenceNumber() < lastestMaxSequenceNumber,
+ "Blob file with overlapping row id should have decreasing sequence number.");
+ return;
+ } else if (firstRowId > expectedNextFirstRowId) {
+ throw new IllegalArgumentException(
+ "Blob file first row id should be continuous,
expect "
+ + expectedNextFirstRowId
+ + " but got "
+ + firstRowId);
+ }
+ checkArgument(
+ file.schemaId() == files.get(0).schemaId(),
+ "All files in a blob bunch should have the same schema
id.");
+ checkArgument(
+ file.writeCols().equals(files.get(0).writeCols()),
+ "All files in a blob bunch should have the same write
columns.");
+ }
+ files.add(file);
+ rowCount += file.rowCount();
+ checkArgument(
+ rowCount <= expectedRowCount,
+ "Blob files row count exceed the expect " +
expectedRowCount);
+ this.lastestMaxSequenceNumber = file.maxSequenceNumber();
+ this.latestFistRowId = file.firstRowId();
+ this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
+ }
+
+ long rowCount() {
+ return rowCount;
+ }
+
+ long firstRowId() {
+ if (files.isEmpty()) {
+ return -1;
+ } else {
+ return files.get(0).firstRowId();
+ }
+ }
+
+ List<String> writeCols() {
+ if (files.isEmpty()) {
+ return new ArrayList<>();
+ } else {
+ return files.get(0).writeCols();
+ }
+ }
+
+ long schemaId() {
+ if (files.isEmpty()) {
+ return -1;
+ } else {
+ return files.get(0).schemaId();
+ }
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 466df201a3..870bd498c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1210,15 +1210,29 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
// assign row id for new files
long start = firstRowIdStart;
+ long blobStart = firstRowIdStart;
for (ManifestEntry entry : deltaFiles) {
checkArgument(
entry.file().fileSource().isPresent(),
"This is a bug, file source field for row-tracking table
must present.");
if (entry.file().fileSource().get().equals(FileSource.APPEND)
&& entry.file().firstRowId() == null) {
- long rowCount = entry.file().rowCount();
- rowIdAssigned.add(entry.assignFirstRowId(start));
- start += rowCount;
+ if (entry.file().isBlob()) {
+ if (blobStart >= start) {
+ throw new IllegalStateException(
+ String.format(
+ "This is a bug, blobStart %d should be
less than start %d when assigning a blob entry file.",
+ blobStart, start));
+ }
+ long rowCount = entry.file().rowCount();
+ rowIdAssigned.add(entry.assignFirstRowId(blobStart));
+ blobStart += rowCount;
+ } else {
+ long rowCount = entry.file().rowCount();
+ rowIdAssigned.add(entry.assignFirstRowId(start));
+ blobStart = start;
+ start += rowCount;
+ }
} else {
// for compact file, do not assign first row id.
rowIdAssigned.add(entry);
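Worked example of the assignment above, assuming firstRowIdStart = 0: for a delta of [data file A with 1,000 rows, blob file B with 1,000 rows], A gets firstRowId 0 and advances start to 1,000, while B is assigned blobStart = 0 so its rows stay aligned with A's row ids; a blob file that arrives before any data file trips the blobStart >= start check.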
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 09de891a37..0521351a44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -30,6 +30,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
@@ -159,7 +160,7 @@ public class SchemaValidation {
FileFormat fileFormat =
FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()));
- fileFormat.validateDataFields(new RowType(schema.fields()));
+ fileFormat.validateDataFields(BlobType.splitBlob(new RowType(schema.fields())).getLeft());
// Check column names in schema
schema.fieldNames()
@@ -649,6 +650,15 @@ public class SchemaValidation {
!options.deletionVectorsEnabled(),
"Data evolution config must disabled with
deletion-vectors.enabled");
}
+
+ if (BlobType.containsBlobType(schema.logicalRowType())) {
+ checkArgument(
+ options.dataEvolutionEnabled(),
+ "Data evolution config must enabled for table with BLOB
type column.");
+ checkArgument(
+
BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldCount() == 1,
+ "Table with BLOB type column only support one BLOB
column.");
+ }
}
private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index 39fa2d9ce7..c2f115d32c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -86,6 +86,7 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
value.firstRowId() == null
? Long.MIN_VALUE
: value.firstRowId())
+ .thenComparingInt(f -> f.isBlob() ? 1 : 0)
.thenComparing(
(f1, f2) -> {
// If firstRowId is the same, we should read the file with
@@ -105,7 +106,7 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
splitByRowId.add(Collections.singletonList(file));
continue;
}
- if (firstRowId != lastRowId) {
+ if (!file.isBlob() && firstRowId != lastRowId) {
if (!currentSplit.isEmpty()) {
splitByRowId.add(currentSplit);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index ad91a3c70a..ce3a2d4e85 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -62,7 +62,6 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -613,7 +612,8 @@ public class AppendOnlyWriterTest {
generateCompactAfter(compactBefore));
},
null);
- CoreOptions options = new CoreOptions(new HashMap<>());
+ CoreOptions options =
+ new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
AppendOnlyWriter writer =
new AppendOnlyWriter(
LocalFileIO.create(),
@@ -633,10 +633,7 @@ public class AppendOnlyWriterTest {
spillable,
CoreOptions.FILE_COMPRESSION.defaultValue(),
CompressOptions.defaultOptions(),
- StatsCollectorFactories.createStatsFactories(
- "truncate(16)",
- options,
- AppendOnlyWriterTest.SCHEMA.getFieldNames()),
+ new StatsCollectorFactories(options),
MemorySize.MAX_VALUE,
new FileIndexOptions(),
true,
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
new file mode 100644
index 0000000000..31e19d4f5c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for table with blob. */
+public class BlobTableTest extends TableTestBase {
+
+ private final byte[] blobBytes = randomBytes();
+
+ @Test
+ public void testBasic() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(100, 1));
+
+ AtomicInteger integer = new AtomicInteger(0);
+
+ List<DataFileMeta> filesMetas =
+ getTableDefault().store().newScan().plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+
+ List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
+ DataEvolutionSplitRead.splitFieldBunch(filesMetas, key -> 0);
+
+ assertThat(fieldGroups.size()).isEqualTo(2);
+ assertThat(fieldGroups.get(0).size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).size()).isEqualTo(8);
+
+ readDefault(
+ row -> {
+ integer.incrementAndGet();
+ if (integer.get() % 50 == 0) {
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ }
+ });
+
+ assertThat(integer.get()).isEqualTo(100);
+ }
+
+ @Test
+ public void testMultiBatch() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(100, 2));
+
+ AtomicInteger integer = new AtomicInteger(0);
+
+ List<DataFileMeta> filesMetas =
+ getTableDefault().store().newScan().plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+
+ List<List<DataFileMeta>> batches = DataEvolutionSplitGenerator.split(filesMetas);
+ assertThat(batches.size()).isEqualTo(2);
+ for (List<DataFileMeta> batch : batches) {
+ List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
+ DataEvolutionSplitRead.splitFieldBunch(batch, file -> 0);
+ assertThat(fieldGroups.size()).isEqualTo(2);
+ assertThat(fieldGroups.get(0).size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).size()).isEqualTo(8);
+ }
+
+ readDefault(
+ row -> {
+ if (integer.getAndIncrement() % 50 == 0) {
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ }
+ });
+ assertThat(integer.get()).isEqualTo(200);
+ }
+
+ protected Schema schemaDefault() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ return schemaBuilder.build();
+ }
+
+ protected InternalRow dataDefault(int time, int size) {
+ return GenericRow.of(RANDOM.nextInt(), randomString(), new BlobData(blobBytes));
+ }
+
+ @Override
+ protected byte[] randomBytes() {
+ byte[] binary = new byte[2 * 1024 * 1024];
+ RANDOM.nextBytes(binary);
+ return binary;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index ca0b65b2ee..a4dfdfd733 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -45,7 +45,7 @@ import org.apache.paimon.utils.StatsCollectorFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.LinkedList;
import static org.assertj.core.api.Assertions.assertThat;
@@ -76,7 +76,8 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
null);
FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options());
LinkedList<DataFileMeta> toCompact = new LinkedList<>();
- CoreOptions options = new CoreOptions(new HashMap<>());
+ CoreOptions options =
+ new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
AppendOnlyWriter appendOnlyWriter =
new AppendOnlyWriter(
LocalFileIO.create(),
@@ -97,8 +98,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
false,
CoreOptions.FILE_COMPRESSION.defaultValue(),
CompressOptions.defaultOptions(),
- StatsCollectorFactories.createStatsFactories(
- "truncate(16)", options,
SCHEMA.getFieldNames()),
+ new StatsCollectorFactories(options),
MemorySize.MAX_VALUE,
new FileIndexOptions(),
true,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
new file mode 100644
index 0000000000..ce38d6e4b5
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DataEvolutionSplitRead.BlobBunch}. */
+public class DataEvolutionReadTest {
+
+ private DataEvolutionSplitRead.BlobBunch blobBunch;
+
+ @BeforeEach
+ public void setUp() {
+ blobBunch = new DataEvolutionSplitRead.BlobBunch(Long.MAX_VALUE);
+ }
+
+ @Test
+ public void testAddSingleBlobEntry() {
+ DataFileMeta blobEntry = createBlobFile("blob1", 0L, 100L, 1L);
+
+ blobBunch.add(blobEntry);
+
+ assertThat(blobBunch.files).hasSize(1);
+ assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
+ assertThat(blobBunch.rowCount()).isEqualTo(100);
+ assertThat(blobBunch.firstRowId()).isEqualTo(0);
+ assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ }
+
+ @Test
+ public void testAddBlobEntryAndTail() {
+ DataFileMeta blobEntry = createBlobFile("blob1", 0, 100, 1);
+ DataFileMeta blobTail = createBlobFile("blob2", 100, 200, 1);
+
+ blobBunch.add(blobEntry);
+ blobBunch.add(blobTail);
+
+ assertThat(blobBunch.files).hasSize(2);
+ assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
+ assertThat(blobBunch.files.get(1)).isEqualTo(blobTail);
+ assertThat(blobBunch.rowCount()).isEqualTo(300);
+ assertThat(blobBunch.firstRowId()).isEqualTo(0);
+ assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ assertThat(blobBunch.schemaId()).isEqualTo(0L);
+ }
+
+ @Test
+ public void testAddNonBlobFileThrowsException() {
+ DataFileMeta normalFile = createNormalFile("normal1.parquet", 0, 100,
1, 0L);
+
+ assertThatThrownBy(() -> blobBunch.add(normalFile))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Only blob file can be added to a blob bunch.");
+ }
+
+ @Test
+ public void testAddBlobFileWithSameFirstRowId() {
+ DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+ DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 2);
+
+ blobBunch.add(blobEntry1);
+ // Adding file with same firstRowId but higher sequence number should throw exception
+ assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Blob file with same first row id should have
decreasing sequence number.");
+ }
+
+ @Test
+ public void testAddBlobFileWithSameFirstRowIdAndLowerSequenceNumber() {
+ DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
+ DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 1);
+
+ blobBunch.add(blobEntry1);
+ // Adding file with same firstRowId and lower sequence number should be ignored
+ blobBunch.add(blobEntry2);
+
+ assertThat(blobBunch.files).hasSize(1);
+ assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+ }
+
+ @Test
+ public void testAddBlobFileWithOverlappingRowId() {
+ DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
+ DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 1);
+
+ blobBunch.add(blobEntry1);
+ // Adding file with overlapping row id and lower sequence number should be ignored
+ blobBunch.add(blobEntry2);
+
+ assertThat(blobBunch.files).hasSize(1);
+ assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+ }
+
+ @Test
+ public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() {
+ DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+ DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 2);
+
+ blobBunch.add(blobEntry1);
+ // Adding file with overlapping row id and higher sequence number should throw exception
+ assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Blob file with overlapping row id should have
decreasing sequence number.");
+ }
+
+ @Test
+ public void testAddBlobFileWithNonContinuousRowId() {
+ DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+ DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
+
+ blobBunch.add(blobEntry1);
+ // Adding file with non-continuous row id should throw exception
+ assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Blob file first row id should be continuous,
expect 100 but got 200");
+ }
+
+ @Test
+ public void testAddBlobFileWithDifferentWriteCols() {
+ DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+ DataFileMeta blobEntry2 =
+ createBlobFileWithCols("blob2", 100, 200, 1,
Arrays.asList("different_col"));
+
+ blobBunch.add(blobEntry1);
+ // Adding file with different write columns should throw exception
+ assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("All files in a blob bunch should have the same
write columns.");
+ }
+
+ @Test
+ public void testComplexBlobBunchScenario() {
+ // Create a complex scenario with multiple blob entries and a tail
+ DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+ DataFileMeta blobEntry2 = createBlobFile("blob2", 100, 200, 1);
+ DataFileMeta blobEntry3 = createBlobFile("blob3", 300, 300, 1);
+ DataFileMeta blobTail = createBlobFile("blob4", 600, 400, 1);
+
+ blobBunch.add(blobEntry1);
+ blobBunch.add(blobEntry2);
+ blobBunch.add(blobEntry3);
+ blobBunch.add(blobTail);
+
+ assertThat(blobBunch.files).hasSize(4);
+ assertThat(blobBunch.rowCount()).isEqualTo(1000);
+ assertThat(blobBunch.firstRowId()).isEqualTo(0);
+ assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ }
+
+ @Test
+ public void testComplexBlobBunchScenario2() {
+
+ List<DataFileMeta> waited = new ArrayList<>();
+
+ waited.add(createNormalFile("others.parquet", 0, 1000, 1, 1));
+ waited.add(createBlobFile("blob1", 0, 1000, 1));
+ waited.add(createBlobFile("blob2", 0, 500, 2));
+ waited.add(createBlobFile("blob3", 500, 250, 2));
+ waited.add(createBlobFile("blob4", 750, 250, 2));
+ waited.add(createBlobFile("blob5", 0, 100, 3));
+ waited.add(createBlobFile("blob6", 100, 400, 3));
+ waited.add(createBlobFile("blob7", 750, 100, 3));
+ waited.add(createBlobFile("blob8", 850, 150, 3));
+ waited.add(createBlobFile("blob9", 100, 650, 4));
+
+ List<List<DataFileMeta>> batches = DataEvolutionSplitGenerator.split(waited);
+ assertThat(batches.size()).isEqualTo(1);
+
+ List<DataFileMeta> batch = batches.get(0);
+
+ assertThat(batch.get(1).fileName()).contains("blob5"); // pick
+ assertThat(batch.get(2).fileName()).contains("blob2"); // skip
+ assertThat(batch.get(3).fileName()).contains("blob1"); // skip
+ assertThat(batch.get(4).fileName()).contains("blob9"); // pick
+ assertThat(batch.get(5).fileName()).contains("blob6"); // skip
+ assertThat(batch.get(6).fileName()).contains("blob3"); // skip
+ assertThat(batch.get(7).fileName()).contains("blob7"); // pick
+ assertThat(batch.get(8).fileName()).contains("blob4"); // skip
+ assertThat(batch.get(9).fileName()).contains("blob8"); // pick
+
+ List<DataEvolutionSplitRead.FieldBunch> fieldBunches =
+ DataEvolutionSplitRead.splitFieldBunch(batch, file -> 0);
+ assertThat(fieldBunches.size()).isEqualTo(2);
+
+ DataEvolutionSplitRead.BlobBunch blobBunch = fieldBunches.get(1).fileOrBlob.getRight();
+ assertThat(blobBunch.files).hasSize(4);
+ assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
+ }
+
+ @Test
+ public void testComplexBlobBunchScenario3() {
+
+ List<DataFileMeta> waited = new ArrayList<>();
+
+ waited.add(createNormalFile("others.parquet", 0, 1000, 1, 1));
+ waited.add(createBlobFile("blob1", 0, 1000, 1));
+ waited.add(createBlobFile("blob2", 0, 500, 2));
+ waited.add(createBlobFile("blob3", 500, 250, 2));
+ waited.add(createBlobFile("blob4", 750, 250, 2));
+ waited.add(createBlobFile("blob5", 0, 100, 3));
+ waited.add(createBlobFile("blob6", 100, 400, 3));
+ waited.add(createBlobFile("blob7", 750, 100, 3));
+ waited.add(createBlobFile("blob8", 850, 150, 3));
+ waited.add(createBlobFile("blob9", 100, 650, 4));
+ waited.add(
+ createBlobFileWithCols("blob11", 0, 1000, 1, Collections.singletonList("blobc2")));
+ waited.add(
+ createBlobFileWithCols("blob12", 0, 500, 2, Collections.singletonList("blobc2")));
+ waited.add(
+ createBlobFileWithCols("blob13", 500, 250, 2, Collections.singletonList("blobc2")));
+ waited.add(
+ createBlobFileWithCols("blob14", 750, 250, 2, Collections.singletonList("blobc2")));
+ waited.add(
+ createBlobFileWithCols("blob15", 0, 100, 3, Collections.singletonList("blobc2")));
+ waited.add(
+ createBlobFileWithCols("blob16", 100, 400, 3, Collections.singletonList("blobc2")));
+ waited.add(
+ createBlobFileWithCols("blob17", 750, 100, 3, Collections.singletonList("blobc2")));
+ waited.add(
+ createBlobFileWithCols("blob18", 850, 150, 3, Collections.singletonList("blobc2")));
+ waited.add(
+ createBlobFileWithCols("blob19", 100, 650, 4, Collections.singletonList("blobc2")));
+
+ List<List<DataFileMeta>> batches = DataEvolutionSplitGenerator.split(waited);
+ assertThat(batches.size()).isEqualTo(1);
+
+ List<DataFileMeta> batch = batches.get(0);
+
+ List<DataEvolutionSplitRead.FieldBunch> fieldBunches =
+ DataEvolutionSplitRead.splitFieldBunch(
+ batch, file -> file.writeCols().get(0).hashCode());
+ assertThat(fieldBunches.size()).isEqualTo(3);
+
+ DataEvolutionSplitRead.BlobBunch blobBunch = fieldBunches.get(1).fileOrBlob.getRight();
+ assertThat(blobBunch.files).hasSize(4);
+ assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
+
+ blobBunch = fieldBunches.get(2).fileOrBlob.getRight();
+ assertThat(blobBunch.files).hasSize(4);
+ assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob17");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob18");
+ }
+
+ /** Creates a blob file with the specified parameters. */
+ private DataFileMeta createBlobFile(
+ String fileName, long firstRowId, long rowCount, long maxSequenceNumber) {
+ return createBlobFileWithCols(
+ fileName, firstRowId, rowCount, maxSequenceNumber,
Arrays.asList("blob_col"));
+ }
+
+ /** Creates a blob file with custom write columns. */
+ private DataFileMeta createBlobFileWithCols(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ List<String> writeCols) {
+ return DataFileMeta.create(
+ fileName + ".blob",
+ rowCount,
+ rowCount,
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0,
+ maxSequenceNumber,
+ 0L,
+ DataFileMeta.DUMMY_LEVEL,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ rowCount,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ writeCols);
+ }
+
+ /** Creates a normal (non-blob) file for testing. */
+ private DataFileMeta createNormalFile(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ long schemaId) {
+ return DataFileMeta.create(
+ fileName,
+ rowCount,
+ rowCount,
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ maxSequenceNumber,
+ schemaId,
+ DataFileMeta.DUMMY_LEVEL,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ rowCount,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ null);
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index aedf5553e0..2c45323088 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -57,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -195,6 +196,14 @@ public abstract class TableTestBase {
return rows;
}
+ protected void readDefault(Consumer<InternalRow> consumer) throws Exception {
+ Table table = getTableDefault();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ RecordReader<InternalRow> reader =
+ readBuilder.newRead().createReader(readBuilder.newScan().plan());
+ reader.forEachRemaining(consumer);
+ }
+
public void createTableDefault() throws Exception {
catalog.createTable(identifier(), schemaDefault(), true);
}
diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
index d05cf3844f..09a4216e34 100644
--- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
+++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -18,3 +18,4 @@ org.apache.paimon.format.orc.OrcFileFormatFactory
org.apache.paimon.format.parquet.ParquetFileFormatFactory
org.apache.paimon.format.csv.CsvFileFormatFactory
org.apache.paimon.format.json.JsonFileFormatFactory
+org.apache.paimon.format.blob.BlobFileFormatFactory