This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 527f2ab5f0 [core] Support multiple blob fields definition (#7105)
527f2ab5f0 is described below

commit 527f2ab5f013d62ee69b97b50553d7f0f5262cce
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 23 12:07:32 2026 +0800

    [core] Support multiple blob fields definition (#7105)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   9 ++
 .../paimon/append/MultipleBlobFileWriter.java      | 134 +++++++++++++++++++++
 ...tedFileWriter.java => ProjectedFileWriter.java} |   4 +-
 .../paimon/append/RollingBlobFileWriter.java       | 114 ++++--------------
 .../operation/commit/RowTrackingCommitUtils.java   |  12 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  10 +-
 .../paimon/append/MultipleBlobTableTest.java       | 112 +++++++++++++++++
 .../apache/paimon/schema/SchemaValidationTest.java |  30 +++--
 .../java/org/apache/paimon/flink/FlinkCatalog.java |   6 +-
 .../apache/paimon/flink/FlinkRowDataWithBlob.java  |  11 +-
 .../flink/source/FileStoreSourceSplitReader.java   |  16 +--
 .../org/apache/paimon/flink/BlobTableITCase.java   |  22 +++-
 .../java/org/apache/paimon/spark/SparkCatalog.java |   5 +-
 .../paimon/spark/data/SparkInternalRow.scala       |  18 ++-
 .../apache/spark/sql/paimon/shims/SparkShim.scala  |   2 +-
 .../org/apache/paimon/spark/sql/BlobTestBase.scala |  23 ++++
 .../spark/data/Spark3InternalRowWithBlob.scala     |   4 +-
 .../apache/spark/sql/paimon/shims/Spark3Shim.scala |   4 +-
 .../spark/data/Spark4InternalRowWithBlob.scala     |   4 +-
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala |   4 +-
 20 files changed, 399 insertions(+), 145 deletions(-)
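
For readers skimming the change: a minimal sketch (not part of the commit) of how a table with more than one BLOB column can now be declared and written, following the Schema.Builder and GenericRow usage in the new MultipleBlobTableTest below. The column names, the "paimon" value, and the bytes1/bytes2 arrays are illustrative placeholders:

    // Declare a schema with two BLOB columns; blob tables require
    // row tracking and data evolution to be enabled.
    Schema.Builder schemaBuilder = Schema.newBuilder();
    schemaBuilder.column("f0", DataTypes.INT());
    schemaBuilder.column("f1", DataTypes.STRING());
    schemaBuilder.column("f2", DataTypes.BLOB());
    schemaBuilder.column("f3", DataTypes.BLOB());
    schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
    schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
    Schema schema = schemaBuilder.build();

    // Each written row supplies one value per BLOB column, wrapped in BlobData.
    InternalRow row =
            GenericRow.of(
                    1,
                    BinaryString.fromString("paimon"),
                    new BlobData(bytes1),
                    new BlobData(bytes2));

In SQL engines the same table is declared by listing the columns in the 'blob-field' option, e.g. 'blob-field'='pic1,pic2', as in the new Flink and Spark tests below.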

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 1b8170a9e2..e29da00961 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2958,6 +2958,15 @@ public class CoreOptions implements Serializable {
                 .orElse(Collections.emptyList());
     }
 
+    public static List<String> blobField(Map<String, String> options) {
+        String string = options.get(BLOB_FIELD.key());
+        if (string == null) {
+            return Collections.emptyList();
+        }
+
+        return Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
+    }
+
     public boolean sequenceFieldSortOrderIsAscending() {
         return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
new file mode 100644
index 0000000000..f9edb0ef65
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.BlobConsumer;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
+import org.apache.paimon.io.RowDataFileWriter;
+import org.apache.paimon.io.SingleFileWriter;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.BlobType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LongCounter;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static java.util.Collections.singletonList;
+
+/** A file writer that maintains one rolling blob file writer per blob field. */
+public class MultipleBlobFileWriter implements Closeable {
+
+    private final List<BlobProjectedFileWriter> blobWriters;
+
+    public MultipleBlobFileWriter(
+            FileIO fileIO,
+            long schemaId,
+            RowType writeSchema,
+            DataFilePathFactory pathFactory,
+            LongCounter seqNumCounter,
+            FileSource fileSource,
+            boolean asyncFileWrite,
+            boolean statsDenseStore,
+            long targetFileSize,
+            @Nullable BlobConsumer blobConsumer) {
+        RowType blobRowType = BlobType.splitBlob(writeSchema).getRight();
+        this.blobWriters = new ArrayList<>();
+        for (String blobFieldName : blobRowType.getFieldNames()) {
+            BlobFileFormat blobFileFormat = new BlobFileFormat();
+            blobFileFormat.setWriteConsumer(blobConsumer);
+            blobWriters.add(
+                    new BlobProjectedFileWriter(
+                            () ->
+                                    new RowDataFileWriter(
+                                            fileIO,
+                                            RollingFileWriter.createFileWriterContext(
+                                                    blobFileFormat,
+                                                    writeSchema.project(blobFieldName),
+                                                    new SimpleColStatsCollector.Factory[] {
+                                                        NoneSimpleColStatsCollector::new
+                                                    },
+                                                    "none"),
+                                            pathFactory.newBlobPath(),
+                                            writeSchema.project(blobFieldName),
+                                            schemaId,
+                                            seqNumCounter,
+                                            new FileIndexOptions(),
+                                            fileSource,
+                                            asyncFileWrite,
+                                            statsDenseStore,
+                                            pathFactory.isExternalPath(),
+                                            singletonList(blobFieldName)),
+                            targetFileSize,
+                            writeSchema.projectIndexes(singletonList(blobFieldName))));
+        }
+    }
+
+    public void write(InternalRow row) throws IOException {
+        for (BlobProjectedFileWriter blobWriter : blobWriters) {
+            blobWriter.write(row);
+        }
+    }
+
+    public void abort() {
+        for (BlobProjectedFileWriter blobWriter : blobWriters) {
+            blobWriter.abort();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (BlobProjectedFileWriter blobWriter : blobWriters) {
+            blobWriter.close();
+        }
+    }
+
+    public List<DataFileMeta> result() throws IOException {
+        List<DataFileMeta> results = new ArrayList<>();
+        for (BlobProjectedFileWriter blobWriter : blobWriters) {
+            results.addAll(blobWriter.result());
+        }
+        return results;
+    }
+
+    private static class BlobProjectedFileWriter
+            extends ProjectedFileWriter<
+                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>> {
+        public BlobProjectedFileWriter(
+                Supplier<? extends SingleFileWriter<InternalRow, DataFileMeta>> writerFactory,
+                long targetFileSize,
+                int[] projection) {
+            super(new RollingFileWriterImpl<>(writerFactory, targetFileSize), projection);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java
similarity index 94%
rename from paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java
rename to paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java
index db9140d3ea..2067986622 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java
@@ -31,13 +31,13 @@ import java.io.IOException;
  * <p>This is useful when the physical file schema is a subset of the logical write schema. The
  * projection is evaluated via {@link ProjectedRow} to avoid object allocations.
  */
-public class PeojectedFileWriter<T extends FileWriter<InternalRow, R>, R>
+public class ProjectedFileWriter<T extends FileWriter<InternalRow, R>, R>
         implements FileWriter<InternalRow, R> {
 
     private final T writer;
     private final ProjectedRow projectedRow;
 
-    public PeojectedFileWriter(T writer, int[] projection) {
+    public ProjectedFileWriter(T writer, int[] projection) {
         this.writer = writer;
         this.projectedRow = ProjectedRow.from(projection);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 583c0ee44b..f565212d1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -22,23 +22,18 @@ import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.blob.BlobFileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.FileWriterAbortExecutor;
 import org.apache.paimon.io.RollingFileWriter;
-import org.apache.paimon.io.RollingFileWriterImpl;
 import org.apache.paimon.io.RowDataFileWriter;
 import org.apache.paimon.io.SingleFileWriter;
 import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
-import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StatsCollectorFactories;
 
@@ -50,11 +45,11 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /**
  * A rolling file writer that handles both normal data and blob data. This writer creates separate
  * files for normal columns and blob columns, managing their lifecycle and ensuring consistency
@@ -82,27 +77,20 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
     /** Constant for checking rolling condition periodically. */
     private static final long CHECK_ROLLING_RECORD_CNT = 1000L;
 
-    /** Expected number of blob fields in a table. */
-    private static final int EXPECTED_BLOB_FIELD_COUNT = 1;
-
     // Core components
     private final Supplier<
-                    PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
+                    ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
             writerFactory;
-    private final Supplier<
-                    PeojectedFileWriter<
-                            RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>>
-            blobWriterFactory;
+    private final Supplier<MultipleBlobFileWriter> blobWriterFactory;
     private final long targetFileSize;
 
     // State management
     private final List<FileWriterAbortExecutor> closedWriters;
     private final List<DataFileMeta> results;
-    private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
+
+    private ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
             currentWriter;
-    private PeojectedFileWriter<
-                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
-            blobWriter;
+    private MultipleBlobFileWriter blobWriter;
     private long recordCount = 0;
     private boolean closed = false;
 
@@ -127,18 +115,13 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
         this.results = new ArrayList<>();
         this.closedWriters = new ArrayList<>();
 
-        // Split schema into normal and blob parts
-        Pair<RowType, RowType> typeWithBlob = BlobType.splitBlob(writeSchema);
-        RowType normalRowType = typeWithBlob.getLeft();
-        RowType blobType = typeWithBlob.getRight();
-
         // Initialize writer factory for normal data
         this.writerFactory =
                 createNormalWriterFactory(
                         fileIO,
                         schemaId,
                         fileFormat,
-                        normalRowType,
+                        BlobType.splitBlob(writeSchema).getLeft(),
                         writeSchema,
                         pathFactory,
                         seqNumCounter,
@@ -152,10 +135,9 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
         // Initialize blob writer
         this.blobWriterFactory =
                 () ->
-                        createBlobWriter(
+                        new MultipleBlobFileWriter(
                                 fileIO,
                                 schemaId,
-                                blobType,
                                 writeSchema,
                                 pathFactory,
                                 seqNumCounter,
@@ -168,7 +150,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
 
     /** Creates a factory for normal data writers. */
     private static Supplier<
-                    PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
+                    ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
             createNormalWriterFactory(
                     FileIO fileIO,
                     long schemaId,
@@ -206,61 +188,10 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                             statsDenseStore,
                             pathFactory.isExternalPath(),
                             normalColumnNames);
-            return new PeojectedFileWriter<>(rowDataFileWriter, projectionNormalFields);
+            return new ProjectedFileWriter<>(rowDataFileWriter, projectionNormalFields);
         };
     }
 
-    /** Creates a blob writer for handling blob data. */
-    private static PeojectedFileWriter<
-                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
-            createBlobWriter(
-                    FileIO fileIO,
-                    long schemaId,
-                    RowType blobType,
-                    RowType writeSchema,
-                    DataFilePathFactory pathFactory,
-                    LongCounter seqNumCounter,
-                    FileSource fileSource,
-                    boolean asyncFileWrite,
-                    boolean statsDenseStore,
-                    long targetFileSize,
-                    @Nullable BlobConsumer blobConsumer) {
-        BlobFileFormat blobFileFormat = new BlobFileFormat();
-        blobFileFormat.setWriteConsumer(blobConsumer);
-        List<String> blobNames = blobType.getFieldNames();
-
-        // Validate blob field count
-        checkArgument(
-                blobNames.size() == EXPECTED_BLOB_FIELD_COUNT,
-                "Limit exactly one blob fields in one paimon table yet.");
-
-        int[] blobProjection = writeSchema.projectIndexes(blobNames);
-        return new PeojectedFileWriter<>(
-                new RollingFileWriterImpl<>(
-                        () ->
-                                new RowDataFileWriter(
-                                        fileIO,
-                                        RollingFileWriter.createFileWriterContext(
-                                                blobFileFormat,
-                                                blobType,
-                                                new SimpleColStatsCollector.Factory[] {
-                                                    NoneSimpleColStatsCollector::new
-                                                },
-                                                "none"),
-                                        pathFactory.newBlobPath(),
-                                        blobType,
-                                        schemaId,
-                                        seqNumCounter,
-                                        new FileIndexOptions(),
-                                        fileSource,
-                                        asyncFileWrite,
-                                        statsDenseStore,
-                                        pathFactory.isExternalPath(),
-                                        blobNames),
-                        targetFileSize),
-                blobProjection);
-    }
-
     /**
      * Writes a single row to both normal and blob writers. Automatically handles file rolling when
      * target size is reached.
@@ -397,14 +328,21 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
     private void validateFileConsistency(
             DataFileMeta mainDataFileMeta, List<DataFileMeta> blobTaggedMetas) {
         long mainRowCount = mainDataFileMeta.rowCount();
-        long blobRowCount = blobTaggedMetas.stream().mapToLong(DataFileMeta::rowCount).sum();
-
-        if (mainRowCount != blobRowCount) {
-            throw new IllegalStateException(
-                    String.format(
-                            "This is a bug: The row count of main file and 
blob files does not match. "
-                                    + "Main file: %s (row count: %d), blob 
files: %s (total row count: %d)",
-                            mainDataFileMeta, mainRowCount, blobTaggedMetas, 
blobRowCount));
+
+        Map<String, Long> blobRowCounts = new HashMap<>();
+        for (DataFileMeta file : blobTaggedMetas) {
+            long count = file.rowCount();
+            blobRowCounts.compute(file.writeCols().get(0), (k, v) -> v == null ? count : v + count);
+        }
+        for (String blobFieldName : blobRowCounts.keySet()) {
+            long blobRowCount = blobRowCounts.get(blobFieldName);
+            if (mainRowCount != blobRowCount) {
+                throw new IllegalStateException(
+                        String.format(
+                                "This is a bug: The row count of main file and blob file does not match. "
+                                        + "Main file: %s (row count: %d), blob field name: %s (row count: %d)",
+                                mainDataFileMeta, mainRowCount, blobFieldName, blobRowCount));
+            }
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
index d6eeae17f0..36e7beec85 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
@@ -23,7 +23,9 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.table.SpecialFields;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
@@ -60,7 +62,8 @@ public class RowTrackingCommitUtils {
         }
         // assign row id for new files
         long start = firstRowIdStart;
-        long blobStart = firstRowIdStart;
+        long blobStartDefault = firstRowIdStart;
+        Map<String, Long> blobStarts = new HashMap<>();
         for (ManifestEntry entry : deltaFiles) {
             Optional<FileSource> fileSource = entry.file().fileSource();
             checkArgument(
@@ -74,6 +77,8 @@ public class RowTrackingCommitUtils {
                     && !containsRowId) {
                 long rowCount = entry.file().rowCount();
                 if (isBlobFile(entry.file().fileName())) {
+                    String blobFieldName = entry.file().writeCols().get(0);
+                    long blobStart = blobStarts.getOrDefault(blobFieldName, blobStartDefault);
                     if (blobStart >= start) {
                         throw new IllegalStateException(
                                 String.format(
@@ -81,10 +86,11 @@ public class RowTrackingCommitUtils {
                                         blobStart, start));
                     }
                     rowIdAssigned.add(entry.assignFirstRowId(blobStart));
-                    blobStart += rowCount;
+                    blobStarts.put(blobFieldName, blobStart + rowCount);
                 } else {
                     rowIdAssigned.add(entry.assignFirstRowId(start));
-                    blobStart = start;
+                    blobStartDefault = start;
+                    blobStarts.clear();
                     start += rowCount;
                 }
             } else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 062f71f057..3817cdef08 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -40,6 +40,7 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
@@ -600,17 +601,14 @@ public class SchemaValidation {
                     "Data evolution config must disabled with 
deletion-vectors.enabled");
         }
 
-        List<String> blobNames =
-                BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldNames();
+        Pair<RowType, RowType> normalAndBlobType = BlobType.splitBlob(schema.logicalRowType());
+        List<String> blobNames = normalAndBlobType.getRight().getFieldNames();
         if (!blobNames.isEmpty()) {
             checkArgument(
                     options.dataEvolutionEnabled(),
                     "Data evolution config must enabled for table with BLOB 
type column.");
             checkArgument(
-                    blobNames.size() == 1,
-                    "Table with BLOB type column only support one BLOB column.");
-            checkArgument(
-                    schema.fields().size() > 1,
+                    normalAndBlobType.getLeft().getFieldCount() > 0,
                     "Table with BLOB type column must have other normal 
columns.");
             checkArgument(
                     !schema.partitionKeys().contains(blobNames.get(0)),
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
new file mode 100644
index 0000000000..97f621a720
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for tables with multiple blob fields. */
+public class MultipleBlobTableTest extends TableTestBase {
+
+    private final byte[] blobBytes1 = randomBytes();
+    private final byte[] blobBytes2 = randomBytes();
+
+    @Test
+    public void testBasic() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(1000, 1));
+
+        AtomicInteger integer = new AtomicInteger(0);
+
+        FileStoreTable table = getTableDefault();
+        List<DataFileMeta> filesMetas =
+                table.store().newScan().plan().files().stream()
+                        .map(ManifestEntry::file)
+                        .collect(Collectors.toList());
+
+        RowType rowType = table.schema().logicalRowType();
+        List<FieldBunch> fieldGroups =
+                splitFieldBunches(
+                        filesMetas, file -> rowType.getField(file.writeCols().get(0)).id());
+
+        assertThat(fieldGroups.size()).isEqualTo(3);
+        assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+        assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
+        assertThat(fieldGroups.get(2).files().size()).isEqualTo(10);
+
+        readDefault(
+                row -> {
+                    integer.incrementAndGet();
+                    if (integer.get() % 50 == 0) {
+                        assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes1);
+                        assertThat(row.getBlob(3).toData()).isEqualTo(blobBytes2);
+                    }
+                });
+
+        assertThat(integer.get()).isEqualTo(1000);
+    }
+
+    protected Schema schemaDefault() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.column("f3", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        return schemaBuilder.build();
+    }
+
+    protected InternalRow dataDefault(int time, int size) {
+        return GenericRow.of(
+                RANDOM.nextInt(),
+                BinaryString.fromBytes(randomBytes()),
+                new BlobData(blobBytes1),
+                new BlobData(blobBytes2));
+    }
+
+    @Override
+    protected byte[] randomBytes() {
+        byte[] binary = new byte[2 * 1024 * 124];
+        RANDOM.nextBytes(binary);
+        return binary;
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 1af8c83346..57613fe67f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -28,11 +28,12 @@ import org.junit.jupiter.api.Test;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
 import static org.apache.paimon.schema.SchemaValidation.validateTableSchema;
@@ -49,8 +50,8 @@ class SchemaValidationTest {
                         new DataField(1, "f1", DataTypes.INT()),
                         new DataField(2, "f2", DataTypes.INT()),
                         new DataField(3, "f3", DataTypes.STRING()));
-        List<String> partitionKeys = Collections.singletonList("f0");
-        List<String> primaryKeys = Collections.singletonList("f1");
+        List<String> partitionKeys = singletonList("f0");
+        List<String> primaryKeys = singletonList("f1");
         options.put(BUCKET.key(), String.valueOf(-1));
         validateTableSchema(
                 new TableSchema(1, fields, 10, partitionKeys, primaryKeys, options, ""));
@@ -64,8 +65,7 @@ class SchemaValidationTest {
                         new DataField(2, "f2", DataTypes.BLOB()),
                         new DataField(3, "f3", DataTypes.STRING()));
         options.put(BUCKET.key(), String.valueOf(-1));
-        validateTableSchema(
-                new TableSchema(1, fields, 10, partitions, Collections.emptyList(), options, ""));
+        validateTableSchema(new TableSchema(1, fields, 10, partitions, emptyList(), options, ""));
     }
 
     @Test
@@ -139,17 +139,29 @@ class SchemaValidationTest {
     public void testBlobTableSchema() {
         Map<String, String> options = new HashMap<>();
 
-        // 1. must set row-tracking = true and data-evolution = true
         options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
-        assertThatThrownBy(() -> validateBlobSchema(options, Collections.emptyList()))
+        assertThatThrownBy(() -> validateBlobSchema(options, emptyList()))
                 .hasMessage("Data evolution config must enabled for table with BLOB type column.");
 
-        // 2. blob column cannot be part of partition keys
         options.clear();
         options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
         options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
-        assertThatThrownBy(() -> validateBlobSchema(options, Collections.singletonList("f2")))
+        assertThatThrownBy(() -> validateBlobSchema(options, singletonList("f2")))
                 .hasMessage("The BLOB type column can not be part of partition keys.");
+
+        assertThatThrownBy(
+                        () -> {
+                            validateTableSchema(
+                                    new TableSchema(
+                                            1,
+                                            singletonList(new DataField(2, "f2", DataTypes.BLOB())),
+                                            10,
+                                            emptyList(),
+                                            emptyList(),
+                                            options,
+                                            ""));
+                        })
+                .hasMessage("Table with BLOB type column must have other normal columns.");
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 8bba96cf78..2239114cca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -1012,8 +1012,8 @@ public class FlinkCatalog extends AbstractCatalog {
         RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
 
         Map<String, String> options = new HashMap<>(catalogTable.getOptions());
-        String blobName = options.get(CoreOptions.BLOB_FIELD.key());
-        if (blobName != null) {
+        List<String> blobFields = CoreOptions.blobField(options);
+        if (!blobFields.isEmpty()) {
             checkArgument(
                     options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
                     "When setting '"
@@ -1041,7 +1041,7 @@ public class FlinkCatalog extends AbstractCatalog {
                         field ->
                                 schemaBuilder.column(
                                         field.getName(),
-                                        field.getName().equals(blobName)
+                                        blobFields.contains(field.getName())
                                                 ? toBlobType(field.getType())
                                                 : toDataType(field.getType()),
                                         columnComments.get(field.getName())));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
index 07cad1bc59..dc38588a7c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
@@ -21,21 +21,24 @@ package org.apache.paimon.flink;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.InternalRow;
 
+import java.util.Set;
+
 /** Convert to Flink row data with blob. */
 public class FlinkRowDataWithBlob extends FlinkRowData {
 
-    private final int blobField;
+    private final Set<Integer> blobFields;
     private final boolean blobAsDescriptor;
 
-    public FlinkRowDataWithBlob(InternalRow row, int blobField, boolean blobAsDescriptor) {
+    public FlinkRowDataWithBlob(
+            InternalRow row, Set<Integer> blobFields, boolean blobAsDescriptor) {
         super(row);
-        this.blobField = blobField;
+        this.blobFields = blobFields;
         this.blobAsDescriptor = blobAsDescriptor;
     }
 
     @Override
     public byte[] getBinary(int pos) {
-        if (pos == blobField) {
+        if (blobFields.contains(pos)) {
             Blob blob = row.getBlob(pos);
             return blobAsDescriptor ? blob.toDescriptor().serialize() : blob.toData();
         } else {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 1955e6c439..b49b9adb94 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -48,6 +48,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Objects;
 import java.util.Queue;
@@ -267,19 +268,20 @@ public class FileStoreSourceSplitReader
 
         private final MutableRecordAndPosition<RowData> recordAndPosition =
                 new MutableRecordAndPosition<>();
-        @Nullable private final Integer blobField;
+        private final Set<Integer> blobFields;
 
         private FileStoreRecordIterator(@Nullable RowType rowType) {
-            this.blobField = rowType == null ? null : blobFieldIndex(rowType);
+            this.blobFields = rowType == null ? Collections.emptySet() : blobFieldIndex(rowType);
         }
 
-        private Integer blobFieldIndex(RowType rowType) {
+        private Set<Integer> blobFieldIndex(RowType rowType) {
+            Set<Integer> result = new HashSet<>();
             for (int i = 0; i < rowType.getFieldCount(); i++) {
                 if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
-                    return i;
+                    result.add(i);
                 }
             }
-            return null;
+            return result;
         }
 
         public FileStoreRecordIterator replace(RecordIterator<InternalRow> iterator) {
@@ -305,9 +307,9 @@ public class FileStoreSourceSplitReader
             }
 
             recordAndPosition.setNext(
-                    blobField == null
+                    blobFields.isEmpty()
                             ? new FlinkRowData(row)
-                            : new FlinkRowDataWithBlob(row, blobField, blobAsDescriptor));
+                            : new FlinkRowDataWithBlob(row, blobFields, blobAsDescriptor));
             currentNumRead++;
             if (limiter != null) {
                 limiter.increment();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index f879b9c67a..1d192cb437 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -49,7 +49,8 @@ public class BlobTableITCase extends CatalogITCaseBase {
     protected List<String> ddl() {
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, 
picture BYTES) WITH ('row-tracking.enabled'='true', 
'data-evolution.enabled'='true', 'blob-field'='picture')",
-                "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, 
data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 
'data-evolution.enabled'='true', 'blob-field'='picture', 
'blob-as-descriptor'='true')");
+                "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, 
data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 
'data-evolution.enabled'='true', 'blob-field'='picture', 
'blob-as-descriptor'='true')",
+                "CREATE TABLE IF NOT EXISTS multiple_blob_table (id INT, data 
STRING, pic1 BYTES, pic2 BYTES) WITH ('row-tracking.enabled'='true', 
'data-evolution.enabled'='true', 'blob-field'='pic1,pic2')");
     }
 
     @Test
@@ -64,6 +65,25 @@ public class BlobTableITCase extends CatalogITCaseBase {
         assertThat(batchSql("SELECT file_path FROM 
`blob_table$files`").size()).isEqualTo(2);
     }
 
+    @Test
+    public void testMultipleBlobs() {
+        batchSql("SELECT * FROM multiple_blob_table");
+        batchSql("INSERT INTO multiple_blob_table VALUES (1, 'paimon', X'48656C6C6F', X'5945')");
+        assertThat(batchSql("SELECT * FROM multiple_blob_table"))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                1,
+                                "paimon",
+                                new byte[] {72, 101, 108, 108, 111},
+                                new byte[] {89, 69}));
+        assertThat(batchSql("SELECT pic1 FROM multiple_blob_table"))
+                .containsExactlyInAnyOrder(Row.of(new byte[] {72, 101, 108, 108, 111}));
+        assertThat(batchSql("SELECT pic2 FROM multiple_blob_table"))
+                .containsExactlyInAnyOrder(Row.of(new byte[] {89, 69}));
+        assertThat(batchSql("SELECT file_path FROM `multiple_blob_table$files`").size())
+                .isEqualTo(3);
+    }
+
     @Test
     public void testWriteBlobAsDescriptor() throws Exception {
         byte[] blobData = new byte[1024 * 1024];
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 5b5a18c80e..9d12201c91 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -83,7 +83,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -457,7 +456,7 @@ public class SparkCatalog extends SparkBaseCatalog
     private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
-        String blobFieldName = properties.get(CoreOptions.BLOB_FIELD.key());
+        List<String> blobFields = CoreOptions.blobField(properties);
         String provider = properties.get(TableCatalog.PROP_PROVIDER);
         if (!usePaimon(provider)) {
             if (isFormatTable(provider)) {
@@ -491,7 +490,7 @@ public class SparkCatalog extends SparkBaseCatalog
         for (StructField field : schema.fields()) {
             String name = field.name();
             DataType type;
-            if (Objects.equals(blobFieldName, name)) {
+            if (blobFields.contains(name)) {
                 checkArgument(
                         field.dataType() instanceof org.apache.spark.sql.types.BinaryType,
                         "The type of blob field must be binary");
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
index 5abacf9c37..ae504b2412 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.types.{DataTypeRoot, RowType}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.paimon.shims.SparkShimLoader
 
-import java.util.OptionalInt
+import scala.collection.mutable
 
 abstract class SparkInternalRow extends InternalRow {
   def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
@@ -36,26 +36,24 @@ object SparkInternalRow {
   }
 
   def create(rowType: RowType, blobAsDescriptor: Boolean): SparkInternalRow = {
-    val fieldIndex = blobFieldIndex(rowType)
-    if (fieldIndex.isPresent) {
-      SparkShimLoader.shim.createSparkInternalRowWithBlob(
-        rowType,
-        fieldIndex.getAsInt,
-        blobAsDescriptor)
+    val blobs = blobFields(rowType)
+    if (blobs.nonEmpty) {
+      SparkShimLoader.shim.createSparkInternalRowWithBlob(rowType, blobs, blobAsDescriptor)
     } else {
       SparkShimLoader.shim.createSparkInternalRow(rowType)
     }
   }
 
-  private def blobFieldIndex(rowType: RowType): OptionalInt = {
+  private def blobFields(rowType: RowType): Set[Int] = {
     var i: Int = 0
+    val blobFields = new mutable.HashSet[Int]()
     while (i < rowType.getFieldCount) {
       if (rowType.getTypeAt(i).getTypeRoot.equals(DataTypeRoot.BLOB)) {
-        return OptionalInt.of(i)
+        blobFields.add(i)
       }
       i += 1
     }
-    OptionalInt.empty()
+    blobFields.toSet
   }
 
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 98296a4006..d85fd9a427 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -53,7 +53,7 @@ trait SparkShim {
 
   def createSparkInternalRowWithBlob(
       rowType: RowType,
-      blobFieldIndex: Int,
+      blobFields: Set[Int],
       blobAsDescriptor: Boolean): SparkInternalRow
 
   def createSparkArrayData(elementType: DataType): SparkArrayData
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index e05c9ce644..4bc1246612 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -46,6 +46,29 @@ class BlobTestBase extends PaimonSparkTestBase {
         sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
         Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 1))
       )
+
+      checkAnswer(
+        sql("SELECT COUNT(*) FROM `t$files`"),
+        Seq(Row(2))
+      )
+    }
+  }
+
+  test("Blob: test multiple blobs") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, data STRING, pic1 BINARY, pic2 BINARY) TBLPROPERTIES (" +
+        "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='pic1,pic2')")
+      sql("INSERT INTO t VALUES (1, 'paimon', X'48656C6C6F', X'5945')")
+
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+        Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), Array[Byte](89, 69), 0, 1))
+      )
+
+      checkAnswer(
+        sql("SELECT COUNT(*) FROM `t$files`"),
+        Seq(Row(3))
+      )
     }
   }
 
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
index 4c7c978f8c..6c1dbd9d21 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
@@ -23,11 +23,11 @@ import org.apache.paimon.utils.InternalRowUtils.copyInternalRow
 
 import org.apache.spark.sql.catalyst.InternalRow
 
-class Spark3InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean)
+class Spark3InternalRowWithBlob(rowType: RowType, blobFields: Set[Int], blobAsDescriptor: Boolean)
   extends Spark3InternalRow(rowType) {
 
   override def getBinary(ordinal: Int): Array[Byte] = {
-    if (ordinal == blobFieldIndex) {
+    if (blobFields.contains(ordinal)) {
       if (blobAsDescriptor) {
         row.getBlob(ordinal).toDescriptor.serialize()
       } else {
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 70011e14c3..df84150ca2 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -56,9 +56,9 @@ class Spark3Shim extends SparkShim {
 
   override def createSparkInternalRowWithBlob(
       rowType: RowType,
-      blobFieldIndex: Int,
+      blobFields: Set[Int],
       blobAsDescriptor: Boolean): SparkInternalRow = {
-    new Spark3InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor)
+    new Spark3InternalRowWithBlob(rowType, blobFields, blobAsDescriptor)
   }
 
   override def createSparkArrayData(elementType: DataType): SparkArrayData = {
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
index 0a208daea2..2a120e5b4c 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
@@ -25,11 +25,11 @@ import org.apache.paimon.utils.InternalRowUtils.copyInternalRow
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.unsafe.types.VariantVal
 
-class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean)
+class Spark4InternalRowWithBlob(rowType: RowType, blobFields: Set[Int], blobAsDescriptor: Boolean)
   extends Spark4InternalRow(rowType) {
 
   override def getBinary(ordinal: Int): Array[Byte] = {
-    if (ordinal == blobFieldIndex) {
+    if (blobFields.contains(ordinal)) {
       if (blobAsDescriptor) {
         row.getBlob(ordinal).toDescriptor.serialize()
       } else {
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index ad36acfb26..24782ffdd2 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -57,9 +57,9 @@ class Spark4Shim extends SparkShim {
 
   override def createSparkInternalRowWithBlob(
       rowType: RowType,
-      blobFieldIndex: Int,
+      blobFields: Set[Int],
       blobAsDescriptor: Boolean): SparkInternalRow = {
-    new Spark4InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor)
+    new Spark4InternalRowWithBlob(rowType, blobFields, blobAsDescriptor)
   }
 
   override def createSparkArrayData(elementType: DataType): SparkArrayData = {
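
For completeness, a minimal sketch (not part of the commit) of how a consumer can read several BLOB columns back from a row, mirroring the getBlob/toDescriptor/toData calls used in FlinkRowDataWithBlob and the new tests above; the blobBytes helper name is illustrative:

    // Materialize one BLOB column of a row, either as raw bytes or as a
    // serialized descriptor, as FlinkRowDataWithBlob#getBinary does above.
    static byte[] blobBytes(InternalRow row, int pos, boolean blobAsDescriptor) {
        Blob blob = row.getBlob(pos);
        return blobAsDescriptor ? blob.toDescriptor().serialize() : blob.toData();
    }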
