This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d0fc63ca4d [spark] Support spark blob basic read write (#6368)
d0fc63ca4d is described below

commit d0fc63ca4d282c9e671f32449dc96045544e62db
Author: YeJunHao <[email protected]>
AuthorDate: Thu Oct 9 17:56:55 2025 +0800

    [spark] Support spark blob basic read write (#6368)
---
 .../java/org/apache/paimon/spark/SparkCatalog.java | 14 +++++++++++-
 .../org/apache/paimon/spark/SparkTypeUtils.java    |  6 ++++++
 .../paimon/spark/data/SparkInternalRow.scala       | 22 +++++++++++++++++--
 .../apache/spark/sql/paimon/shims/SparkShim.scala  |  2 ++
 .../apache/paimon/spark/sql/BlobTestBase.scala}    | 25 ++++++++++++----------
 .../spark/data/Spark3InternalRowWithBlob.scala}    | 20 ++++++++---------
 .../apache/spark/sql/paimon/shims/Spark3Shim.scala |  8 ++++++-
 .../spark/data/Spark4InternalRowWithBlob.scala}    | 20 ++++++++---------
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala |  8 ++++++-
 9 files changed, 88 insertions(+), 37 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index a7bda93290..d388cb51bf 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -39,6 +39,7 @@ import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
 import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
 import org.apache.paimon.spark.utils.CatalogUtils;
 import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.ExceptionUtils;
@@ -92,6 +93,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -109,6 +111,7 @@ import static org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultVa
 import static org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
 import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
 import static org.apache.paimon.spark.utils.CatalogUtils.toUpdateColumnDefaultValue;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog extends SparkBaseCatalog
@@ -456,6 +459,7 @@ public class SparkCatalog extends SparkBaseCatalog
     private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
+        String blobFieldName = properties.get(CoreOptions.BLOB_FIELD.key());
         String provider = properties.get(TableCatalog.PROP_PROVIDER);
         if (!usePaimon(provider)) {
             if (isFormatTable(provider)) {
@@ -488,7 +492,15 @@ public class SparkCatalog extends SparkBaseCatalog
 
         for (StructField field : schema.fields()) {
             String name = field.name();
-            DataType type = toPaimonType(field.dataType()).copy(field.nullable());
+            DataType type;
+            if (Objects.equals(blobFieldName, name)) {
+                checkArgument(
+                        field.dataType() instanceof org.apache.spark.sql.types.BinaryType,
+                        "The type of blob field must be binary");
+                type = new BlobType();
+            } else {
+                type = toPaimonType(field.dataType()).copy(field.nullable());
+            }
             String comment = field.getComment().getOrElse(() -> null);
             if (field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
                 String defaultValue =
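
For context, the new branch in toInitialSchema maps a column named by the
'blob-field' table property to Paimon's BlobType, and checkArgument rejects any
non-binary Spark type for that column. A minimal usage sketch in Scala
(assuming a Spark session with the Paimon catalog configured; the properties
mirror the new test further below):

    // Declaring a blob column: 'blob-field' must name a BINARY column.
    spark.sql(
      """CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES (
        |  'row-tracking.enabled' = 'true',
        |  'data-evolution.enabled' = 'true',
        |  'blob-field' = 'picture')""".stripMargin)
    // Declaring 'picture' with a non-binary type (e.g. STRING) would instead
    // fail the check above: "The type of blob field must be binary".
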
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index de14ef4316..dc2f8b30ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.BooleanType;
 import org.apache.paimon.types.CharType;
 import org.apache.paimon.types.DataField;
@@ -161,6 +162,11 @@ public class SparkTypeUtils {
             return DataTypes.BinaryType;
         }
 
+        @Override
+        public DataType visit(BlobType blobType) {
+            return DataTypes.BinaryType;
+        }
+
         @Override
         public DataType visit(VarBinaryType varBinaryType) {
             return DataTypes.BinaryType;
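
With this visitor case, a Paimon BLOB surfaces to Spark as a plain BinaryType,
so Spark-side schemas cannot tell blob columns apart from ordinary binary
columns; the distinction lives only in the Paimon schema. A one-line sketch,
assuming SparkTypeUtils.fromPaimonType is the public entry point for this
visitor:

    import org.apache.paimon.spark.SparkTypeUtils
    import org.apache.paimon.types.BlobType
    import org.apache.spark.sql.types.DataTypes

    // Paimon BLOB maps to Spark BinaryType, same as BINARY and VARBINARY.
    assert(SparkTypeUtils.fromPaimonType(new BlobType()) == DataTypes.BinaryType)
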
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
index b0916447c0..b3ac41598e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
@@ -18,11 +18,13 @@
 
 package org.apache.paimon.spark.data
 
-import org.apache.paimon.types.RowType
+import org.apache.paimon.types.{DataTypeRoot, RowType}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.paimon.shims.SparkShimLoader
 
+import java.util.OptionalInt
+
 abstract class SparkInternalRow extends InternalRow {
   def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
 }
@@ -30,7 +32,23 @@ abstract class SparkInternalRow extends InternalRow {
 object SparkInternalRow {
 
   def create(rowType: RowType): SparkInternalRow = {
-    SparkShimLoader.shim.createSparkInternalRow(rowType)
+    val fieldIndex = blobFieldIndex(rowType)
+    if (fieldIndex.isPresent) {
+      SparkShimLoader.shim.createSparkInternalRowWithBlob(rowType, fieldIndex.getAsInt)
+    } else {
+      SparkShimLoader.shim.createSparkInternalRow(rowType)
+    }
+  }
+
+  private def blobFieldIndex(rowType: RowType): OptionalInt = {
+    var i: Int = 0
+    while (i < rowType.getFieldCount) {
+      if (rowType.getTypeAt(i).getTypeRoot.equals(DataTypeRoot.BLOB)) {
+        return OptionalInt.of(i)
+      }
+      i += 1
+    }
+    OptionalInt.empty()
   }
 
 }
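
Note the dispatch above: blobFieldIndex returns the index of the first BLOB
field only, so create effectively assumes at most one blob column per row
type. A short sketch of the selection under that single-blob assumption:

    import org.apache.paimon.types.{BlobType, IntType, RowType}

    // Field 1 is the first (and only) BLOB, so create() returns the
    // blob-aware SparkInternalRow variant from the active shim.
    val rowType = RowType.of(new IntType(), new BlobType())
    val row = SparkInternalRow.create(rowType)
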
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index eb34fac022..bb89ed7649 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -51,6 +51,8 @@ trait SparkShim {
 
   def createSparkInternalRow(rowType: RowType): SparkInternalRow
 
+  def createSparkInternalRowWithBlob(rowType: RowType, blobFieldIndex: Int): SparkInternalRow
+
   def createSparkArrayData(elementType: DataType): SparkArrayData
 
   def createTable(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
similarity index 56%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
copy to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index b0916447c0..fa175f9e2b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -16,21 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.data
+package org.apache.paimon.spark.sql
 
-import org.apache.paimon.types.RowType
+import org.apache.paimon.spark.PaimonSparkTestBase
 
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.Row
 
-abstract class SparkInternalRow extends InternalRow {
-  def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
-}
-
-object SparkInternalRow {
+class BlobTestBase extends PaimonSparkTestBase {
+  test("Blob: test basic") {
+    withTable("t") {
+      sql(
+        "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')")
+      sql("INSERT INTO t VALUES (1, 'paimon', X'48656C6C6F')")
 
-  def create(rowType: RowType): SparkInternalRow = {
-    SparkShimLoader.shim.createSparkInternalRow(rowType)
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+        Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 1))
+      )
+    }
   }
 
 }
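
In the test above, X'48656C6C6F' is the ASCII bytes of "Hello", and _ROW_ID /
_SEQUENCE_NUMBER are exposed because the table enables row tracking. An
illustrative follow-up query (not part of the commit), assuming the same
table:

    // Decode the blob bytes back to text; the expected value is "Hello".
    spark.sql("SELECT id, decode(picture, 'UTF-8') AS text FROM t").show()
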
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
similarity index 70%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
copy to paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
index b0916447c0..5922239ec2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
@@ -18,19 +18,17 @@
 
 package org.apache.paimon.spark.data
 
+import org.apache.paimon.spark.AbstractSparkInternalRow
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
+class Spark3InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int)
+  extends Spark3InternalRow(rowType) {
 
-abstract class SparkInternalRow extends InternalRow {
-  def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
-}
-
-object SparkInternalRow {
-
-  def create(rowType: RowType): SparkInternalRow = {
-    SparkShimLoader.shim.createSparkInternalRow(rowType)
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    if (ordinal == blobFieldIndex) {
+      row.getBlob(ordinal).toData
+    } else {
+      super.getBinary(ordinal)
+    }
   }
-
 }
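
On the read path, the override intercepts only the blob ordinal: the
underlying Paimon row holds a Blob handle there rather than raw bytes, so
getBinary materializes it via getBlob(ordinal).toData, while every other
ordinal falls through to the inherited accessor. A sketch of the contract
(paimonRow stands in for any org.apache.paimon.data.InternalRow with a blob
at ordinal 1):

    // Wrap a Paimon row whose ordinal 1 holds a Blob handle.
    val sparkRow = new Spark3InternalRowWithBlob(rowType, blobFieldIndex = 1)
      .replace(paimonRow)
    // Ordinal 1 returns row.getBlob(1).toData; other ordinals use the
    // standard binary accessor from Spark3InternalRow.
    val bytes: Array[Byte] = sparkRow.getBinary(1)
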
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 74803626d6..276e22bdb9 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.paimon.shims
 import org.apache.paimon.data.variant.Variant
 import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules
 import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser
-import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, Spark3InternalRowWithBlob, SparkArrayData, SparkInternalRow}
 import org.apache.paimon.types.{DataType, RowType}
 
 import org.apache.spark.sql.SparkSession
@@ -54,6 +54,12 @@ class Spark3Shim extends SparkShim {
     new Spark3InternalRow(rowType)
   }
 
+  override def createSparkInternalRowWithBlob(
+      rowType: RowType,
+      blobFieldIndex: Int): SparkInternalRow = {
+    new Spark3InternalRowWithBlob(rowType, blobFieldIndex)
+  }
+
   override def createSparkArrayData(elementType: DataType): SparkArrayData = {
     new Spark3ArrayData(elementType)
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
similarity index 69%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
copy to paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
index b0916447c0..836fe46b37 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
@@ -18,19 +18,19 @@
 
 package org.apache.paimon.spark.data
 
+import org.apache.paimon.spark.AbstractSparkInternalRow
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.unsafe.types.VariantVal
 
-abstract class SparkInternalRow extends InternalRow {
-  def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
-}
-
-object SparkInternalRow {
+class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int)
+  extends Spark4InternalRow(rowType) {
 
-  def create(rowType: RowType): SparkInternalRow = {
-    SparkShimLoader.shim.createSparkInternalRow(rowType)
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    if (ordinal == blobFieldIndex) {
+      row.getBlob(ordinal).toData
+    } else {
+      super.getBinary(ordinal)
+    }
   }
-
 }
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 2ae52f7118..53f2b0b199 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.paimon.shims
 import org.apache.paimon.data.variant.{GenericVariant, Variant}
 import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
 import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
-import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
 import org.apache.paimon.types.{DataType, RowType}
 
 import org.apache.spark.sql.SparkSession
@@ -55,6 +55,12 @@ class Spark4Shim extends SparkShim {
     new Spark4InternalRow(rowType)
   }
 
+  override def createSparkInternalRowWithBlob(
+      rowType: RowType,
+      blobFieldIndex: Int): SparkInternalRow = {
+    new Spark4InternalRowWithBlob(rowType, blobFieldIndex)
+  }
+
   override def createSparkArrayData(elementType: DataType): SparkArrayData = {
     new Spark4ArrayData(elementType)
   }
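
Taken together, the commit wires blob support end to end: DDL handling in
SparkCatalog, type mapping in SparkTypeUtils, and per-Spark-version row
adapters behind the shim. A hedged end-to-end sketch, using the standard
Paimon catalog configuration (the warehouse path is illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
      .getOrCreate()

    spark.sql("USE paimon.default")
    spark.sql(
      """CREATE TABLE pics (id INT, picture BINARY) TBLPROPERTIES (
        |  'row-tracking.enabled' = 'true',
        |  'data-evolution.enabled' = 'true',
        |  'blob-field' = 'picture')""".stripMargin)
    spark.sql("INSERT INTO pics VALUES (1, X'48656C6C6F')")
    spark.sql("SELECT * FROM pics").show()
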
