This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a2c3ce1f8 [core] Support blob read and write (#6344)
4a2c3ce1f8 is described below

commit 4a2c3ce1f838228237f9bbe52c07444b1a535a29
Author: YeJunHao <[email protected]>
AuthorDate: Mon Sep 29 13:50:51 2025 +0800

    [core] Support blob read and write (#6344)
---
 .../java/org/apache/paimon/types/BlobType.java     |  10 +
 .../main/java/org/apache/paimon/utils/Either.java  | 217 +++++++++++++
 .../org/apache/paimon/append/AppendOnlyWriter.java |  30 +-
 .../apache/paimon/append/MergeAllBatchReader.java  | 110 +++++++
 .../java/org/apache/paimon/io/DataFileMeta.java    |   4 +
 .../paimon/operation/BaseAppendFileStoreWrite.java |   3 +-
 .../paimon/operation/DataEvolutionSplitRead.java   | 258 ++++++++++++++-
 .../paimon/operation/FileStoreCommitImpl.java      |  20 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  12 +-
 .../table/source/DataEvolutionSplitGenerator.java  |   3 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |   9 +-
 .../org/apache/paimon/append/BlobTableTest.java    | 130 ++++++++
 .../apache/paimon/format/FileFormatSuffixTest.java |   8 +-
 .../paimon/operation/DataEvolutionReadTest.java    | 351 +++++++++++++++++++++
 .../org/apache/paimon/table/TableTestBase.java     |   9 +
 .../org.apache.paimon.format.FileFormatFactory     |   1 +
 16 files changed, 1139 insertions(+), 36 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index 7f3fa010c4..b5ead703f0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -82,4 +82,14 @@ public final class BlobType extends DataType {
 
         return Pair.of(new RowType(normalFields), new RowType(blobFields));
     }
+
+    public static boolean containsBlobType(RowType rowType) {
+        for (DataField field : rowType.getFields()) {
+            DataTypeRoot type = field.type().getTypeRoot();
+            if (type == DataTypeRoot.BLOB) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
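
A minimal usage sketch of the new containsBlobType helper (illustration only, assuming the existing RowType.of(...) and DataTypes factory methods):

    RowType withBlob = RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.BLOB());
    RowType withoutBlob = RowType.of(DataTypes.INT(), DataTypes.STRING());

    boolean hasBlob = BlobType.containsBlobType(withBlob);     // true
    boolean noBlob = BlobType.containsBlobType(withoutBlob);   // false
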
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Either.java b/paimon-common/src/main/java/org/apache/paimon/utils/Either.java
new file mode 100644
index 0000000000..1b7435290f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Either.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+/**
+ * A functional programming utility class that represents a value of one of two possible types. This
+ * is a discriminated union type that can hold either a Left value (typically representing an error
+ * or failure case) or a Right value (typically representing a success or correct result).
+ *
+ * <p>The Either type is commonly used in functional programming to handle operations that can
+ * either succeed or fail, providing a type-safe alternative to exceptions or null values.
+ *
+ * <p>Usage examples:
+ *
+ * <pre>{@code
+ * // Create a Left (error case)
+ * Either<String, Integer> error = Either.left("Invalid input");
+ *
+ * // Create a Right (success case)
+ * Either<String, Integer> success = Either.right(42);
+ *
+ * // Check and handle the result
+ * if (result.isRight()) {
+ *     Integer value = result.getRight();
+ *     // Process successful result
+ * } else {
+ *     String error = result.getLeft();
+ *     // Handle error case
+ * }
+ * }</pre>
+ *
+ * @param <L> the type of the Left value (typically error/failure)
+ * @param <R> the type of the Right value (typically success/result)
+ */
+public abstract class Either<L, R> {
+
+    /** Private constructor to prevent direct instantiation. */
+    private Either() {}
+
+    /**
+     * Static factory method to create a Left-sided Either instance. Typically represents a failure
+     * or error.
+     *
+     * @param value The left-side value.
+     * @param <L> The type of the left value.
+     * @param <R> The type of the right value.
+     * @return A new Left instance.
+     */
+    public static <L, R> Either<L, R> left(L value) {
+        return new Left<>(value);
+    }
+
+    /**
+     * Static factory method to create a Right-sided Either instance. Typically represents a
+     * successful or correct result.
+     *
+     * @param value The right-side value.
+     * @param <L> The type of the left value.
+     * @param <R> The type of the right value.
+     * @return A new Right instance.
+     */
+    public static <L, R> Either<L, R> right(R value) {
+        return new Right<>(value);
+    }
+
+    /**
+     * Checks if this instance is a Left.
+     *
+     * @return true if Left, false otherwise.
+     */
+    public abstract boolean isLeft();
+
+    /**
+     * Checks if this instance is a Right.
+     *
+     * @return true if Right, false otherwise.
+     */
+    public abstract boolean isRight();
+
+    /**
+     * Returns the left value if this is a Left instance.
+     *
+     * @return The left value.
+     * @throws NoSuchElementException if this is a Right instance.
+     */
+    public abstract L getLeft();
+
+    /**
+     * Returns the right value if this is a Right instance.
+     *
+     * @return The right value.
+     * @throws NoSuchElementException if this is a Left instance.
+     */
+    public abstract R getRight();
+
+    /**
+     * If this is a Left, performs the given action on its value.
+     *
+     * @param action The consumer function to execute.
+     */
+    public abstract void ifLeft(Consumer<L> action);
+
+    /**
+     * If this is a Right, performs the given action on its value.
+     *
+     * @param action The consumer function to execute.
+     */
+    public abstract void ifRight(Consumer<R> action);
+
+    /** Private static inner class representing the Left state of Either. */
+    private static final class Left<L, R> extends Either<L, R> {
+        private final L value;
+
+        private Left(L value) {
+            this.value = Objects.requireNonNull(value, "Left value cannot be null");
+        }
+
+        @Override
+        public boolean isLeft() {
+            return true;
+        }
+
+        @Override
+        public boolean isRight() {
+            return false;
+        }
+
+        @Override
+        public L getLeft() {
+            return value;
+        }
+
+        @Override
+        public R getRight() {
+            throw new NoSuchElementException("Cannot call getRight() on a Left instance");
+        }
+
+        @Override
+        public void ifLeft(Consumer<L> action) {
+            action.accept(value);
+        }
+
+        @Override
+        public void ifRight(Consumer<R> action) {
+            // Do nothing
+        }
+
+        @Override
+        public String toString() {
+            return "Left(" + value + ")";
+        }
+    }
+
+    /** Private static inner class representing the Right state of Either. */
+    private static final class Right<L, R> extends Either<L, R> {
+        private final R value;
+
+        private Right(R value) {
+            this.value = Objects.requireNonNull(value, "Right value cannot be null");
+        }
+
+        @Override
+        public boolean isLeft() {
+            return false;
+        }
+
+        @Override
+        public boolean isRight() {
+            return true;
+        }
+
+        @Override
+            throw new NoSuchElementException("Cannot call getLeft() on a Right instance");
instance");
+        }
+
+        @Override
+        public R getRight() {
+            return value;
+        }
+
+        @Override
+        public void ifLeft(Consumer<L> action) {
+            // Do nothing
+        }
+
+        @Override
+        public void ifRight(Consumer<R> action) {
+            action.accept(value);
+        }
+
+        @Override
+        public String toString() {
+            return "Right(" + value + ")";
+        }
+    }
+}
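
A short sketch of the consumer-style accessors above (ifLeft/ifRight), complementing the isRight()/getRight() example in the class javadoc; the values are illustrative only:

    Either<String, Integer> result = Either.right(42);
    result.ifRight(value -> System.out.println("parsed: " + value)); // runs for a Right
    result.ifLeft(error -> System.out.println("failed: " + error));  // no-op for a Right
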
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index f8a074cab8..4ad04b2610 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -33,13 +33,14 @@ import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BatchRecordWriter;
 import org.apache.paimon.utils.CommitIncrement;
@@ -50,6 +51,7 @@ import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SinkWriter;
 import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
 import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import javax.annotation.Nullable;
 
@@ -84,7 +86,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     private final LongCounter seqNumCounter;
     private final String fileCompression;
     private final CompressOptions spillCompression;
-    private final SimpleColStatsCollector.Factory[] statsCollectors;
+    private final StatsCollectorFactories statsCollectorFactories;
     @Nullable private final IOManager ioManager;
     private final FileIndexOptions fileIndexOptions;
     private final MemorySize maxDiskSize;
@@ -111,7 +113,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
             boolean spillable,
             String fileCompression,
             CompressOptions spillCompression,
-            SimpleColStatsCollector.Factory[] statsCollectors,
+            StatsCollectorFactories statsCollectorFactories,
             MemorySize maxDiskSize,
             FileIndexOptions fileIndexOptions,
             boolean asyncFileWrite,
@@ -136,7 +138,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
         this.fileCompression = fileCompression;
         this.spillCompression = spillCompression;
         this.ioManager = ioManager;
-        this.statsCollectors = statsCollectors;
+        this.statsCollectorFactories = statsCollectorFactories;
         this.maxDiskSize = maxDiskSize;
         this.fileIndexOptions = fileIndexOptions;
 
@@ -289,7 +291,23 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
         }
     }
 
-    private RowDataRollingFileWriter createRollingRowWriter() {
+    private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
+        if (BlobType.containsBlobType(writeSchema)) {
+            return new RollingBlobFileWriter(
+                    fileIO,
+                    schemaId,
+                    fileFormat,
+                    targetFileSize,
+                    writeSchema,
+                    pathFactory,
+                    seqNumCounter,
+                    fileCompression,
+                    statsCollectorFactories,
+                    fileIndexOptions,
+                    FileSource.APPEND,
+                    asyncFileWrite,
+                    statsDenseStore);
+        }
         return new RowDataRollingFileWriter(
                 fileIO,
                 schemaId,
@@ -299,7 +317,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
                 pathFactory,
                 seqNumCounter,
                 fileCompression,
-                statsCollectors,
+                statsCollectorFactories.statsCollectors(writeSchema.getFieldNames()),
                 fileIndexOptions,
                 FileSource.APPEND,
                 asyncFileWrite,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java b/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
new file mode 100644
index 0000000000..911eafd3b4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * A record reader that merges all batches from a multi-batch reader into a single concatenated
+ * batch. This reader wraps another RecordReader that produces multiple batches and presents them as
+ * a single continuous stream of records.
+ *
+ * <p>The MergeAllBatchReader is particularly useful in scenarios where you need to process multiple
+ * batches as a unified data stream, such as when reading from multiple files or partitions that
+ * should be treated as a single logical dataset.
+ *
+ * <p>Key features:
+ *
+ * <ul>
+ *   <li>Concatenates all batches from the underlying reader into one continuous batch
+ *   <li>Automatically handles batch transitions and resource cleanup
+ *   <li>Provides a single readBatch() call that returns all data
+ *   <li>Properly manages memory by releasing batches after consumption
+ * </ul>
+ *
+ * <p>This reader is commonly used in data evolution scenarios where multiple file formats or
+ * schemas need to be read as a unified stream.
+ */
+public class MergeAllBatchReader implements RecordReader<InternalRow> {
+
+    private final RecordReader<InternalRow> multiBatchReader;
+    private ConcatBatch batch;
+
+    public MergeAllBatchReader(RecordReader<InternalRow> multiBatchReader) {
+        this.multiBatchReader = multiBatchReader;
+        this.batch = new ConcatBatch(multiBatchReader);
+    }
+
+    @Override
+    @Nullable
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<InternalRow> returned = batch;
+        batch = null;
+        return returned;
+    }
+
+    @Override
+    public void close() throws IOException {
+        multiBatchReader.close();
+    }
+
+    private static class ConcatBatch implements RecordIterator<InternalRow> {
+
+        private final RecordReader<InternalRow> reader;
+        private RecordIterator<InternalRow> currentBatch;
+
+        private ConcatBatch(RecordReader<InternalRow> reader) {
+            this.reader = reader;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            if (currentBatch == null) {
+                currentBatch = reader.readBatch();
+                if (currentBatch == null) {
+                    return null;
+                }
+            }
+
+            InternalRow next = currentBatch.next();
+
+            if (next == null) {
+                currentBatch.releaseBatch();
+                currentBatch = null;
+                return next();
+            }
+
+            return next;
+        }
+
+        @Override
+        public void releaseBatch() {
+            if (currentBatch != null) {
+                currentBatch.releaseBatch();
+                currentBatch = null;
+            }
+        }
+    }
+}
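
A self-contained sketch of the concatenation behaviour described in the javadoc above (illustration only; it assumes RecordReader/RecordIterator require only the readBatch()/close() and next()/releaseBatch() methods overridden here, and uses GenericRow simply to have concrete rows):

    import org.apache.paimon.append.MergeAllBatchReader;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.reader.RecordReader;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class MergeAllBatchReaderSketch {

        public static void main(String[] args) throws IOException {
            // Two "batches" that would normally come from two separate files.
            List<List<InternalRow>> batches =
                    Arrays.asList(
                            Arrays.asList(GenericRow.of(1), GenericRow.of(2)),
                            Arrays.asList(GenericRow.of(3), GenericRow.of(4)));
            Iterator<List<InternalRow>> batchIterator = batches.iterator();

            RecordReader<InternalRow> multiBatchReader =
                    new RecordReader<InternalRow>() {
                        @Override
                        public RecordIterator<InternalRow> readBatch() {
                            if (!batchIterator.hasNext()) {
                                return null;
                            }
                            Iterator<InternalRow> rows = batchIterator.next().iterator();
                            return new RecordIterator<InternalRow>() {
                                @Override
                                public InternalRow next() {
                                    return rows.hasNext() ? rows.next() : null;
                                }

                                @Override
                                public void releaseBatch() {}
                            };
                        }

                        @Override
                        public void close() {}
                    };

            // A single readBatch() on the wrapper walks through both underlying batches.
            try (RecordReader<InternalRow> merged = new MergeAllBatchReader(multiBatchReader)) {
                RecordReader.RecordIterator<InternalRow> all = merged.readBatch();
                InternalRow row;
                while ((row = all.next()) != null) {
                    System.out.println(row.getInt(0)); // prints 1, 2, 3, 4
                }
            }
        }
    }
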
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index e7bb8b9571..68ca8b20a0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -256,6 +256,10 @@ public interface DataFileMeta {
 
     String fileName();
 
+    default boolean isBlob() {
+        return fileName().endsWith(".blob");
+    }
+
     long fileSize();
 
     long rowCount();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 3bd8264831..98d699cb9d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -43,6 +43,7 @@ import org.apache.paimon.utils.IOExceptionSupplier;
 import org.apache.paimon.utils.LongCounter;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,7 +132,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
                 options.writeBufferSpillable() || forceBufferSpill,
                 options.fileCompression(),
                 options.spillCompressOptions(),
-                statsCollectors(),
+                new StatsCollectorFactories(options),
                 options.writeBufferSpillDiskSize(),
                 fileIndexOptions,
                 options.asyncFileWrite(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index f4c426e86f..dd2809da7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.append.MergeAllBatchReader;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
@@ -42,6 +44,7 @@ import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Either;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FormatReaderMapping;
 import org.apache.paimon.utils.FormatReaderMapping.Builder;
@@ -51,9 +54,11 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
@@ -68,11 +73,11 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
 
     private final FileIO fileIO;
-    private final SchemaManager schemaManager;
     private final TableSchema schema;
     private final FileFormatDiscover formatDiscover;
     private final FileStorePathFactory pathFactory;
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
+    private final Function<Long, TableSchema> schemaFetcher;
 
     protected RowType readRowType;
 
@@ -84,7 +89,9 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
             FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory) {
         this.fileIO = fileIO;
-        this.schemaManager = schemaManager;
+        final Map<Long, TableSchema> cache = new HashMap<>();
+        this.schemaFetcher =
+                schemaId -> cache.computeIfAbsent(schemaId, key -> schemaManager.schema(schemaId));
         this.schema = schema;
         this.formatDiscover = formatDiscover;
         this.pathFactory = pathFactory;
@@ -110,6 +117,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
 
     @Override
     public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
+        // TODO: Support File index push down (all conditions) and Predicate push down (only if no
+        // column merge)
         return this;
     }
 
@@ -162,20 +171,34 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
             DataFilePathFactory dataFilePathFactory,
             Builder formatBuilder)
             throws IOException {
-        long rowCount = needMergeFiles.get(0).rowCount();
-        long firstRowId = needMergeFiles.get(0).firstRowId();
-        for (DataFileMeta file : needMergeFiles) {
+        List<FieldBunch> fieldsFiles =
+                splitFieldBunch(
+                        needMergeFiles,
+                        file -> {
+                            checkArgument(
+                                    file.isBlob(), "Only blob file need to call this method.");
+                            return schemaFetcher
+                                    .apply(file.schemaId())
+                                    .logicalRowType()
+                                    .getField(file.writeCols().get(0))
+                                    .id();
+                        });
+
+        long rowCount = fieldsFiles.get(0).rowCount();
+        long firstRowId = fieldsFiles.get(0).firstRowId();
+
+        for (FieldBunch files : fieldsFiles) {
             checkArgument(
-                    file.rowCount() == rowCount,
+                    files.rowCount() == rowCount,
                     "All files in a field merge split should have the same row 
count.");
             checkArgument(
-                    file.firstRowId() == firstRowId,
+                    files.firstRowId() == firstRowId,
                     "All files in a field merge split should have the same 
first row id and could not be null.");
         }
 
         // Init all we need to create a compound reader
         List<DataField> allReadFields = readRowType.getFields();
-        RecordReader<InternalRow>[] fileRecordReaders = new RecordReader[needMergeFiles.size()];
+        RecordReader<InternalRow>[] fileRecordReaders = new RecordReader[fieldsFiles.size()];
         int[] readFieldIndex = allReadFields.stream().mapToInt(DataField::id).toArray();
         // which row the read field index belongs to
         int[] rowOffsets = new int[allReadFields.size()];
@@ -184,11 +207,11 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
         Arrays.fill(rowOffsets, -1);
         Arrays.fill(fieldOffsets, -1);
 
-        for (int i = 0; i < needMergeFiles.size(); i++) {
-            DataFileMeta file = needMergeFiles.get(i);
-            String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
+        for (int i = 0; i < fieldsFiles.size(); i++) {
+            FieldBunch file = fieldsFiles.get(i);
+            String formatIdentifier = file.formatIdentifier();
             long schemaId = file.schemaId();
-            TableSchema dataSchema = schemaManager.schema(schemaId).project(file.writeCols());
+            TableSchema dataSchema = schemaFetcher.apply(schemaId).project(file.writeCols());
             int[] fieldIds =
                     SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields()
                             .stream()
@@ -231,7 +254,9 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                                                 readFields,
                                                 false));
                 fileRecordReaders[i] =
-                        createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
+                        new MergeAllBatchReader(
+                                createFileReader(
+                                        partition, file, dataFilePathFactory, formatReaderMapping));
             }
         }
 
@@ -265,10 +290,43 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                                         schema,
                                         schemaId == schema.id()
                                                 ? schema
-                                                : schemaManager.schema(schemaId)));
+                                                : schemaFetcher.apply(schemaId)));
         return createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
     }
 
+    private RecordReader<InternalRow> createFileReader(
+            BinaryRow partition,
+            FieldBunch files,
+            DataFilePathFactory dataFilePathFactory,
+            FormatReaderMapping formatReaderMapping)
+            throws IOException {
+        if (files.size() == 1) {
+            return createFileReader(
+                    partition, files.getFirstFile(), dataFilePathFactory, formatReaderMapping);
+        }
+        List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
+        for (DataFileMeta file : files.files()) {
+            FormatReaderContext formatReaderContext =
+                    new FormatReaderContext(
+                            fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
+            readerSuppliers.add(
+                    () ->
+                            new DataFileRecordReader(
+                                    schema.logicalRowType(),
+                                    formatReaderMapping.getReaderFactory(),
+                                    formatReaderContext,
+                                    formatReaderMapping.getIndexMapping(),
+                                    formatReaderMapping.getCastMapping(),
+                                    PartitionUtils.create(
+                                            formatReaderMapping.getPartitionPair(), partition),
+                                    true,
+                                    file.firstRowId(),
+                                    file.maxSequenceNumber(),
+                                    formatReaderMapping.getSystemFields()));
+        }
+        return ConcatRecordReader.create(readerSuppliers);
+    }
+
     private FileRecordReader<InternalRow> createFileReader(
             BinaryRow partition,
             DataFileMeta file,
@@ -290,4 +348,176 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                 file.maxSequenceNumber(),
                 formatReaderMapping.getSystemFields());
     }
+
+    @VisibleForTesting
+    public static List<FieldBunch> splitFieldBunch(
+            List<DataFileMeta> needMergeFiles, Function<DataFileMeta, Integer> blobFileToFieldId) {
+        List<FieldBunch> fieldsFiles = new ArrayList<>();
+        Map<Integer, BlobBunch> blobBunchMap = new HashMap<>();
+        long rowCount = -1;
+        for (DataFileMeta file : needMergeFiles) {
+            if (file.isBlob()) {
+                int fieldId = blobFileToFieldId.apply(file);
+                final long expectedRowCount = rowCount;
+                blobBunchMap
+                        .computeIfAbsent(fieldId, key -> new BlobBunch(expectedRowCount))
+                        .add(file);
+            } else {
+                // Normal file, just add it to the current merge split
+                fieldsFiles.add(FieldBunch.file(file));
+                rowCount = file.rowCount();
+            }
+        }
+        blobBunchMap.values().forEach(blobBunch -> fieldsFiles.add(FieldBunch.blob(blobBunch)));
+        return fieldsFiles;
+    }
+
+    /** Files for one field. */
+    public static class FieldBunch {
+        final Either<DataFileMeta, BlobBunch> fileOrBlob;
+
+        FieldBunch(Either<DataFileMeta, BlobBunch> fileOrBlob) {
+            this.fileOrBlob = fileOrBlob;
+        }
+
+        static FieldBunch file(DataFileMeta file) {
+            return new FieldBunch(Either.left(file));
+        }
+
+        static FieldBunch blob(BlobBunch blob) {
+            return new FieldBunch(Either.right(blob));
+        }
+
+        long rowCount() {
+            return fileOrBlob.isLeft()
+                    ? fileOrBlob.getLeft().rowCount()
+                    : fileOrBlob.getRight().rowCount();
+        }
+
+        long firstRowId() {
+            return fileOrBlob.isLeft()
+                    ? fileOrBlob.getLeft().firstRowId()
+                    : fileOrBlob.getRight().firstRowId();
+        }
+
+        List<String> writeCols() {
+            return fileOrBlob.isLeft()
+                    ? fileOrBlob.getLeft().writeCols()
+                    : fileOrBlob.getRight().writeCols();
+        }
+
+        String formatIdentifier() {
+            return fileOrBlob.isLeft()
+                    ? DataFilePathFactory.formatIdentifier(fileOrBlob.getLeft().fileName())
+                    : "blob";
+        }
+
+        long schemaId() {
+            return fileOrBlob.isLeft()
+                    ? fileOrBlob.getLeft().schemaId()
+                    : fileOrBlob.getRight().schemaId();
+        }
+
+        @VisibleForTesting
+        public int size() {
+            return fileOrBlob.isLeft() ? 1 : fileOrBlob.getRight().files.size();
+        }
+
+        DataFileMeta getFirstFile() {
+            return fileOrBlob.isLeft() ? fileOrBlob.getLeft() : fileOrBlob.getRight().files.get(0);
+        }
+
+        List<DataFileMeta> files() {
+            return fileOrBlob.isLeft()
+                    ? Collections.singletonList(fileOrBlob.getLeft())
+                    : fileOrBlob.getRight().files;
+        }
+    }
+
+    @VisibleForTesting
+    static class BlobBunch {
+        final long expectedRowCount;
+        List<DataFileMeta> files;
+        long latestFistRowId = -1;
+        long expectedNextFirstRowId = -1;
+        long lastestMaxSequenceNumber = -1;
+        long rowCount;
+
+        BlobBunch(long expectedRowCount) {
+            this.files = new ArrayList<>();
+            this.rowCount = 0;
+            this.expectedRowCount = expectedRowCount;
+        }
+
+        void add(DataFileMeta file) {
+            if (!file.isBlob()) {
+                throw new IllegalArgumentException("Only blob file can be added to a blob bunch.");
+            }
+
+            if (file.firstRowId() == latestFistRowId) {
+                if (file.maxSequenceNumber() >= lastestMaxSequenceNumber) {
+                    throw new IllegalArgumentException(
+                            "Blob file with same first row id should have decreasing sequence number.");
+                }
+                return;
+            }
+            if (!files.isEmpty()) {
+                long firstRowId = file.firstRowId();
+                if (firstRowId < expectedNextFirstRowId) {
+                    checkArgument(
+                            file.maxSequenceNumber() < lastestMaxSequenceNumber,
+                            "Blob file with overlapping row id should have decreasing sequence number.");
+                    return;
+                } else if (firstRowId > expectedNextFirstRowId) {
+                            "Blob file first row id should be continuous, expect "
expect "
+                                    + expectedNextFirstRowId
+                                    + " but got "
+                                    + firstRowId);
+                }
+                checkArgument(
+                        file.schemaId() == files.get(0).schemaId(),
+                        "All files in a blob bunch should have the same schema id.");
+                checkArgument(
+                        file.writeCols().equals(files.get(0).writeCols()),
+                        "All files in a blob bunch should have the same write columns.");
+            }
+            files.add(file);
+            rowCount += file.rowCount();
+            checkArgument(
+                    rowCount <= expectedRowCount,
+                    "Blob files row count exceed the expect " + expectedRowCount);
+            this.lastestMaxSequenceNumber = file.maxSequenceNumber();
+            this.latestFistRowId = file.firstRowId();
+            this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
+        }
+
+        long rowCount() {
+            return rowCount;
+        }
+
+        long firstRowId() {
+            if (files.isEmpty()) {
+                return -1;
+            } else {
+                return files.get(0).firstRowId();
+            }
+        }
+
+        List<String> writeCols() {
+            if (files.isEmpty()) {
+                return new ArrayList<>();
+            } else {
+                return files.get(0).writeCols();
+            }
+        }
+
+        long schemaId() {
+            if (files.isEmpty()) {
+                return -1;
+            } else {
+                return files.get(0).schemaId();
+            }
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 466df201a3..870bd498c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1210,15 +1210,29 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
         // assign row id for new files
         long start = firstRowIdStart;
+        long blobStart = firstRowIdStart;
         for (ManifestEntry entry : deltaFiles) {
             checkArgument(
                     entry.file().fileSource().isPresent(),
                     "This is a bug, file source field for row-tracking table 
must present.");
             if (entry.file().fileSource().get().equals(FileSource.APPEND)
                     && entry.file().firstRowId() == null) {
-                long rowCount = entry.file().rowCount();
-                rowIdAssigned.add(entry.assignFirstRowId(start));
-                start += rowCount;
+                if (entry.file().isBlob()) {
+                    if (blobStart >= start) {
+                        throw new IllegalStateException(
+                                String.format(
+                                        "This is a bug, blobStart %d should be less than start %d when assigning a blob entry file.",
+                                        blobStart, start));
+                    }
+                    long rowCount = entry.file().rowCount();
+                    rowIdAssigned.add(entry.assignFirstRowId(blobStart));
+                    blobStart += rowCount;
+                } else {
+                    long rowCount = entry.file().rowCount();
+                    rowIdAssigned.add(entry.assignFirstRowId(start));
+                    blobStart = start;
+                    start += rowCount;
+                }
             } else {
                 // for compact file, do not assign first row id.
                 rowIdAssigned.add(entry);
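
For illustration, a standalone sketch (plain Java, not Paimon code) of the row-id assignment rule added above: blob files do not consume new row ids but reuse the range already claimed by the preceding non-blob data file, while the guard blobStart < start ensures a non-blob file has claimed that range first. File names and row counts below are made up:

    public class BlobRowIdAssignmentSketch {

        public static void main(String[] args) {
            long start = 0;      // next unassigned row id for non-blob data files
            long blobStart = 0;  // cursor inside the range claimed by the last non-blob file

            // (fileName, rowCount) pairs in delta order; names are hypothetical.
            String[][] files = {
                {"data-0.parquet", "100"},
                {"data-0.blob", "60"},
                {"data-1.blob", "40"},
                {"data-1.parquet", "200"},
                {"data-2.blob", "200"},
            };
            for (String[] f : files) {
                long rowCount = Long.parseLong(f[1]);
                if (f[0].endsWith(".blob")) {
                    // Mirrors entry.assignFirstRowId(blobStart) for blob files.
                    System.out.println(f[0] + " -> firstRowId=" + blobStart);
                    blobStart += rowCount;
                } else {
                    // Mirrors entry.assignFirstRowId(start) for normal append files.
                    System.out.println(f[0] + " -> firstRowId=" + start);
                    blobStart = start;
                    start += rowCount;
                }
            }
            // Output: data-0.parquet -> 0, data-0.blob -> 0, data-1.blob -> 60,
            //         data-1.parquet -> 100, data-2.blob -> 100
        }
    }
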
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 09de891a37..0521351a44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -30,6 +30,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.IntType;
@@ -159,7 +160,7 @@ public class SchemaValidation {
 
         FileFormat fileFormat =
                FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()));
-        fileFormat.validateDataFields(new RowType(schema.fields()));
+        fileFormat.validateDataFields(BlobType.splitBlob(new RowType(schema.fields())).getLeft());
 
         // Check column names in schema
         schema.fieldNames()
@@ -649,6 +650,15 @@ public class SchemaValidation {
                     !options.deletionVectorsEnabled(),
                     "Data evolution config must disabled with 
deletion-vectors.enabled");
         }
+
+        if (BlobType.containsBlobType(schema.logicalRowType())) {
+            checkArgument(
+                    options.dataEvolutionEnabled(),
+                    "Data evolution config must enabled for table with BLOB type column.");
+            checkArgument(
+                    BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldCount() == 1,
+                    "Table with BLOB type column only support one BLOB column.");
+        }
     }
 
     private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index 39fa2d9ce7..c2f115d32c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -86,6 +86,7 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
                                                 value.firstRowId() == null
                                                         ? Long.MIN_VALUE
                                                         : value.firstRowId())
+                        .thenComparingInt(f -> f.isBlob() ? 1 : 0)
                         .thenComparing(
                                 (f1, f2) -> {
                                     // If firstRowId is the same, we should read the file with
@@ -105,7 +106,7 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
                 splitByRowId.add(Collections.singletonList(file));
                 continue;
             }
-            if (firstRowId != lastRowId) {
+            if (!file.isBlob() && firstRowId != lastRowId) {
                 if (!currentSplit.isEmpty()) {
                     splitByRowId.add(currentSplit);
                 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index ad91a3c70a..ce3a2d4e85 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -62,7 +62,6 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -613,7 +612,8 @@ public class AppendOnlyWriterTest {
                                             generateCompactAfter(compactBefore));
                         },
                         null);
-        CoreOptions options = new CoreOptions(new HashMap<>());
+        CoreOptions options =
+                new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
         AppendOnlyWriter writer =
                 new AppendOnlyWriter(
                         LocalFileIO.create(),
@@ -633,10 +633,7 @@ public class AppendOnlyWriterTest {
                         spillable,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
                         CompressOptions.defaultOptions(),
-                        StatsCollectorFactories.createStatsFactories(
-                                "truncate(16)",
-                                options,
-                                AppendOnlyWriterTest.SCHEMA.getFieldNames()),
+                        new StatsCollectorFactories(options),
                         MemorySize.MAX_VALUE,
                         new FileIndexOptions(),
                         true,
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
new file mode 100644
index 0000000000..31e19d4f5c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for table with blob. */
+public class BlobTableTest extends TableTestBase {
+
+    private final byte[] blobBytes = randomBytes();
+
+    @Test
+    public void testBasic() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        AtomicInteger integer = new AtomicInteger(0);
+
+        List<DataFileMeta> filesMetas =
+                getTableDefault().store().newScan().plan().files().stream()
+                        .map(ManifestEntry::file)
+                        .collect(Collectors.toList());
+
+        List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
+                DataEvolutionSplitRead.splitFieldBunch(filesMetas, key -> 0);
+
+        assertThat(fieldGroups.size()).isEqualTo(2);
+        assertThat(fieldGroups.get(0).size()).isEqualTo(1);
+        assertThat(fieldGroups.get(1).size()).isEqualTo(8);
+
+        readDefault(
+                row -> {
+                    integer.incrementAndGet();
+                    if (integer.get() % 50 == 0) {
+                        assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                    }
+                });
+
+        assertThat(integer.get()).isEqualTo(100);
+    }
+
+    @Test
+    public void testMultiBatch() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 2));
+
+        AtomicInteger integer = new AtomicInteger(0);
+
+        List<DataFileMeta> filesMetas =
+                getTableDefault().store().newScan().plan().files().stream()
+                        .map(ManifestEntry::file)
+                        .collect(Collectors.toList());
+
+        List<List<DataFileMeta>> batches = DataEvolutionSplitGenerator.split(filesMetas);
+        assertThat(batches.size()).isEqualTo(2);
+        for (List<DataFileMeta> batch : batches) {
+            List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
+                    DataEvolutionSplitRead.splitFieldBunch(batch, file -> 0);
+            assertThat(fieldGroups.size()).isEqualTo(2);
+            assertThat(fieldGroups.get(0).size()).isEqualTo(1);
+            assertThat(fieldGroups.get(1).size()).isEqualTo(8);
+        }
+
+        readDefault(
+                row -> {
+                    if (integer.getAndIncrement() % 50 == 0) {
+                        assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                    }
+                });
+        assertThat(integer.get()).isEqualTo(200);
+    }
+
+    protected Schema schemaDefault() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        return schemaBuilder.build();
+    }
+
+    protected InternalRow dataDefault(int time, int size) {
+        return GenericRow.of(RANDOM.nextInt(), randomString(), new BlobData(blobBytes));
+    }
+
+    @Override
+    protected byte[] randomBytes() {
+        byte[] binary = new byte[2 * 1024 * 1024];
+        RANDOM.nextBytes(binary);
+        return binary;
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index ca0b65b2ee..a4dfdfd733 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -45,7 +45,7 @@ import org.apache.paimon.utils.StatsCollectorFactories;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.LinkedList;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -76,7 +76,8 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
                         null);
         FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options());
         LinkedList<DataFileMeta> toCompact = new LinkedList<>();
-        CoreOptions options = new CoreOptions(new HashMap<>());
+        CoreOptions options =
+                new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
         AppendOnlyWriter appendOnlyWriter =
                 new AppendOnlyWriter(
                         LocalFileIO.create(),
@@ -97,8 +98,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
                         false,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
                         CompressOptions.defaultOptions(),
-                        StatsCollectorFactories.createStatsFactories(
-                                "truncate(16)", options, SCHEMA.getFieldNames()),
+                        new StatsCollectorFactories(options),
                         MemorySize.MAX_VALUE,
                         new FileIndexOptions(),
                         true,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
new file mode 100644
index 0000000000..ce38d6e4b5
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DataEvolutionSplitRead.BlobBunch}. */
+public class DataEvolutionReadTest {
+
+    private DataEvolutionSplitRead.BlobBunch blobBunch;
+
+    @BeforeEach
+    public void setUp() {
+        blobBunch = new DataEvolutionSplitRead.BlobBunch(Long.MAX_VALUE);
+    }
+
+    @Test
+    public void testAddSingleBlobEntry() {
+        DataFileMeta blobEntry = createBlobFile("blob1", 0L, 100L, 1L);
+
+        blobBunch.add(blobEntry);
+
+        assertThat(blobBunch.files).hasSize(1);
+        assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
+        assertThat(blobBunch.rowCount()).isEqualTo(100);
+        assertThat(blobBunch.firstRowId()).isEqualTo(0);
+        assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+    }
+
+    @Test
+    public void testAddBlobEntryAndTail() {
+        DataFileMeta blobEntry = createBlobFile("blob1", 0, 100, 1);
+        DataFileMeta blobTail = createBlobFile("blob2", 100, 200, 1);
+
+        blobBunch.add(blobEntry);
+        blobBunch.add(blobTail);
+
+        assertThat(blobBunch.files).hasSize(2);
+        assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
+        assertThat(blobBunch.files.get(1)).isEqualTo(blobTail);
+        assertThat(blobBunch.rowCount()).isEqualTo(300);
+        assertThat(blobBunch.firstRowId()).isEqualTo(0);
+        assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+        assertThat(blobBunch.schemaId()).isEqualTo(0L);
+    }
+
+    @Test
+    public void testAddNonBlobFileThrowsException() {
+        DataFileMeta normalFile = createNormalFile("normal1.parquet", 0, 100, 1, 0L);
+
+        assertThatThrownBy(() -> blobBunch.add(normalFile))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Only blob file can be added to a blob bunch.");
+    }
+
+    @Test
+    public void testAddBlobFileWithSameFirstRowId() {
+        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+        DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 2);
+
+        blobBunch.add(blobEntry1);
+        // Adding file with same firstRowId but higher sequence number should throw exception
+        assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Blob file with same first row id should have decreasing sequence number.");
+    }
+
+    @Test
+    public void testAddBlobFileWithSameFirstRowIdAndLowerSequenceNumber() {
+        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
+        DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 1);
+
+        blobBunch.add(blobEntry1);
+        // Adding file with same firstRowId and lower sequence number should be ignored
+        blobBunch.add(blobEntry2);
+
+        assertThat(blobBunch.files).hasSize(1);
+        assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+    }
+
+    @Test
+    public void testAddBlobFileWithOverlappingRowId() {
+        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
+        DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 1);
+
+        blobBunch.add(blobEntry1);
+        // Adding file with overlapping row id and lower sequence number should be ignored
+        blobBunch.add(blobEntry2);
+
+        assertThat(blobBunch.files).hasSize(1);
+        assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+    }
+
+    @Test
+    public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() {
+        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+        DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 2);
+
+        blobBunch.add(blobEntry1);
+        // Adding file with overlapping row id and higher sequence number should throw exception
+        assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Blob file with overlapping row id should have decreasing sequence number.");
+    }
+
+    @Test
+    public void testAddBlobFileWithNonContinuousRowId() {
+        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+        DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
+
+        blobBunch.add(blobEntry1);
+        // Adding file with non-continuous row id should throw exception
+        assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Blob file first row id should be continuous, expect 100 but got 200");
+    }
+
+    @Test
+    public void testAddBlobFileWithDifferentWriteCols() {
+        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+        DataFileMeta blobEntry2 =
+                createBlobFileWithCols("blob2", 100, 200, 1, Arrays.asList("different_col"));
+
+        blobBunch.add(blobEntry1);
+        // Adding file with different write columns should throw exception
+        assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("All files in a blob bunch should have the same write columns.");
+    }
+
+    @Test
+    public void testComplexBlobBunchScenario() {
+        // Create a complex scenario with multiple blob entries and a tail
+        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+        DataFileMeta blobEntry2 = createBlobFile("blob2", 100, 200, 1);
+        DataFileMeta blobEntry3 = createBlobFile("blob3", 300, 300, 1);
+        DataFileMeta blobTail = createBlobFile("blob4", 600, 400, 1);
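+        // Contiguous row ranges: [0,100), [100,300), [300,600), [600,1000); 1000 rows in total.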
+
+        blobBunch.add(blobEntry1);
+        blobBunch.add(blobEntry2);
+        blobBunch.add(blobEntry3);
+        blobBunch.add(blobTail);
+
+        assertThat(blobBunch.files).hasSize(4);
+        assertThat(blobBunch.rowCount()).isEqualTo(1000);
+        assertThat(blobBunch.firstRowId()).isEqualTo(0);
+        assertThat(blobBunch.writeCols()).isEqualTo(Arrays.asList("blob_col"));
+    }
+
+    @Test
+    public void testComplexBlobBunchScenario2() {
+
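+        // Ten blob files across four sequence numbers covering rows 0..999. Per the pick/skip
+        // assertions below, the split orders blob files by first row id ascending and sequence
+        // number descending, so newer files supersede older overlapping ones.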
+        List<DataFileMeta> waited = new ArrayList<>();
+
+        waited.add(createNormalFile("others.parquet", 0, 1000, 1, 1));
+        waited.add(createBlobFile("blob1", 0, 1000, 1));
+        waited.add(createBlobFile("blob2", 0, 500, 2));
+        waited.add(createBlobFile("blob3", 500, 250, 2));
+        waited.add(createBlobFile("blob4", 750, 250, 2));
+        waited.add(createBlobFile("blob5", 0, 100, 3));
+        waited.add(createBlobFile("blob6", 100, 400, 3));
+        waited.add(createBlobFile("blob7", 750, 100, 3));
+        waited.add(createBlobFile("blob8", 850, 150, 3));
+        waited.add(createBlobFile("blob9", 100, 650, 4));
+
+        List<List<DataFileMeta>> batches = DataEvolutionSplitGenerator.split(waited);
+        assertThat(batches.size()).isEqualTo(1);
+
+        List<DataFileMeta> batch = batches.get(0);
+
+        assertThat(batch.get(1).fileName()).contains("blob5"); // pick
+        assertThat(batch.get(2).fileName()).contains("blob2"); // skip
+        assertThat(batch.get(3).fileName()).contains("blob1"); // skip
+        assertThat(batch.get(4).fileName()).contains("blob9"); // pick
+        assertThat(batch.get(5).fileName()).contains("blob6"); // skip
+        assertThat(batch.get(6).fileName()).contains("blob3"); // skip
+        assertThat(batch.get(7).fileName()).contains("blob7"); // pick
+        assertThat(batch.get(8).fileName()).contains("blob4"); // skip
+        assertThat(batch.get(9).fileName()).contains("blob8"); // pick
+
+        List<DataEvolutionSplitRead.FieldBunch> fieldBunches =
+                DataEvolutionSplitRead.splitFieldBunch(batch, file -> 0);
+        assertThat(fieldBunches.size()).isEqualTo(2);
+
+        DataEvolutionSplitRead.BlobBunch blobBunch = fieldBunches.get(1).fileOrBlob.getRight();
+        assertThat(blobBunch.files).hasSize(4);
+        assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
+        assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
+        assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
+        assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
+    }
+
+    @Test
+    public void testComplexBlobBunchScenario3() {
+
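+        // Same layout as scenario 2, plus a parallel set of files for a second blob column
+        // ("blobc2"); grouping by write columns should produce one normal field bunch and two
+        // independent blob bunches.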
+        List<DataFileMeta> waited = new ArrayList<>();
+
+        waited.add(createNormalFile("others.parquet", 0, 1000, 1, 1));
+        waited.add(createBlobFile("blob1", 0, 1000, 1));
+        waited.add(createBlobFile("blob2", 0, 500, 2));
+        waited.add(createBlobFile("blob3", 500, 250, 2));
+        waited.add(createBlobFile("blob4", 750, 250, 2));
+        waited.add(createBlobFile("blob5", 0, 100, 3));
+        waited.add(createBlobFile("blob6", 100, 400, 3));
+        waited.add(createBlobFile("blob7", 750, 100, 3));
+        waited.add(createBlobFile("blob8", 850, 150, 3));
+        waited.add(createBlobFile("blob9", 100, 650, 4));
+        waited.add(
+                createBlobFileWithCols("blob11", 0, 1000, 1, 
Collections.singletonList("blobc2")));
+        waited.add(
+                createBlobFileWithCols("blob12", 0, 500, 2, 
Collections.singletonList("blobc2")));
+        waited.add(
+                createBlobFileWithCols("blob13", 500, 250, 2, 
Collections.singletonList("blobc2")));
+        waited.add(
+                createBlobFileWithCols("blob14", 750, 250, 2, 
Collections.singletonList("blobc2")));
+        waited.add(
+                createBlobFileWithCols("blob15", 0, 100, 3, 
Collections.singletonList("blobc2")));
+        waited.add(
+                createBlobFileWithCols("blob16", 100, 400, 3, 
Collections.singletonList("blobc2")));
+        waited.add(
+                createBlobFileWithCols("blob17", 750, 100, 3, 
Collections.singletonList("blobc2")));
+        waited.add(
+                createBlobFileWithCols("blob18", 850, 150, 3, 
Collections.singletonList("blobc2")));
+        waited.add(
+                createBlobFileWithCols("blob19", 100, 650, 4, 
Collections.singletonList("blobc2")));
+
+        List<List<DataFileMeta>> batches = DataEvolutionSplitGenerator.split(waited);
+        assertThat(batches.size()).isEqualTo(1);
+
+        List<DataFileMeta> batch = batches.get(0);
+
+        List<DataEvolutionSplitRead.FieldBunch> fieldBunches =
+                DataEvolutionSplitRead.splitFieldBunch(
+                        batch, file -> file.writeCols().get(0).hashCode());
+        assertThat(fieldBunches.size()).isEqualTo(3);
+
+        DataEvolutionSplitRead.BlobBunch blobBunch = fieldBunches.get(1).fileOrBlob.getRight();
+        assertThat(blobBunch.files).hasSize(4);
+        assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
+        assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
+        assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
+        assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
+
+        blobBunch = fieldBunches.get(2).fileOrBlob.getRight();
+        assertThat(blobBunch.files).hasSize(4);
+        assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
+        assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
+        assertThat(blobBunch.files.get(2).fileName()).contains("blob17");
+        assertThat(blobBunch.files.get(3).fileName()).contains("blob18");
+    }
+
+    /** Creates a blob file with the specified parameters. */
+    private DataFileMeta createBlobFile(
+            String fileName, long firstRowId, long rowCount, long maxSequenceNumber) {
+        return createBlobFileWithCols(
+                fileName, firstRowId, rowCount, maxSequenceNumber, Arrays.asList("blob_col"));
+    }
+
+    /** Creates a blob file with custom write columns. */
+    private DataFileMeta createBlobFileWithCols(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            List<String> writeCols) {
+        return DataFileMeta.create(
+                fileName + ".blob",
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0,
+                maxSequenceNumber,
+                0L,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                writeCols);
+    }
+
+    /** Creates a normal (non-blob) file for testing. */
+    private DataFileMeta createNormalFile(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            long schemaId) {
+        return DataFileMeta.create(
+                fileName,
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0L,
+                maxSequenceNumber,
+                schemaId,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                null);
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index aedf5553e0..2c45323088 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
@@ -195,6 +196,14 @@ public abstract class TableTestBase {
         return rows;
     }
 
+    protected void readDefault(Consumer<InternalRow> consumer) throws Exception {
+        Table table = getTableDefault();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        reader.forEachRemaining(consumer);
+    }
+
     public void createTableDefault() throws Exception {
         catalog.createTable(identifier(), schemaDefault(), true);
     }
diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
index d05cf3844f..09a4216e34 100644
--- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
+++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -18,3 +18,4 @@ org.apache.paimon.format.orc.OrcFileFormatFactory
 org.apache.paimon.format.parquet.ParquetFileFormatFactory
 org.apache.paimon.format.csv.CsvFileFormatFactory
 org.apache.paimon.format.json.JsonFileFormatFactory
+org.apache.paimon.format.blob.BlobFileFormatFactory
