This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new daf482e286 [flink] Support creating table with blob in flink sql. (#6351)
daf482e286 is described below

commit daf482e2866cd0ec7c24700b27261d19a60106f9
Author: YeJunHao <[email protected]>
AuthorDate: Tue Sep 30 11:45:45 2025 +0800

    [flink] Support creating table with blob in flink sql. (#6351)
---
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../main/java/org/apache/paimon/CoreOptions.java   |  6 +++
 .../apache/paimon/io/RowDataRollingFileWriter.java | 37 +---------------
 .../apache/paimon/io/RollingFileWriterTest.java    |  2 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 15 ++++++-
 .../java/org/apache/paimon/flink/FlinkRowData.java |  2 +-
 .../apache/paimon/flink/FlinkRowDataWithBlob.java  | 37 ++++++++++++++++
 .../apache/paimon/flink/LogicalTypeConversion.java | 12 ++++++
 .../paimon/flink/source/FileStoreSourceReader.java |  7 +++-
 .../flink/source/FileStoreSourceSplitReader.java   | 27 ++++++++++--
 .../apache/paimon/flink/source/FlinkSource.java    |  4 +-
 .../align/AlignedContinuousFileStoreSource.java    |  8 +++-
 .../flink/source/align/AlignedSourceReader.java    |  6 ++-
 .../org/apache/paimon/flink/BlobTableITCase.java   | 47 +++++++++++++++++++++
 .../flink/source/FileStoreSourceReaderTest.java    |  1 +
 .../source/FileStoreSourceSplitReaderTest.java     |  3 +-
 .../source/align/AlignedSourceReaderTest.java      |  1 +
 .../spark/commands/DataEvolutionPaimonWriter.scala |  7 ++++
 .../MergeIntoPaimonDataEvolutionTable.scala        | 49 +++++++++++++++++++---
 19 files changed, 222 insertions(+), 55 deletions(-)
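
For reference, a minimal usage sketch of the new 'blob.field' option, derived
from the BlobTableITCase added in this commit (the table and column names are
simply the ones used in that test):

    -- 'blob.field' marks the BYTES column to store as a blob; per the new
    -- FlinkCatalog check, 'data-evolution.enabled' must be set as well.
    CREATE TABLE IF NOT EXISTS blob_table (
        id INT,
        data STRING,
        picture BYTES
    ) WITH (
        'row-tracking.enabled' = 'true',
        'data-evolution.enabled' = 'true',
        'blob.field' = 'picture'
    );

    INSERT INTO blob_table VALUES (1, 'paimon', X'48656C6C6F');
    SELECT * FROM blob_table;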

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 2eed98adaa..f2e90e5dce 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -50,6 +50,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to create underlying storage when reading and writing the table.</td>
         </tr>
+        <tr>
+            <td><h5>blob.field</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specify the blob field.</td>
+        </tr>
         <tr>
             <td><h5>bucket</h5></td>
             <td style="word-wrap: break-word;">-1</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 500b16843d..e2046da926 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1989,6 +1989,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Format table file path only contain partition value.");
 
+    public static final ConfigOption<String> BLOB_FIELD =
+            key("blob.field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Specify the blob field.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index c309a10515..328bc193b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -18,22 +18,17 @@
 
 package org.apache.paimon.io;
 
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.SimpleStatsCollector;
-import org.apache.paimon.format.avro.AvroFileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
 
 import javax.annotation.Nullable;
 
-import java.util.Arrays;
 import java.util.List;
 
 /** {@link RollingFileWriterImpl} for data files containing {@link InternalRow}. */
@@ -58,7 +53,7 @@ public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow,
                 () ->
                         new RowDataFileWriter(
                                 fileIO,
-                                createFileWriterContext(
+                                RollingFileWriter.createFileWriterContext(
                                         fileFormat, writeSchema, statsCollectors, fileCompression),
                                 pathFactory.newPath(),
                                 writeSchema,
@@ -72,34 +67,4 @@ public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow,
                                 writeCols),
                 targetFileSize);
     }
-
-    @VisibleForTesting
-    static FileWriterContext createFileWriterContext(
-            FileFormat fileFormat,
-            RowType rowType,
-            SimpleColStatsCollector.Factory[] statsCollectors,
-            String fileCompression) {
-        return new FileWriterContext(
-                fileFormat.createWriterFactory(rowType),
-                createStatsProducer(fileFormat, rowType, statsCollectors),
-                fileCompression);
-    }
-
-    private static SimpleStatsProducer createStatsProducer(
-            FileFormat fileFormat,
-            RowType rowType,
-            SimpleColStatsCollector.Factory[] statsCollectors) {
-        boolean isDisabled =
-                Arrays.stream(SimpleColStatsCollector.create(statsCollectors))
-                        .allMatch(p -> p instanceof NoneSimpleColStatsCollector);
-        if (isDisabled) {
-            return SimpleStatsProducer.disabledProducer();
-        }
-        if (fileFormat instanceof AvroFileFormat) {
-            SimpleStatsCollector collector = new SimpleStatsCollector(rowType, statsCollectors);
-            return SimpleStatsProducer.fromCollector(collector);
-        }
-        return SimpleStatsProducer.fromExtractor(
-                fileFormat.createStatsExtractor(rowType, statsCollectors).orElse(null));
-    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 8ac16aac2b..96adc5fc21 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -71,7 +71,7 @@ public class RollingFileWriterTest {
                         () ->
                                 new RowDataFileWriter(
                                         LocalFileIO.create(),
-                                        RowDataRollingFileWriter.createFileWriterContext(
+                                        RollingFileWriter.createFileWriterContext(
                                                 fileFormat,
                                                 SCHEMA,
                                                 SimpleColStatsCollector.createFullStatsFactories(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index d9205c814f..cd259903f2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -151,6 +151,7 @@ import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
 import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
 import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
+import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
@@ -1050,6 +1051,16 @@ public class FlinkCatalog extends AbstractCatalog {
         RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
 
         Map<String, String> options = new HashMap<>(catalogTable.getOptions());
+        String blobName = options.get(CoreOptions.BLOB_FIELD.key());
+        if (blobName != null) {
+            checkArgument(
+                    options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
+                    "When setting '"
+                            + CoreOptions.BLOB_FIELD.key()
+                            + "', you must also set '"
+                            + CoreOptions.DATA_EVOLUTION_ENABLED.key()
+                            + "'");
+        }
         // Serialize virtual columns and watermark to the options
         // This is what Flink SQL needs, the storage itself does not need them
         options.putAll(columnOptions(schema));
@@ -1069,7 +1080,9 @@ public class FlinkCatalog extends AbstractCatalog {
                         field ->
                                 schemaBuilder.column(
                                         field.getName(),
-                                        toDataType(field.getType()),
+                                        field.getName().equals(blobName)
+                                                ? toBlobType(field.getType())
+                                                : toDataType(field.getType()),
                                         columnComments.get(field.getName())));
 
         return schemaBuilder.build();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
index a4d81364b4..0447a0c6f9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
@@ -41,7 +41,7 @@ import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind;
 /** Convert to Flink row data. */
 public class FlinkRowData implements RowData {
 
-    private InternalRow row;
+    protected InternalRow row;
 
     public FlinkRowData(InternalRow row) {
         this.row = row;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
new file mode 100644
index 0000000000..541fa5bdf4
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.InternalRow;
+
+/** Convert to Flink row data with blob. */
+public class FlinkRowDataWithBlob extends FlinkRowData {
+
+    private final int blobField;
+
+    public FlinkRowDataWithBlob(InternalRow row, int blobField) {
+        super(row);
+        this.blobField = blobField;
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return pos == blobField ? row.getBlob(pos).toData() : row.getBinary(pos);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
index 896f8d296e..8c7779e318 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
@@ -18,13 +18,18 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Conversion between {@link LogicalType} and {@link DataType}. */
 public class LogicalTypeConversion {
 
@@ -37,6 +42,13 @@ public class LogicalTypeConversion {
         return dataType.accept(DataTypeToLogicalType.INSTANCE);
     }
 
+    public static BlobType toBlobType(LogicalType logicalType) {
+        checkArgument(
+                logicalType instanceof BinaryType || logicalType instanceof VarBinaryType,
+                "Expected BinaryType or VarBinaryType, but got: " + logicalType);
+        return new BlobType();
+    }
+
     public static RowType toDataType(org.apache.flink.table.types.logical.RowType logicalType) {
         return (RowType) toDataType(logicalType, new AtomicInteger(-1));
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 622b186780..f7d7d86791 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.NestedProjectedRowData;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -50,14 +51,16 @@ public class FileStoreSourceReader
             FileStoreSourceReaderMetrics metrics,
             IOManager ioManager,
             @Nullable Long limit,
-            @Nullable NestedProjectedRowData rowData) {
+            @Nullable NestedProjectedRowData rowData,
+            @Nullable RowType readType) {
         // limiter is created in SourceReader, it can be shared in all split readers
         super(
                 () ->
                         new FileStoreSourceSplitReader(
                                 tableRead.withIOManager(ioManager),
                                 RecordLimiter.create(limit),
-                                metrics),
+                                metrics,
+                                readType),
                 (element, output, state) ->
                         FlinkRecordsWithSplitIds.emitRecord(
                                 readerContext, element, output, state, metrics, rowData),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 94e3c67b6c..7dba5b6015 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -20,12 +20,15 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowDataWithBlob;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pool;
 
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -77,12 +80,13 @@ public class FileStoreSourceSplitReader
     public FileStoreSourceSplitReader(
             TableRead tableRead,
             @Nullable RecordLimiter limiter,
-            FileStoreSourceReaderMetrics metrics) {
+            FileStoreSourceReaderMetrics metrics,
+            @Nullable RowType readType) {
         this.tableRead = tableRead;
         this.limiter = limiter;
         this.splits = new LinkedList<>();
         this.pool = new Pool<>(1);
-        this.pool.add(new FileStoreRecordIterator());
+        this.pool.add(new FileStoreRecordIterator(readType));
         this.paused = false;
         this.metrics = metrics;
         this.wakeup = new AtomicBoolean(false);
@@ -260,6 +264,20 @@ public class FileStoreSourceSplitReader
 
         private final MutableRecordAndPosition<RowData> recordAndPosition =
                 new MutableRecordAndPosition<>();
+        @Nullable private final Integer blobField;
+
+        private FileStoreRecordIterator(@Nullable RowType rowType) {
+            this.blobField = rowType == null ? null : blobFieldIndex(rowType);
+        }
+
+        private Integer blobFieldIndex(RowType rowType) {
+            for (int i = 0; i < rowType.getFieldCount(); i++) {
+                if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+                    return i;
+                }
+            }
+            return null;
+        }
 
         public FileStoreRecordIterator replace(RecordIterator<InternalRow> iterator) {
             this.iterator = iterator;
@@ -283,7 +301,10 @@ public class FileStoreSourceSplitReader
                 return null;
             }
 
-            recordAndPosition.setNext(new FlinkRowData(row));
+            recordAndPosition.setNext(
+                    blobField == null
+                            ? new FlinkRowData(row)
+                            : new FlinkRowDataWithBlob(row, blobField));
             currentNumRead++;
             if (limiter != null) {
                 limiter.increment();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index 42da33cfa7..bfccb5a71c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -67,13 +67,15 @@ public abstract class FlinkSource
                 new FileStoreSourceReaderMetrics(metricGroup);
         TableRead tableRead =
                 readBuilder.newRead().withMetricRegistry(new FlinkMetricRegistry(metricGroup));
+
         return new FileStoreSourceReader(
                 context,
                 tableRead,
                 sourceReaderMetrics,
                 ioManager,
                 limit,
-                NestedProjectedRowData.copy(rowData));
+                NestedProjectedRowData.copy(rowData),
+                readBuilder.readType());
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 076175b31f..ca9b553036 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -65,7 +65,13 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
         FileStoreSourceReaderMetrics sourceReaderMetrics =
                 new FileStoreSourceReaderMetrics(context.metricGroup());
         return new AlignedSourceReader(
-                context, readBuilder.newRead(), sourceReaderMetrics, ioManager, limit, rowData);
+                context,
+                readBuilder.newRead(),
+                sourceReaderMetrics,
+                ioManager,
+                limit,
+                rowData,
+                readBuilder.readType());
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 1c3a489b1f..114b94d707 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.source.FileStoreSourceSplit;
 import org.apache.paimon.flink.source.FileStoreSourceSplitState;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ReflectionUtils;
 
 import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
@@ -60,8 +61,9 @@ public class AlignedSourceReader extends FileStoreSourceReader
             FileStoreSourceReaderMetrics metrics,
             IOManager ioManager,
             @Nullable Long limit,
-            @Nullable NestedProjectedRowData rowData) {
-        super(readerContext, tableRead, metrics, ioManager, limit, rowData);
+            @Nullable NestedProjectedRowData rowData,
+            @Nullable RowType readType) {
+        super(readerContext, tableRead, metrics, ioManager, limit, rowData, readType);
         this.nextCheckpointId = null;
         try {
             // In lower versions of Flink, the SplitFetcherManager does not provide the getQueue
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
new file mode 100644
index 0000000000..412968468b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test write and read table with blob type. */
+public class BlobTableITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob.field'='picture')");
+    }
+
+    @Test
+    public void testBasic() {
+        batchSql("SELECT * FROM blob_table");
+        batchSql("INSERT INTO blob_table VALUES (1, 'paimon', X'48656C6C6F')");
+        assertThat(batchSql("SELECT * FROM blob_table"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "paimon", new byte[] {72, 101, 108, 108, 111}));
+        assertThat(batchSql("SELECT file_path FROM `blob_table$files`").size()).isEqualTo(2);
+    }
+}
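
Note on the test above: the single INSERT yields two entries in the files system
table, which suggests the blob column is written to its own file, separate from
the file holding the remaining columns. The same query the test asserts on can be
used to observe this (exact paths depend on the warehouse location):

    SELECT file_path FROM `blob_table$files`;
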
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index f231b105fa..3900ffa6b4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -164,6 +164,7 @@ public class FileStoreSourceReaderTest {
                 new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
                 IOManager.create(tempDir.toString()),
                 null,
+                null,
                 null);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
index 2afb7fa197..fbe1e72424 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
@@ -130,7 +130,8 @@ public class FileStoreSourceSplitReaderTest {
         return new FileStoreSourceSplitReader(
                 tableRead,
                 limit == null ? null : new RecordLimiter(limit),
-                new FileStoreSourceReaderMetrics(new DummyMetricGroup()));
+                new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
+                null);
     }
 
     private void innerTestOnce(int skip) throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index 6ea389c161..8d3724822f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -223,6 +223,7 @@ public class AlignedSourceReaderTest extends FileStoreSourceReaderTest {
                 new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
                 IOManager.create(tempDir.toString()),
                 null,
+                null,
                 null);
     }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index d1c47ce4df..1bf95b059d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -25,6 +25,8 @@ import org.apache.paimon.spark.commands.DataEvolutionPaimonWriter.{deserializeCo
 import org.apache.paimon.spark.write.WriteHelper
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink._
+import org.apache.paimon.types.DataType
+import org.apache.paimon.types.DataTypeRoot.BLOB
 
 import org.apache.spark.sql._
 
@@ -61,6 +63,11 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteH
     assert(data.columns.length == columnNames.size + 2)
     val writeType = table.rowType().project(columnNames.asJava)
 
+    if (writeType.getFieldTypes.stream.anyMatch((t: DataType) => t.is(BLOB))) {
+      throw new UnsupportedOperationException(
+        "DataEvolution does not support writing partial columns mixed with BLOB type.")
+    }
+
     val written =
       data.mapPartitions {
         iter =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index d26426d6c4..e29b765105 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile
 import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
 import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
@@ -75,17 +76,44 @@ case class MergeIntoPaimonDataEvolutionTable(
 
   override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
   private val firstRowIds: Seq[Long] = table
-    .newSnapshotReader()
-    .withManifestEntryFilter(entry => entry.file().firstRowId() != null)
-    .read()
-    .splits()
+    .store()
+    .newScan()
+    .withManifestEntryFilter(
+      entry =>
+        entry.file().firstRowId() != null && (!isBlobFile(
+          entry
+            .file()
+            .fileName())))
+    .plan()
+    .files()
     .asScala
-    .map(_.asInstanceOf[DataSplit])
-    .flatMap(split => split.dataFiles().asScala.map(s => s.firstRowId().asInstanceOf[Long]))
+    .map(file => file.file().firstRowId().asInstanceOf[Long])
     .distinct
     .sorted
     .toSeq
 
+  private val firstRowIdToBlobFirstRowIds = {
+    val map = new mutable.HashMap[Long, List[Long]]()
+    val files = table
+      .store()
+      .newScan()
+      .withManifestEntryFilter(entry => isBlobFile(entry.file().fileName()))
+      .plan()
+      .files()
+      .asScala
+      .sortBy(f => f.file().firstRowId())
+
+    for (file <- files) {
+      val firstRowId = file.file().firstRowId().asInstanceOf[Long]
+      val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
+      map.update(
+        firstIdInNormalFile,
+        map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
+      )
+    }
+    map
+  }
+
   lazy val targetRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(targetTable)
   lazy val sourceRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(sourceTable)
 
@@ -283,11 +311,20 @@ case class MergeIntoPaimonDataEvolutionTable(
       identifier: String): Array[Long] = {
     import sparkSession.implicits._
     val firstRowIdsFinal = firstRowIds
+    val firstRowIdToBlobFirstRowIdsFinal = firstRowIdToBlobFirstRowIds
     val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
     dataset
       .select(firstRowIdUdf(col(identifier)))
       .distinct()
       .as[Long]
+      .flatMap(
+        f => {
+          if (firstRowIdToBlobFirstRowIdsFinal.contains(f)) {
+            firstRowIdToBlobFirstRowIdsFinal(f)
+          } else {
+            Seq(f)
+          }
+        })
       .collect()
   }
 
