This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 527f2ab5f0 [core] Support multiple blob fields definition (#7105)
527f2ab5f0 is described below
commit 527f2ab5f013d62ee69b97b50553d7f0f5262cce
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 23 12:07:32 2026 +0800
[core] Support multiple blob fields definition (#7105)
---
.../main/java/org/apache/paimon/CoreOptions.java | 9 ++
.../paimon/append/MultipleBlobFileWriter.java | 134 +++++++++++++++++++++
...tedFileWriter.java => ProjectedFileWriter.java} | 4 +-
.../paimon/append/RollingBlobFileWriter.java | 114 ++++--------------
.../operation/commit/RowTrackingCommitUtils.java | 12 +-
.../org/apache/paimon/schema/SchemaValidation.java | 10 +-
.../paimon/append/MultipleBlobTableTest.java | 112 +++++++++++++++++
.../apache/paimon/schema/SchemaValidationTest.java | 30 +++--
.../java/org/apache/paimon/flink/FlinkCatalog.java | 6 +-
.../apache/paimon/flink/FlinkRowDataWithBlob.java | 11 +-
.../flink/source/FileStoreSourceSplitReader.java | 16 +--
.../org/apache/paimon/flink/BlobTableITCase.java | 22 +++-
.../java/org/apache/paimon/spark/SparkCatalog.java | 5 +-
.../paimon/spark/data/SparkInternalRow.scala | 18 ++-
.../apache/spark/sql/paimon/shims/SparkShim.scala | 2 +-
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 23 ++++
.../spark/data/Spark3InternalRowWithBlob.scala | 4 +-
.../apache/spark/sql/paimon/shims/Spark3Shim.scala | 4 +-
.../spark/data/Spark4InternalRowWithBlob.scala | 4 +-
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 4 +-
20 files changed, 399 insertions(+), 145 deletions(-)
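With this change a table can declare more than one BLOB column. A minimal schema sketch, mirroring the new MultipleBlobTableTest below (column names are illustrative; Schema and DataTypes refer to org.apache.paimon.schema.Schema and org.apache.paimon.types.DataTypes):

    Schema.Builder builder = Schema.newBuilder();
    builder.column("id", DataTypes.INT());
    builder.column("data", DataTypes.STRING());
    builder.column("pic1", DataTypes.BLOB());   // first blob column
    builder.column("pic2", DataTypes.BLOB());   // second blob column
    builder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");    // required for blob tables
    builder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");  // required for blob tables
    Schema schema = builder.build();

In Flink and Spark DDL the same effect is obtained by listing several columns in the 'blob-field' option, e.g. 'blob-field'='pic1,pic2' (see the test changes below).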
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 1b8170a9e2..e29da00961 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2958,6 +2958,15 @@ public class CoreOptions implements Serializable {
.orElse(Collections.emptyList());
}
+ public static List<String> blobField(Map<String, String> options) {
+ String string = options.get(BLOB_FIELD.key());
+ if (string == null) {
+ return Collections.emptyList();
+ }
+
+        return Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
+ }
+
public boolean sequenceFieldSortOrderIsAscending() {
return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}
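The new CoreOptions.blobField helper reads the 'blob-field' option as a comma-separated list of column names; a small usage sketch (the option value is illustrative):

    Map<String, String> options = new HashMap<>();
    options.put(CoreOptions.BLOB_FIELD.key(), "pic1, pic2");
    List<String> blobFields = CoreOptions.blobField(options);
    // blobFields = ["pic1", "pic2"]; entries are trimmed, a missing option yields an empty list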
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
new file mode 100644
index 0000000000..f9edb0ef65
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.BlobConsumer;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
+import org.apache.paimon.io.RowDataFileWriter;
+import org.apache.paimon.io.SingleFileWriter;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.BlobType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LongCounter;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static java.util.Collections.singletonList;
+
+/** A file writer that writes each blob field to its own rolling blob files. */
+public class MultipleBlobFileWriter implements Closeable {
+
+ private final List<BlobProjectedFileWriter> blobWriters;
+
+ public MultipleBlobFileWriter(
+ FileIO fileIO,
+ long schemaId,
+ RowType writeSchema,
+ DataFilePathFactory pathFactory,
+ LongCounter seqNumCounter,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore,
+ long targetFileSize,
+ @Nullable BlobConsumer blobConsumer) {
+ RowType blobRowType = BlobType.splitBlob(writeSchema).getRight();
+ this.blobWriters = new ArrayList<>();
+ for (String blobFieldName : blobRowType.getFieldNames()) {
+ BlobFileFormat blobFileFormat = new BlobFileFormat();
+ blobFileFormat.setWriteConsumer(blobConsumer);
+ blobWriters.add(
+ new BlobProjectedFileWriter(
+ () ->
+ new RowDataFileWriter(
+ fileIO,
+                                        RollingFileWriter.createFileWriterContext(
+                                                blobFileFormat,
+                                                writeSchema.project(blobFieldName),
+                                                new SimpleColStatsCollector.Factory[] {
+                                                    NoneSimpleColStatsCollector::new
+                                                },
+                                                "none"),
+ pathFactory.newBlobPath(),
+ writeSchema.project(blobFieldName),
+ schemaId,
+ seqNumCounter,
+ new FileIndexOptions(),
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ pathFactory.isExternalPath(),
+ singletonList(blobFieldName)),
+ targetFileSize,
+                        writeSchema.projectIndexes(singletonList(blobFieldName))));
+ }
+ }
+
+ public void write(InternalRow row) throws IOException {
+ for (BlobProjectedFileWriter blobWriter : blobWriters) {
+ blobWriter.write(row);
+ }
+ }
+
+ public void abort() {
+ for (BlobProjectedFileWriter blobWriter : blobWriters) {
+ blobWriter.abort();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (BlobProjectedFileWriter blobWriter : blobWriters) {
+ blobWriter.close();
+ }
+ }
+
+ public List<DataFileMeta> result() throws IOException {
+ List<DataFileMeta> results = new ArrayList<>();
+ for (BlobProjectedFileWriter blobWriter : blobWriters) {
+ results.addAll(blobWriter.result());
+ }
+ return results;
+ }
+
+ private static class BlobProjectedFileWriter
+ extends ProjectedFileWriter<
+                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>> {
+        public BlobProjectedFileWriter(
+                Supplier<? extends SingleFileWriter<InternalRow, DataFileMeta>> writerFactory,
+                long targetFileSize,
+                int[] projection) {
+            super(new RollingFileWriterImpl<>(writerFactory, targetFileSize), projection);
+ }
+ }
+}
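MultipleBlobFileWriter fans each incoming row out to one rolling blob writer per blob field, so every blob column ends up in its own sequence of blob files. A rough lifecycle sketch; the constructor arguments are assumed to be the same ones RollingBlobFileWriter already holds, and FileSource.APPEND stands in for whatever file source the caller uses:

    MultipleBlobFileWriter blobWriter =
            new MultipleBlobFileWriter(
                    fileIO, schemaId, writeSchema, pathFactory, seqNumCounter,
                    FileSource.APPEND, asyncFileWrite, statsDenseStore, targetFileSize, blobConsumer);
    for (InternalRow row : rows) {
        blobWriter.write(row); // each blob field is projected out and appended to its own blob file
    }
    blobWriter.close();
    List<DataFileMeta> blobFiles = blobWriter.result(); // metas of all blob fields combined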
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java
similarity index 94%
rename from paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java
rename to paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java
index db9140d3ea..2067986622 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java
@@ -31,13 +31,13 @@ import java.io.IOException;
  * <p>This is useful when the physical file schema is a subset of the logical write schema. The
  * projection is evaluated via {@link ProjectedRow} to avoid object allocations.
*/
-public class PeojectedFileWriter<T extends FileWriter<InternalRow, R>, R>
+public class ProjectedFileWriter<T extends FileWriter<InternalRow, R>, R>
implements FileWriter<InternalRow, R> {
private final T writer;
private final ProjectedRow projectedRow;
- public PeojectedFileWriter(T writer, int[] projection) {
+ public ProjectedFileWriter(T writer, int[] projection) {
this.writer = writer;
this.projectedRow = ProjectedRow.from(projection);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 583c0ee44b..f565212d1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -22,23 +22,18 @@ import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.blob.BlobFileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.FileWriterAbortExecutor;
import org.apache.paimon.io.RollingFileWriter;
-import org.apache.paimon.io.RollingFileWriterImpl;
import org.apache.paimon.io.RowDataFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
-import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StatsCollectorFactories;
@@ -50,11 +45,11 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/**
  * A rolling file writer that handles both normal data and blob data. This writer creates separate
  * files for normal columns and blob columns, managing their lifecycle and ensuring consistency
@@ -82,27 +77,20 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
/** Constant for checking rolling condition periodically. */
private static final long CHECK_ROLLING_RECORD_CNT = 1000L;
- /** Expected number of blob fields in a table. */
- private static final int EXPECTED_BLOB_FIELD_COUNT = 1;
-
// Core components
private final Supplier<
-                    PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
+                    ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
writerFactory;
- private final Supplier<
- PeojectedFileWriter<
-                            RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>>
- blobWriterFactory;
+ private final Supplier<MultipleBlobFileWriter> blobWriterFactory;
private final long targetFileSize;
// State management
private final List<FileWriterAbortExecutor> closedWriters;
private final List<DataFileMeta> results;
-    private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
+
+    private ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
             currentWriter;
-    private PeojectedFileWriter<
-                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
- blobWriter;
+ private MultipleBlobFileWriter blobWriter;
private long recordCount = 0;
private boolean closed = false;
@@ -127,18 +115,13 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
this.results = new ArrayList<>();
this.closedWriters = new ArrayList<>();
- // Split schema into normal and blob parts
- Pair<RowType, RowType> typeWithBlob = BlobType.splitBlob(writeSchema);
- RowType normalRowType = typeWithBlob.getLeft();
- RowType blobType = typeWithBlob.getRight();
-
// Initialize writer factory for normal data
this.writerFactory =
createNormalWriterFactory(
fileIO,
schemaId,
fileFormat,
- normalRowType,
+ BlobType.splitBlob(writeSchema).getLeft(),
writeSchema,
pathFactory,
seqNumCounter,
@@ -152,10 +135,9 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
// Initialize blob writer
this.blobWriterFactory =
() ->
- createBlobWriter(
+ new MultipleBlobFileWriter(
fileIO,
schemaId,
- blobType,
writeSchema,
pathFactory,
seqNumCounter,
@@ -168,7 +150,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
/** Creates a factory for normal data writers. */
private static Supplier<
-                    PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
+                    ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
createNormalWriterFactory(
FileIO fileIO,
long schemaId,
@@ -206,61 +188,10 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
statsDenseStore,
pathFactory.isExternalPath(),
normalColumnNames);
-            return new PeojectedFileWriter<>(rowDataFileWriter, projectionNormalFields);
+            return new ProjectedFileWriter<>(rowDataFileWriter, projectionNormalFields);
};
}
- /** Creates a blob writer for handling blob data. */
- private static PeojectedFileWriter<
-                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
- createBlobWriter(
- FileIO fileIO,
- long schemaId,
- RowType blobType,
- RowType writeSchema,
- DataFilePathFactory pathFactory,
- LongCounter seqNumCounter,
- FileSource fileSource,
- boolean asyncFileWrite,
- boolean statsDenseStore,
- long targetFileSize,
- @Nullable BlobConsumer blobConsumer) {
- BlobFileFormat blobFileFormat = new BlobFileFormat();
- blobFileFormat.setWriteConsumer(blobConsumer);
- List<String> blobNames = blobType.getFieldNames();
-
- // Validate blob field count
- checkArgument(
- blobNames.size() == EXPECTED_BLOB_FIELD_COUNT,
- "Limit exactly one blob fields in one paimon table yet.");
-
- int[] blobProjection = writeSchema.projectIndexes(blobNames);
- return new PeojectedFileWriter<>(
- new RollingFileWriterImpl<>(
- () ->
- new RowDataFileWriter(
- fileIO,
-                                        RollingFileWriter.createFileWriterContext(
-                                                blobFileFormat,
-                                                blobType,
-                                                new SimpleColStatsCollector.Factory[] {
-                                                    NoneSimpleColStatsCollector::new
-                                                },
-                                                "none"),
- pathFactory.newBlobPath(),
- blobType,
- schemaId,
- seqNumCounter,
- new FileIndexOptions(),
- fileSource,
- asyncFileWrite,
- statsDenseStore,
- pathFactory.isExternalPath(),
- blobNames),
- targetFileSize),
- blobProjection);
- }
-
/**
     * Writes a single row to both normal and blob writers. Automatically handles file rolling when
     * target size is reached.
@@ -397,14 +328,21 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
private void validateFileConsistency(
            DataFileMeta mainDataFileMeta, List<DataFileMeta> blobTaggedMetas) {
        long mainRowCount = mainDataFileMeta.rowCount();
-        long blobRowCount = blobTaggedMetas.stream().mapToLong(DataFileMeta::rowCount).sum();
-
- if (mainRowCount != blobRowCount) {
- throw new IllegalStateException(
- String.format(
-                            "This is a bug: The row count of main file and blob files does not match. "
-                                    + "Main file: %s (row count: %d), blob files: %s (total row count: %d)",
-                            mainDataFileMeta, mainRowCount, blobTaggedMetas, blobRowCount));
+
+ Map<String, Long> blobRowCounts = new HashMap<>();
+ for (DataFileMeta file : blobTaggedMetas) {
+ long count = file.rowCount();
+            blobRowCounts.compute(file.writeCols().get(0), (k, v) -> v == null ? count : v + count);
+ }
+ for (String blobFieldName : blobRowCounts.keySet()) {
+ long blobRowCount = blobRowCounts.get(blobFieldName);
+ if (mainRowCount != blobRowCount) {
+ throw new IllegalStateException(
+ String.format(
+                                "This is a bug: The row count of main file and blob file does not match. "
+                                        + "Main file: %s (row count: %d), blob field name: %s (row count: %d)",
+                                mainDataFileMeta, mainRowCount, blobFieldName, blobRowCount));
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
index d6eeae17f0..36e7beec85 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
@@ -23,7 +23,9 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.SpecialFields;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
@@ -60,7 +62,8 @@ public class RowTrackingCommitUtils {
}
// assign row id for new files
long start = firstRowIdStart;
- long blobStart = firstRowIdStart;
+ long blobStartDefault = firstRowIdStart;
+ Map<String, Long> blobStarts = new HashMap<>();
for (ManifestEntry entry : deltaFiles) {
Optional<FileSource> fileSource = entry.file().fileSource();
checkArgument(
@@ -74,6 +77,8 @@ public class RowTrackingCommitUtils {
&& !containsRowId) {
long rowCount = entry.file().rowCount();
if (isBlobFile(entry.file().fileName())) {
+ String blobFieldName = entry.file().writeCols().get(0);
+                    long blobStart = blobStarts.getOrDefault(blobFieldName, blobStartDefault);
if (blobStart >= start) {
throw new IllegalStateException(
String.format(
@@ -81,10 +86,11 @@ public class RowTrackingCommitUtils {
blobStart, start));
}
rowIdAssigned.add(entry.assignFirstRowId(blobStart));
- blobStart += rowCount;
+ blobStarts.put(blobFieldName, blobStart + rowCount);
} else {
rowIdAssigned.add(entry.assignFirstRowId(start));
- blobStart = start;
+ blobStartDefault = start;
+ blobStarts.clear();
start += rowCount;
}
} else {
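Row id assignment now keeps one cursor per blob field instead of a single shared one. An illustrative walk-through (column and file layout are hypothetical): a main data file with rows 0..999, followed by two blob files of 500 rows for each of two blob columns, is assigned

    main file        -> firstRowId 0    (start advances to 1000, blobStartDefault = 0)
    pic1 blob file 1 -> firstRowId 0    (blobStarts["pic1"] = 500)
    pic1 blob file 2 -> firstRowId 500  (blobStarts["pic1"] = 1000)
    pic2 blob file 1 -> firstRowId 0    (blobStarts["pic2"] = 500)
    pic2 blob file 2 -> firstRowId 500  (blobStarts["pic2"] = 1000)

so blob files of different columns covering the same rows start at the same row id, and the next non-blob file resets the per-field cursors via blobStarts.clear().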
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 062f71f057..3817cdef08 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -40,6 +40,7 @@ import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -600,17 +601,14 @@ public class SchemaValidation {
"Data evolution config must disabled with
deletion-vectors.enabled");
}
-        List<String> blobNames =
-                BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldNames();
+        Pair<RowType, RowType> normalAndBlobType = BlobType.splitBlob(schema.logicalRowType());
+ List<String> blobNames = normalAndBlobType.getRight().getFieldNames();
if (!blobNames.isEmpty()) {
checkArgument(
options.dataEvolutionEnabled(),
"Data evolution config must enabled for table with BLOB
type column.");
checkArgument(
- blobNames.size() == 1,
- "Table with BLOB type column only support one BLOB
column.");
- checkArgument(
- schema.fields().size() > 1,
+ normalAndBlobType.getLeft().getFieldCount() > 0,
"Table with BLOB type column must have other normal
columns.");
checkArgument(
!schema.partitionKeys().contains(blobNames.get(0)),
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
new file mode 100644
index 0000000000..97f621a720
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for table with blob. */
+public class MultipleBlobTableTest extends TableTestBase {
+
+ private final byte[] blobBytes1 = randomBytes();
+ private final byte[] blobBytes2 = randomBytes();
+
+ @Test
+ public void testBasic() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(1000, 1));
+
+ AtomicInteger integer = new AtomicInteger(0);
+
+ FileStoreTable table = getTableDefault();
+ List<DataFileMeta> filesMetas =
+ table.store().newScan().plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+
+ RowType rowType = table.schema().logicalRowType();
+ List<FieldBunch> fieldGroups =
+ splitFieldBunches(
+                        filesMetas, file -> rowType.getField(file.writeCols().get(0)).id());
+
+ assertThat(fieldGroups.size()).isEqualTo(3);
+ assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
+ assertThat(fieldGroups.get(2).files().size()).isEqualTo(10);
+
+ readDefault(
+ row -> {
+ integer.incrementAndGet();
+ if (integer.get() % 50 == 0) {
+                        assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes1);
+                        assertThat(row.getBlob(3).toData()).isEqualTo(blobBytes2);
+ }
+ });
+
+ assertThat(integer.get()).isEqualTo(1000);
+ }
+
+ protected Schema schemaDefault() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.column("f3", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ return schemaBuilder.build();
+ }
+
+ protected InternalRow dataDefault(int time, int size) {
+ return GenericRow.of(
+ RANDOM.nextInt(),
+ BinaryString.fromBytes(randomBytes()),
+ new BlobData(blobBytes1),
+ new BlobData(blobBytes2));
+ }
+
+ @Override
+ protected byte[] randomBytes() {
+ byte[] binary = new byte[2 * 1024 * 124];
+ RANDOM.nextBytes(binary);
+ return binary;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 1af8c83346..57613fe67f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -28,11 +28,12 @@ import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.schema.SchemaValidation.validateTableSchema;
@@ -49,8 +50,8 @@ class SchemaValidationTest {
new DataField(1, "f1", DataTypes.INT()),
new DataField(2, "f2", DataTypes.INT()),
new DataField(3, "f3", DataTypes.STRING()));
- List<String> partitionKeys = Collections.singletonList("f0");
- List<String> primaryKeys = Collections.singletonList("f1");
+ List<String> partitionKeys = singletonList("f0");
+ List<String> primaryKeys = singletonList("f1");
options.put(BUCKET.key(), String.valueOf(-1));
validateTableSchema(
                new TableSchema(1, fields, 10, partitionKeys, primaryKeys, options, ""));
@@ -64,8 +65,7 @@ class SchemaValidationTest {
new DataField(2, "f2", DataTypes.BLOB()),
new DataField(3, "f3", DataTypes.STRING()));
options.put(BUCKET.key(), String.valueOf(-1));
- validateTableSchema(
-                new TableSchema(1, fields, 10, partitions, Collections.emptyList(), options, ""));
+        validateTableSchema(new TableSchema(1, fields, 10, partitions, emptyList(), options, ""));
}
@Test
@@ -139,17 +139,29 @@ class SchemaValidationTest {
public void testBlobTableSchema() {
Map<String, String> options = new HashMap<>();
- // 1. must set row-tracking = true and data-evolution = true
options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
-        assertThatThrownBy(() -> validateBlobSchema(options, Collections.emptyList()))
+        assertThatThrownBy(() -> validateBlobSchema(options, emptyList()))
                .hasMessage("Data evolution config must enabled for table with BLOB type column.");
- // 2. blob column cannot be part of partition keys
options.clear();
options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
-        assertThatThrownBy(() -> validateBlobSchema(options, Collections.singletonList("f2")))
+        assertThatThrownBy(() -> validateBlobSchema(options, singletonList("f2")))
                .hasMessage("The BLOB type column can not be part of partition keys.");
+
+ assertThatThrownBy(
+ () -> {
+ validateTableSchema(
+ new TableSchema(
+ 1,
+                                        singletonList(new DataField(2, "f2", DataTypes.BLOB())),
+ 10,
+ emptyList(),
+ emptyList(),
+ options,
+ ""));
+ })
+                .hasMessage("Table with BLOB type column must have other normal columns.");
}
@Test
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 8bba96cf78..2239114cca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -1012,8 +1012,8 @@ public class FlinkCatalog extends AbstractCatalog {
        RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
Map<String, String> options = new HashMap<>(catalogTable.getOptions());
- String blobName = options.get(CoreOptions.BLOB_FIELD.key());
- if (blobName != null) {
+ List<String> blobFields = CoreOptions.blobField(options);
+ if (!blobFields.isEmpty()) {
checkArgument(
options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
"When setting '"
@@ -1041,7 +1041,7 @@ public class FlinkCatalog extends AbstractCatalog {
field ->
schemaBuilder.column(
field.getName(),
- field.getName().equals(blobName)
+ blobFields.contains(field.getName())
? toBlobType(field.getType())
: toDataType(field.getType()),
columnComments.get(field.getName())));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
index 07cad1bc59..dc38588a7c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
@@ -21,21 +21,24 @@ package org.apache.paimon.flink;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.InternalRow;
+import java.util.Set;
+
/** Convert to Flink row data with blob. */
public class FlinkRowDataWithBlob extends FlinkRowData {
- private final int blobField;
+ private final Set<Integer> blobFields;
private final boolean blobAsDescriptor;
-    public FlinkRowDataWithBlob(InternalRow row, int blobField, boolean blobAsDescriptor) {
+    public FlinkRowDataWithBlob(
+            InternalRow row, Set<Integer> blobFields, boolean blobAsDescriptor) {
super(row);
- this.blobField = blobField;
+ this.blobFields = blobFields;
this.blobAsDescriptor = blobAsDescriptor;
}
@Override
public byte[] getBinary(int pos) {
- if (pos == blobField) {
+ if (blobFields.contains(pos)) {
Blob blob = row.getBlob(pos);
            return blobAsDescriptor ? blob.toDescriptor().serialize() : blob.toData();
} else {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 1955e6c439..b49b9adb94 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -48,6 +48,7 @@ import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
@@ -267,19 +268,20 @@ public class FileStoreSourceSplitReader
private final MutableRecordAndPosition<RowData> recordAndPosition =
new MutableRecordAndPosition<>();
- @Nullable private final Integer blobField;
+ private final Set<Integer> blobFields;
private FileStoreRecordIterator(@Nullable RowType rowType) {
- this.blobField = rowType == null ? null : blobFieldIndex(rowType);
+            this.blobFields = rowType == null ? Collections.emptySet() : blobFieldIndex(rowType);
}
- private Integer blobFieldIndex(RowType rowType) {
+ private Set<Integer> blobFieldIndex(RowType rowType) {
+ Set<Integer> result = new HashSet<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
- return i;
+ result.add(i);
}
}
- return null;
+ return result;
}
        public FileStoreRecordIterator replace(RecordIterator<InternalRow> iterator) {
@@ -305,9 +307,9 @@ public class FileStoreSourceSplitReader
}
recordAndPosition.setNext(
- blobField == null
+ blobFields.isEmpty()
? new FlinkRowData(row)
- : new FlinkRowDataWithBlob(row, blobField,
blobAsDescriptor));
+                            : new FlinkRowDataWithBlob(row, blobFields, blobAsDescriptor));
currentNumRead++;
if (limiter != null) {
limiter.increment();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index f879b9c67a..1d192cb437 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -49,7 +49,8 @@ public class BlobTableITCase extends CatalogITCaseBase {
protected List<String> ddl() {
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING,
picture BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='picture')",
- "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT,
data STRING, picture BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='picture',
'blob-as-descriptor'='true')");
+ "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT,
data STRING, picture BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='picture',
'blob-as-descriptor'='true')",
+ "CREATE TABLE IF NOT EXISTS multiple_blob_table (id INT, data
STRING, pic1 BYTES, pic2 BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='pic1,pic2')");
}
@Test
@@ -64,6 +65,25 @@ public class BlobTableITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT file_path FROM
`blob_table$files`").size()).isEqualTo(2);
}
+ @Test
+ public void testMultipleBlobs() {
+ batchSql("SELECT * FROM multiple_blob_table");
+        batchSql("INSERT INTO multiple_blob_table VALUES (1, 'paimon', X'48656C6C6F', X'5945')");
+ assertThat(batchSql("SELECT * FROM multiple_blob_table"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ "paimon",
+ new byte[] {72, 101, 108, 108, 111},
+ new byte[] {89, 69}));
+ assertThat(batchSql("SELECT pic1 FROM multiple_blob_table"))
+                .containsExactlyInAnyOrder(Row.of(new byte[] {72, 101, 108, 108, 111}));
+ assertThat(batchSql("SELECT pic2 FROM multiple_blob_table"))
+ .containsExactlyInAnyOrder(Row.of(new byte[] {89, 69}));
+        assertThat(batchSql("SELECT file_path FROM `multiple_blob_table$files`").size())
+ .isEqualTo(3);
+ }
+
@Test
public void testWriteBlobAsDescriptor() throws Exception {
byte[] blobData = new byte[1024 * 1024];
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 5b5a18c80e..9d12201c91 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -83,7 +83,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -457,7 +456,7 @@ public class SparkCatalog extends SparkBaseCatalog
private Schema toInitialSchema(
            StructType schema, Transform[] partitions, Map<String, String> properties) {
Map<String, String> normalizedProperties = new HashMap<>(properties);
- String blobFieldName = properties.get(CoreOptions.BLOB_FIELD.key());
+ List<String> blobFields = CoreOptions.blobField(properties);
String provider = properties.get(TableCatalog.PROP_PROVIDER);
if (!usePaimon(provider)) {
if (isFormatTable(provider)) {
@@ -491,7 +490,7 @@ public class SparkCatalog extends SparkBaseCatalog
for (StructField field : schema.fields()) {
String name = field.name();
DataType type;
- if (Objects.equals(blobFieldName, name)) {
+ if (blobFields.contains(name)) {
checkArgument(
                        field.dataType() instanceof org.apache.spark.sql.types.BinaryType,
"The type of blob field must be binary");
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
index 5abacf9c37..ae504b2412 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.types.{DataTypeRoot, RowType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.paimon.shims.SparkShimLoader
-import java.util.OptionalInt
+import scala.collection.mutable
abstract class SparkInternalRow extends InternalRow {
def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
@@ -36,26 +36,24 @@ object SparkInternalRow {
}
def create(rowType: RowType, blobAsDescriptor: Boolean): SparkInternalRow = {
- val fieldIndex = blobFieldIndex(rowType)
- if (fieldIndex.isPresent) {
- SparkShimLoader.shim.createSparkInternalRowWithBlob(
- rowType,
- fieldIndex.getAsInt,
- blobAsDescriptor)
+ val blobs = blobFields(rowType)
+ if (blobs.nonEmpty) {
+      SparkShimLoader.shim.createSparkInternalRowWithBlob(rowType, blobs, blobAsDescriptor)
} else {
SparkShimLoader.shim.createSparkInternalRow(rowType)
}
}
- private def blobFieldIndex(rowType: RowType): OptionalInt = {
+ private def blobFields(rowType: RowType): Set[Int] = {
var i: Int = 0
+ val blobFields = new mutable.HashSet[Int]()
while (i < rowType.getFieldCount) {
if (rowType.getTypeAt(i).getTypeRoot.equals(DataTypeRoot.BLOB)) {
- return OptionalInt.of(i)
+ blobFields.add(i)
}
i += 1
}
- OptionalInt.empty()
+ blobFields.toSet
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 98296a4006..d85fd9a427 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -53,7 +53,7 @@ trait SparkShim {
def createSparkInternalRowWithBlob(
rowType: RowType,
- blobFieldIndex: Int,
+ blobFields: Set[Int],
blobAsDescriptor: Boolean): SparkInternalRow
def createSparkArrayData(elementType: DataType): SparkArrayData
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index e05c9ce644..4bc1246612 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -46,6 +46,29 @@ class BlobTestBase extends PaimonSparkTestBase {
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 1))
)
+
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM `t$files`"),
+ Seq(Row(2))
+ )
+ }
+ }
+
+ test("Blob: test multiple blobs") {
+ withTable("t") {
+      sql("CREATE TABLE t (id INT, data STRING, pic1 BINARY, pic2 BINARY) TBLPROPERTIES (" +
+        "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='pic1,pic2')")
+ sql("INSERT INTO t VALUES (1, 'paimon', X'48656C6C6F', X'5945')")
+
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+        Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), Array[Byte](89, 69), 0, 1))
+ )
+
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM `t$files`"),
+ Seq(Row(3))
+ )
}
}
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
index 4c7c978f8c..6c1dbd9d21 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
@@ -23,11 +23,11 @@ import org.apache.paimon.utils.InternalRowUtils.copyInternalRow
import org.apache.spark.sql.catalyst.InternalRow
-class Spark3InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean)
+class Spark3InternalRowWithBlob(rowType: RowType, blobFields: Set[Int], blobAsDescriptor: Boolean)
extends Spark3InternalRow(rowType) {
override def getBinary(ordinal: Int): Array[Byte] = {
- if (ordinal == blobFieldIndex) {
+ if (blobFields.contains(ordinal)) {
if (blobAsDescriptor) {
row.getBlob(ordinal).toDescriptor.serialize()
} else {
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 70011e14c3..df84150ca2 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -56,9 +56,9 @@ class Spark3Shim extends SparkShim {
override def createSparkInternalRowWithBlob(
rowType: RowType,
- blobFieldIndex: Int,
+ blobFields: Set[Int],
blobAsDescriptor: Boolean): SparkInternalRow = {
- new Spark3InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor)
+ new Spark3InternalRowWithBlob(rowType, blobFields, blobAsDescriptor)
}
override def createSparkArrayData(elementType: DataType): SparkArrayData = {
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
index 0a208daea2..2a120e5b4c 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
@@ -25,11 +25,11 @@ import org.apache.paimon.utils.InternalRowUtils.copyInternalRow
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.VariantVal
-class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int, blobAsDescriptor: Boolean)
+class Spark4InternalRowWithBlob(rowType: RowType, blobFields: Set[Int], blobAsDescriptor: Boolean)
extends Spark4InternalRow(rowType) {
override def getBinary(ordinal: Int): Array[Byte] = {
- if (ordinal == blobFieldIndex) {
+ if (blobFields.contains(ordinal)) {
if (blobAsDescriptor) {
row.getBlob(ordinal).toDescriptor.serialize()
} else {
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index ad36acfb26..24782ffdd2 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -57,9 +57,9 @@ class Spark4Shim extends SparkShim {
override def createSparkInternalRowWithBlob(
rowType: RowType,
- blobFieldIndex: Int,
+ blobFields: Set[Int],
blobAsDescriptor: Boolean): SparkInternalRow = {
- new Spark4InternalRowWithBlob(rowType, blobFieldIndex, blobAsDescriptor)
+ new Spark4InternalRowWithBlob(rowType, blobFields, blobAsDescriptor)
}
override def createSparkArrayData(elementType: DataType): SparkArrayData = {