This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a61f3fbbf7 [spark] Enable blob as descriptor with v2 write (#7115)
a61f3fbbf7 is described below
commit a61f3fbbf7c2a5549808e60438ef2f5ca33010ae
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jan 26 10:23:29 2026 +0800
[spark] Enable blob as descriptor with v2 write (#7115)
---
.../paimon/spark/SparkInternalRowWrapper.java | 74 +++++++++++-----------
.../java/org/apache/paimon/spark/SparkRow.java | 4 +-
.../spark/catalog/functions/PaimonFunctions.scala | 4 +-
.../paimon/spark/format/PaimonFormatTable.scala | 2 +-
.../org/apache/paimon/spark/write/DataWrite.scala | 2 +-
.../paimon/spark/write/PaimonBatchWrite.scala | 14 ++--
.../paimon/spark/write/PaimonV2DataWriter.scala | 17 ++++-
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 16 ++++-
8 files changed, 80 insertions(+), 53 deletions(-)
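
The change routes Paimon's blob-as-descriptor mode through Spark's v2 write path: SparkInternalRowWrapper (used by the v2 writer) learns to resolve serialized blob descriptors, the v2 writer is handed the table's CoreOptions and CatalogContext to do so, and the blob test suite now runs against both write paths. A minimal configuration sketch follows; only the session conf key is confirmed by this commit (see BlobTestWithV2Write at the bottom of the diff), everything else is illustrative.

    // Sketch: choose the write path per session. The conf key is taken from the
    // test change below; the rest is an assumption for illustration only.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .master("local[2]")
      // route INSERTs through the DataSourceV2 writer (PaimonV2DataWriter)
      .config("spark.paimon.write.use-v2-write", "true")
      .getOrCreate()

    // On a table whose core options enable blob-as-descriptor, blob cells now
    // reach the v2 writer as serialized BlobDescriptors rather than raw bytes.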
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index 7de1695af0..7d0f051756 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -18,9 +18,11 @@
package org.apache.paimon.spark;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -29,6 +31,8 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.spark.util.shim.TypeUtils$;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.UriReader;
+import org.apache.paimon.utils.UriReaderFactory;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -41,43 +45,43 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampNTZType;
import org.apache.spark.sql.types.TimestampType;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
-/** Wrapper to fetch value from the spark internal row. */
+/**
+ * An {@link InternalRow} wraps spark {@link org.apache.spark.sql.catalyst.InternalRow} for v2
+ * write.
+ */
public class SparkInternalRowWrapper implements InternalRow, Serializable {
- private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
- private final int length;
- private final int rowKindIdx;
private final StructType tableSchema;
- private int[] fieldIndexMap = null;
+ private final int length;
+ private final boolean blobAsDescriptor;
+ @Nullable private final UriReaderFactory uriReaderFactory;
+ @Nullable private final int[] fieldIndexMap;
- public SparkInternalRowWrapper(
- org.apache.spark.sql.catalyst.InternalRow internalRow,
- int rowKindIdx,
- StructType tableSchema,
- int length) {
- this.internalRow = internalRow;
- this.rowKindIdx = rowKindIdx;
- this.length = length;
- this.tableSchema = tableSchema;
- }
+ private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
- public SparkInternalRowWrapper(int rowKindIdx, StructType tableSchema, int length) {
- this.rowKindIdx = rowKindIdx;
- this.length = length;
- this.tableSchema = tableSchema;
+ public SparkInternalRowWrapper(StructType tableSchema, int length) {
+ this(tableSchema, length, null, false, null);
}
public SparkInternalRowWrapper(
- int rowKindIdx, StructType tableSchema, StructType dataSchema, int length) {
- this.rowKindIdx = rowKindIdx;
- this.length = length;
+ StructType tableSchema,
+ int length,
+ StructType dataSchema,
+ boolean blobAsDescriptor,
+ CatalogContext catalogContext) {
this.tableSchema = tableSchema;
- this.fieldIndexMap = buildFieldIndexMap(tableSchema, dataSchema);
+ this.length = length;
+ this.fieldIndexMap =
+ dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null;
+ this.blobAsDescriptor = blobAsDescriptor;
+ this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
}
public SparkInternalRowWrapper replace(org.apache.spark.sql.catalyst.InternalRow internalRow) {
@@ -128,12 +132,6 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
@Override
public RowKind getRowKind() {
- if (rowKindIdx != -1) {
- int actualPos = getActualFieldPosition(rowKindIdx);
- if (actualPos != -1) {
- return RowKind.fromByteValue(internalRow.getByte(actualPos));
- }
- }
return RowKind.INSERT;
}
@@ -244,7 +242,13 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
@Override
public Blob getBlob(int pos) {
- return new BlobData(internalRow.getBinary(pos));
+ if (blobAsDescriptor) {
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(internalRow.getBinary(pos));
+ UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
+ return Blob.fromDescriptor(uriReader, blobDescriptor);
+ } else {
+ return new BlobData(internalRow.getBinary(pos));
+ }
}
@Override
@@ -276,10 +280,8 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
return null;
}
return new SparkInternalRowWrapper(
- internalRow.getStruct(actualPos, numFields),
- -1,
- (StructType) tableSchema.fields()[actualPos].dataType(),
- numFields);
+ (StructType) tableSchema.fields()[actualPos].dataType(), numFields)
+ .replace(internalRow.getStruct(actualPos, numFields));
}
private static Timestamp convertToTimestamp(DataType dataType, long micros) {
@@ -434,8 +436,8 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
@Override
public InternalRow getRow(int pos, int numFields) {
- return new SparkInternalRowWrapper(
- arrayData.getStruct(pos, numFields), -1, (StructType) elementType, numFields);
+ return new SparkInternalRowWrapper((StructType) elementType, numFields)
+ .replace(arrayData.getStruct(pos, numFields));
}
}
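
The behavioural core of this file is the new getBlob branch: when blobAsDescriptor is set, the binary cell is not the blob payload but a serialized BlobDescriptor, which is deserialized and resolved through a UriReaderFactory built from the table's CatalogContext; otherwise the bytes are wrapped inline as before. A standalone sketch of that branch, assuming a CatalogContext is already at hand (the v2 writer below obtains it from table.catalogEnvironment().catalogContext()):

    import org.apache.paimon.catalog.CatalogContext
    import org.apache.paimon.data.{Blob, BlobData, BlobDescriptor}
    import org.apache.paimon.utils.UriReaderFactory

    def toBlob(cell: Array[Byte], blobAsDescriptor: Boolean, context: CatalogContext): Blob = {
      if (blobAsDescriptor) {
        // The cell holds a serialized BlobDescriptor: deserialize it and let a
        // UriReader stream the referenced bytes instead of copying them inline.
        val descriptor = BlobDescriptor.deserialize(cell)
        val readerFactory = new UriReaderFactory(context)
        Blob.fromDescriptor(readerFactory.create(descriptor.uri()), descriptor)
      } else {
        // The cell already holds the blob bytes.
        new BlobData(cell)
      }
    }

The wrapper itself builds the UriReaderFactory once per instance, and only when blobAsDescriptor is true, so per-record work is limited to deserializing the descriptor.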
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index d065743768..9ac1e59994 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -55,7 +55,7 @@ import java.util.Map;
import scala.collection.JavaConverters;
-/** A {@link InternalRow} wraps spark {@link Row}. */
+/** An {@link InternalRow} wraps spark {@link Row} for v1 write. */
public class SparkRow implements InternalRow, Serializable {
private final RowType type;
@@ -78,7 +78,7 @@ public class SparkRow implements InternalRow, Serializable {
this.row = row;
this.rowKind = rowkind;
this.blobAsDescriptor = blobAsDescriptor;
- this.uriReaderFactory = new UriReaderFactory(catalogContext);
+ this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index a0d97c68d4..acdd40e9b2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
import org.apache.paimon.spark.SparkInternalRowWrapper
import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
-import org.apache.paimon.spark.function.{DescriptorToStringFunction, DescriptorToStringUnbound, PathToDescriptorFunction, PathToDescriptorUnbound}
+import org.apache.paimon.spark.function.{DescriptorToStringUnbound, PathToDescriptorUnbound}
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, LocalZonedTimestampType, MapType, RowType, TimestampType}
import org.apache.paimon.utils.ProjectedRow
@@ -90,7 +90,7 @@ class BucketFunction(NAME: String, bucketFunctionType: BucketFunctionType) exten
val serializer = new InternalRowSerializer(bucketKeyRowType)
val mapping = (1 to bucketKeyRowType.getFieldCount).toArray
val reusedRow =
- new SparkInternalRowWrapper(-1, inputType, inputType.fields.length)
+ new SparkInternalRowWrapper(inputType, inputType.fields.length)
val bucketFunc: bucket.BucketFunction =
bucket.BucketFunction.create(bucketFunctionType, bucketKeyRowType)
new ScalarFunction[Int]() {
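
With the rowKindIdx parameter gone, callers that only need a reusable adapter use the two-argument constructor and swap the underlying Spark row with replace(), as BucketFunction does above. A small sketch of that pattern (names other than SparkInternalRowWrapper are placeholders):

    import org.apache.paimon.spark.SparkInternalRowWrapper
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType

    def converter(sparkSchema: StructType): InternalRow => org.apache.paimon.data.InternalRow = {
      // One wrapper per task; replace() swaps the wrapped Spark row without
      // allocating a new adapter, matching the reusedRow above.
      val reused = new SparkInternalRowWrapper(sparkSchema, sparkSchema.fields.length)
      row => reused.replace(row)
    }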
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index fb53fafa25..717dc92b23 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -145,7 +145,7 @@ private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, writeS
private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
val numFields = writeSchema.fields.length
record => {
- new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+ new SparkInternalRowWrapper(writeSchema, numFields).replace(record)
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
index cfb6c04ee3..603ec7ecff 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
@@ -117,7 +117,7 @@ abstract class abstractInnerTableDataWrite[T] extends InnerTableDataWrite[T] wit
/** For batch write, batchId is None, for streaming write, batchId is the current batch id (>= 0). */
val batchId: Option[Long]
- private val needFullCompaction: Boolean = {
+ private lazy val needFullCompaction: Boolean = {
fullCompactionDeltaCommits match {
case Some(deltaCommits) if deltaCommits > 0 =>
batchId match {
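
The switch to a lazy val matters because needFullCompaction reads fullCompactionDeltaCommits and batchId, both abstract members supplied by the concrete writer; evaluating it eagerly during trait initialization would observe them before the subclass constructor has run, which is presumably the reason for this change. A generic Scala illustration of the initialization-order pitfall, not the Paimon trait itself:

    trait Write {
      // Supplied by the concrete class, so it is still null while the trait's
      // own eager vals are being initialized.
      val batchId: Option[Long]

      // An eager `val needFullCompaction = batchId.isDefined` here would see
      // batchId == null at construction time; lazy defers it to first access.
      lazy val needFullCompaction: Boolean = batchId.isDefined
    }

    object Demo extends App {
      val w = new Write { val batchId: Option[Long] = Some(0L) }
      println(w.needFullCompaction) // true: evaluated after batchId is set
    }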
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index 589ba17451..1b58483e69 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -54,11 +54,15 @@ case class PaimonBatchWrite(
}
override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
- val fullCompactionDeltaCommits: Option[Int] =
- Option.apply(coreOptions.fullCompactionDeltaCommits())
- (_: Int, _: Long) => {
- PaimonV2DataWriter(batchWriteBuilder, writeSchema, dataSchema, fullCompactionDeltaCommits)
- }
+ (_: Int, _: Long) =>
+ {
+ PaimonV2DataWriter(
+ batchWriteBuilder,
+ writeSchema,
+ dataSchema,
+ coreOptions,
+ table.catalogEnvironment().catalogContext())
+ }
}
override def useCommitCoordinator(): Boolean = false
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
index d5291fe5d8..fbd166a183 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -18,6 +18,8 @@
package org.apache.paimon.spark.write
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, TableWriteImpl}
@@ -32,15 +34,19 @@ case class PaimonV2DataWriter(
writeBuilder: BatchWriteBuilder,
writeSchema: StructType,
dataSchema: StructType,
- fullCompactionDeltaCommits: Option[Int],
+ coreOptions: CoreOptions,
+ catalogContext: CatalogContext,
batchId: Option[Long] = None)
extends abstractInnerTableDataWrite[InternalRow]
with InnerTableV2DataWrite {
private val ioManager = SparkUtils.createIOManager()
-
private val metricRegistry = SparkMetricRegistry()
+ val fullCompactionDeltaCommits: Option[Int] =
+ Option.apply(coreOptions.fullCompactionDeltaCommits())
+ val blobAsDescriptor: Boolean = coreOptions.blobAsDescriptor()
+
val write: TableWriteImpl[InternalRow] = {
writeBuilder
.newWrite()
@@ -51,7 +57,12 @@ case class PaimonV2DataWriter(
private val rowConverter: InternalRow => SparkInternalRowWrapper = {
val numFields = writeSchema.fields.length
- val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema, dataSchema, numFields)
+ val reusableWrapper = new SparkInternalRowWrapper(
+ writeSchema,
+ numFields,
+ dataSchema,
+ blobAsDescriptor,
+ catalogContext)
record => reusableWrapper.replace(record)
}
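
Instead of receiving a pre-computed fullCompactionDeltaCommits, the writer now takes the table's CoreOptions plus CatalogContext, derives both the compaction trigger and the blob handling mode itself, and feeds the latter into a single reusable SparkInternalRowWrapper per task. A sketch of that converter construction, assuming the same values PaimonV2DataWriter holds as constructor parameters:

    import org.apache.paimon.CoreOptions
    import org.apache.paimon.catalog.CatalogContext
    import org.apache.paimon.spark.SparkInternalRowWrapper
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType

    def rowConverter(
        writeSchema: StructType,
        dataSchema: StructType,
        coreOptions: CoreOptions,
        catalogContext: CatalogContext): InternalRow => SparkInternalRowWrapper = {
      // Blob handling (inline bytes vs. descriptor) is decided once per task
      // from the table options, not per record.
      val reusable = new SparkInternalRowWrapper(
        writeSchema,
        writeSchema.fields.length,
        dataSchema,
        coreOptions.blobAsDescriptor(),
        catalogContext)
      record => reusable.replace(record)
    }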
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 4bc1246612..5d8e1ef9fb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -19,14 +19,14 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.catalog.CatalogContext
-import org.apache.paimon.data.Blob
-import org.apache.paimon.data.BlobDescriptor
+import org.apache.paimon.data.{Blob, BlobDescriptor}
import org.apache.paimon.fs.Path
import org.apache.paimon.fs.local.LocalFileIO
import org.apache.paimon.options.Options
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.utils.UriReaderFactory
+import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import java.util
@@ -36,6 +36,10 @@ class BlobTestBase extends PaimonSparkTestBase {
private val RANDOM = new Random
+ override def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+ }
+
test("Blob: test basic") {
withTable("t") {
sql(
@@ -216,7 +220,7 @@ class BlobTestBase extends PaimonSparkTestBase {
def bytesToHex(bytes: Array[Byte]): String = {
val hexChars = new Array[Char](bytes.length * 2)
- for (j <- 0 until bytes.length) {
+ for (j <- bytes.indices) {
val v = bytes(j) & 0xff
hexChars(j * 2) = HEX_ARRAY(v >>> 4)
hexChars(j * 2 + 1) = HEX_ARRAY(v & 0x0f)
@@ -224,3 +228,9 @@ class BlobTestBase extends PaimonSparkTestBase {
new String(hexChars)
}
}
+
+class BlobTestWithV2Write extends BlobTestBase {
+ override def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
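
BlobTestBase now pins the session to the v1 write path, and the new BlobTestWithV2Write subclass re-runs the same suite with use-v2-write enabled, so descriptor handling is exercised on both writers. For reference, resolving a serialized descriptor back into a Blob uses only calls visible in this commit; CatalogContext.create(new Options()) below is an assumption standing in for however the suite builds its context.

    import org.apache.paimon.catalog.CatalogContext
    import org.apache.paimon.data.{Blob, BlobDescriptor}
    import org.apache.paimon.options.Options
    import org.apache.paimon.utils.UriReaderFactory

    def resolve(serialized: Array[Byte]): Blob = {
      val descriptor = BlobDescriptor.deserialize(serialized)
      val readerFactory = new UriReaderFactory(CatalogContext.create(new Options()))
      // Same resolution SparkInternalRowWrapper.getBlob performs for the v2 writer.
      Blob.fromDescriptor(readerFactory.create(descriptor.uri()), descriptor)
    }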