This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a61f3fbbf7 [spark] Enable blob as descriptor with v2 write (#7115)
a61f3fbbf7 is described below

commit a61f3fbbf7c2a5549808e60438ef2f5ca33010ae
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jan 26 10:23:29 2026 +0800

    [spark] Enable blob as descriptor with v2 write (#7115)
---
 .../paimon/spark/SparkInternalRowWrapper.java      | 74 +++++++++++-----------
 .../java/org/apache/paimon/spark/SparkRow.java     |  4 +-
 .../spark/catalog/functions/PaimonFunctions.scala  |  4 +-
 .../paimon/spark/format/PaimonFormatTable.scala    |  2 +-
 .../org/apache/paimon/spark/write/DataWrite.scala  |  2 +-
 .../paimon/spark/write/PaimonBatchWrite.scala      | 14 ++--
 .../paimon/spark/write/PaimonV2DataWriter.scala    | 17 ++++-
 .../org/apache/paimon/spark/sql/BlobTestBase.scala | 16 ++++-
 8 files changed, 80 insertions(+), 53 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index 7de1695af0..7d0f051756 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -29,6 +31,8 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.spark.util.shim.TypeUtils$;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.UriReader;
+import org.apache.paimon.utils.UriReaderFactory;
 
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -41,43 +45,43 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.TimestampNTZType;
 import org.apache.spark.sql.types.TimestampType;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.HashMap;
 import java.util.Map;
 
-/** Wrapper to fetch value from the spark internal row. */
+/**
+ * An {@link InternalRow} wraps spark {@link 
org.apache.spark.sql.catalyst.InternalRow} for v2
+ * write.
+ */
 public class SparkInternalRowWrapper implements InternalRow, Serializable {
 
-    private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
-    private final int length;
-    private final int rowKindIdx;
     private final StructType tableSchema;
-    private int[] fieldIndexMap = null;
+    private final int length;
+    private final boolean blobAsDescriptor;
+    @Nullable private final UriReaderFactory uriReaderFactory;
+    @Nullable private final int[] fieldIndexMap;
 
-    public SparkInternalRowWrapper(
-            org.apache.spark.sql.catalyst.InternalRow internalRow,
-            int rowKindIdx,
-            StructType tableSchema,
-            int length) {
-        this.internalRow = internalRow;
-        this.rowKindIdx = rowKindIdx;
-        this.length = length;
-        this.tableSchema = tableSchema;
-    }
+    private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
 
-    public SparkInternalRowWrapper(int rowKindIdx, StructType tableSchema, int 
length) {
-        this.rowKindIdx = rowKindIdx;
-        this.length = length;
-        this.tableSchema = tableSchema;
+    public SparkInternalRowWrapper(StructType tableSchema, int length) {
+        this(tableSchema, length, null, false, null);
     }
 
     public SparkInternalRowWrapper(
-            int rowKindIdx, StructType tableSchema, StructType dataSchema, int 
length) {
-        this.rowKindIdx = rowKindIdx;
-        this.length = length;
+            StructType tableSchema,
+            int length,
+            StructType dataSchema,
+            boolean blobAsDescriptor,
+            CatalogContext catalogContext) {
         this.tableSchema = tableSchema;
-        this.fieldIndexMap = buildFieldIndexMap(tableSchema, dataSchema);
+        this.length = length;
+        this.fieldIndexMap =
+                dataSchema != null ? buildFieldIndexMap(tableSchema, 
dataSchema) : null;
+        this.blobAsDescriptor = blobAsDescriptor;
+        this.uriReaderFactory = blobAsDescriptor ? new 
UriReaderFactory(catalogContext) : null;
     }
 
     public SparkInternalRowWrapper 
replace(org.apache.spark.sql.catalyst.InternalRow internalRow) {
@@ -128,12 +132,6 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
     @Override
     public RowKind getRowKind() {
-        if (rowKindIdx != -1) {
-            int actualPos = getActualFieldPosition(rowKindIdx);
-            if (actualPos != -1) {
-                return RowKind.fromByteValue(internalRow.getByte(actualPos));
-            }
-        }
         return RowKind.INSERT;
     }
 
@@ -244,7 +242,13 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(internalRow.getBinary(pos));
+        if (blobAsDescriptor) {
+            BlobDescriptor blobDescriptor = 
BlobDescriptor.deserialize(internalRow.getBinary(pos));
+            UriReader uriReader = 
uriReaderFactory.create(blobDescriptor.uri());
+            return Blob.fromDescriptor(uriReader, blobDescriptor);
+        } else {
+            return new BlobData(internalRow.getBinary(pos));
+        }
     }
 
     @Override
@@ -276,10 +280,8 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
             return null;
         }
         return new SparkInternalRowWrapper(
-                internalRow.getStruct(actualPos, numFields),
-                -1,
-                (StructType) tableSchema.fields()[actualPos].dataType(),
-                numFields);
+                        (StructType) 
tableSchema.fields()[actualPos].dataType(), numFields)
+                .replace(internalRow.getStruct(actualPos, numFields));
     }
 
     private static Timestamp convertToTimestamp(DataType dataType, long 
micros) {
@@ -434,8 +436,8 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
         @Override
         public InternalRow getRow(int pos, int numFields) {
-            return new SparkInternalRowWrapper(
-                    arrayData.getStruct(pos, numFields), -1, (StructType) 
elementType, numFields);
+            return new SparkInternalRowWrapper((StructType) elementType, 
numFields)
+                    .replace(arrayData.getStruct(pos, numFields));
         }
     }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index d065743768..9ac1e59994 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -55,7 +55,7 @@ import java.util.Map;
 
 import scala.collection.JavaConverters;
 
-/** A {@link InternalRow} wraps spark {@link Row}. */
+/** An {@link InternalRow} wraps spark {@link Row} for v1 write. */
 public class SparkRow implements InternalRow, Serializable {
 
     private final RowType type;
@@ -78,7 +78,7 @@ public class SparkRow implements InternalRow, Serializable {
         this.row = row;
         this.rowKind = rowkind;
         this.blobAsDescriptor = blobAsDescriptor;
-        this.uriReaderFactory = new UriReaderFactory(catalogContext);
+        this.uriReaderFactory = blobAsDescriptor ? new 
UriReaderFactory(catalogContext) : null;
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index a0d97c68d4..acdd40e9b2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -25,7 +25,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
 import org.apache.paimon.spark.SparkInternalRowWrapper
 import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
-import org.apache.paimon.spark.function.{DescriptorToStringFunction, 
DescriptorToStringUnbound, PathToDescriptorFunction, PathToDescriptorUnbound}
+import org.apache.paimon.spark.function.{DescriptorToStringUnbound, 
PathToDescriptorUnbound}
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, 
LocalZonedTimestampType, MapType, RowType, TimestampType}
 import org.apache.paimon.utils.ProjectedRow
@@ -90,7 +90,7 @@ class BucketFunction(NAME: String, bucketFunctionType: 
BucketFunctionType) exten
     val serializer = new InternalRowSerializer(bucketKeyRowType)
     val mapping = (1 to bucketKeyRowType.getFieldCount).toArray
     val reusedRow =
-      new SparkInternalRowWrapper(-1, inputType, inputType.fields.length)
+      new SparkInternalRowWrapper(inputType, inputType.fields.length)
     val bucketFunc: bucket.BucketFunction =
       bucket.BucketFunction.create(bucketFunctionType, bucketKeyRowType)
     new ScalarFunction[Int]() {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index fb53fafa25..717dc92b23 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -145,7 +145,7 @@ private class FormatTableDataWriter(batchWriteBuilder: 
BatchWriteBuilder, writeS
   private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow 
= {
     val numFields = writeSchema.fields.length
     record => {
-      new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+      new SparkInternalRowWrapper(writeSchema, numFields).replace(record)
     }
   }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
index cfb6c04ee3..603ec7ecff 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
@@ -117,7 +117,7 @@ abstract class abstractInnerTableDataWrite[T] extends 
InnerTableDataWrite[T] wit
   /** For batch write, batchId is None, for streaming write, batchId is the 
current batch id (>= 0). */
   val batchId: Option[Long]
 
-  private val needFullCompaction: Boolean = {
+  private lazy val needFullCompaction: Boolean = {
     fullCompactionDeltaCommits match {
       case Some(deltaCommits) if deltaCommits > 0 =>
         batchId match {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index 589ba17451..1b58483e69 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -54,11 +54,15 @@ case class PaimonBatchWrite(
   }
 
   override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
-    val fullCompactionDeltaCommits: Option[Int] =
-      Option.apply(coreOptions.fullCompactionDeltaCommits())
-    (_: Int, _: Long) => {
-      PaimonV2DataWriter(batchWriteBuilder, writeSchema, dataSchema, 
fullCompactionDeltaCommits)
-    }
+    (_: Int, _: Long) =>
+      {
+        PaimonV2DataWriter(
+          batchWriteBuilder,
+          writeSchema,
+          dataSchema,
+          coreOptions,
+          table.catalogEnvironment().catalogContext())
+      }
   }
 
   override def useCommitCoordinator(): Boolean = false
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
index d5291fe5d8..fbd166a183 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.spark.write
 
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.catalog.CatalogContext
 import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
TableWriteImpl}
@@ -32,15 +34,19 @@ case class PaimonV2DataWriter(
     writeBuilder: BatchWriteBuilder,
     writeSchema: StructType,
     dataSchema: StructType,
-    fullCompactionDeltaCommits: Option[Int],
+    coreOptions: CoreOptions,
+    catalogContext: CatalogContext,
     batchId: Option[Long] = None)
   extends abstractInnerTableDataWrite[InternalRow]
   with InnerTableV2DataWrite {
 
   private val ioManager = SparkUtils.createIOManager()
-
   private val metricRegistry = SparkMetricRegistry()
 
+  val fullCompactionDeltaCommits: Option[Int] =
+    Option.apply(coreOptions.fullCompactionDeltaCommits())
+  val blobAsDescriptor: Boolean = coreOptions.blobAsDescriptor()
+
   val write: TableWriteImpl[InternalRow] = {
     writeBuilder
       .newWrite()
@@ -51,7 +57,12 @@ case class PaimonV2DataWriter(
 
   private val rowConverter: InternalRow => SparkInternalRowWrapper = {
     val numFields = writeSchema.fields.length
-    val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema, 
dataSchema, numFields)
+    val reusableWrapper = new SparkInternalRowWrapper(
+      writeSchema,
+      numFields,
+      dataSchema,
+      blobAsDescriptor,
+      catalogContext)
     record => reusableWrapper.replace(record)
   }
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 4bc1246612..5d8e1ef9fb 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -19,14 +19,14 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.catalog.CatalogContext
-import org.apache.paimon.data.Blob
-import org.apache.paimon.data.BlobDescriptor
+import org.apache.paimon.data.{Blob, BlobDescriptor}
 import org.apache.paimon.fs.Path
 import org.apache.paimon.fs.local.LocalFileIO
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.utils.UriReaderFactory
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 
 import java.util
@@ -36,6 +36,10 @@ class BlobTestBase extends PaimonSparkTestBase {
 
   private val RANDOM = new Random
 
+  override def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+  }
+
   test("Blob: test basic") {
     withTable("t") {
       sql(
@@ -216,7 +220,7 @@ class BlobTestBase extends PaimonSparkTestBase {
 
   def bytesToHex(bytes: Array[Byte]): String = {
     val hexChars = new Array[Char](bytes.length * 2)
-    for (j <- 0 until bytes.length) {
+    for (j <- bytes.indices) {
       val v = bytes(j) & 0xff
       hexChars(j * 2) = HEX_ARRAY(v >>> 4)
       hexChars(j * 2 + 1) = HEX_ARRAY(v & 0x0f)
@@ -224,3 +228,9 @@ class BlobTestBase extends PaimonSparkTestBase {
     new String(hexChars)
   }
 }
+
+class BlobTestWithV2Write extends BlobTestBase {
+  override def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}

Reply via email to