This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new da9e3fc68f6c [SPARK-55897][SQL][4.1] Handle UserDefinedType in
ColumnarRow, ColumnarBatchRow, and ColumnarArray get()
da9e3fc68f6c is described below
commit da9e3fc68f6c8423064a98f591a0f002f5fc1879
Author: jameswillis <[email protected]>
AuthorDate: Fri May 22 21:32:31 2026 -0700
[SPARK-55897][SQL][4.1] Handle UserDefinedType in ColumnarRow,
ColumnarBatchRow, and ColumnarArray get()
Backport of #54701 to branch-4.1.
### What changes were proposed in this pull request?
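`ColumnarRow.get()`, `ColumnarBatchRow.get()`, and `ColumnarArray.get()`
throw `SparkUnsupportedOperationException` when called with a `UserDefinedType`:
`ColumnarRow.get()` and `ColumnarBatchRow.get()` have no branch that handles UDTs,
and `ColumnarArray.get()` calls `SpecializedGettersReader.read()` with UDT
handling disabled.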
This PR adds UDT handling to all three methods:
- **ColumnarRow** and **ColumnarBatchRow**: Add an `instanceof
UserDefinedType` branch that recurses with `udt.sqlType()`, matching the
pattern already used in `SpecializedGettersReader.read()`.
- **ColumnarArray**: Change the `handleUserDefinedType` flag from `false`
to `true` in the existing call to `SpecializedGettersReader.read()`.
### Why are the changes needed?
The codegen path (`CodeGenerator.getValue()`) unwraps `udt.sqlType()`
before generating accessor calls, so UDT columns work when whole-stage codegen
is active. However, on the interpreted evaluation path — used when codegen is
disabled, when it falls back, or when the number of fields exceeds
`spark.sql.codegen.maxFields` — `GetStructField.nullSafeEval` calls
`ColumnarRow.get(ordinal, udtType)` directly, which falls through to the final
`else` branch and throws.
### Does this PR introduce _any_ user-facing change?
Yes. UDT columns in columnar data sources (e.g., Parquet) now work
correctly on the interpreted evaluation path. Previously they would throw
`SparkUnsupportedOperationException`.
### How was this patch tested?
Added 6 new tests in `ColumnarBatchSuite` covering all 3 methods × 2 UDT
backing types (primitive `IntegerType` and complex `StructType`). Each test
populates columnar vectors with data of the UDT's underlying `sqlType` and
verifies that `get()`, called with the UDT, returns the correct value. Two
helper UDT classes (`TestIntUDT`, `TestStructWrapperUDT`) are defined for the tests.
Cherry-picked from 472735cefef on master without conflicts.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Opus 4.6
Closes #55989 from james-willis/backport-SPARK-55897-4.1.
Authored-by: jameswillis <[email protected]>
Signed-off-by: Huaxin Gao <[email protected]>
---
.../apache/spark/sql/vectorized/ColumnarArray.java | 2 +-
.../spark/sql/vectorized/ColumnarBatchRow.java | 2 +
.../apache/spark/sql/vectorized/ColumnarRow.java | 2 +
.../execution/vectorized/ColumnarBatchSuite.scala | 121 +++++++++++++++++++++
4 files changed, 126 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index fad1817aca19..861a6a4c50e4 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -213,7 +213,7 @@ public final class ColumnarArray extends ArrayData {
@Override
public Object get(int ordinal, DataType dataType) {
- return SpecializedGettersReader.read(this, ordinal, dataType, false,
false);
+ return SpecializedGettersReader.read(this, ordinal, dataType, false, true);
}
@Override
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
index 3d1e780f6e05..42b335dfd2bc 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
@@ -215,6 +215,8 @@ public final class ColumnarBatchRow extends InternalRow {
return getMap(ordinal);
} else if (dataType instanceof VariantType) {
return getVariant(ordinal);
+ } else if (dataType instanceof UserDefinedType<?> udt) {
+ return get(ordinal, udt.sqlType());
} else {
throw new SparkUnsupportedOperationException(
"_LEGACY_ERROR_TEMP_3152", Map.of("dataType",
String.valueOf(dataType)));
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index 656c5f8a8f30..d66baa8fd8fe 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -217,6 +217,8 @@ public final class ColumnarRow extends InternalRow {
return getMap(ordinal);
} else if (dataType instanceof VariantType) {
return getVariant(ordinal);
+ } else if (dataType instanceof UserDefinedType<?> udt) {
+ return get(ordinal, udt.sqlType());
} else {
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3155");
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 6d90bb985e26..93b3ea67d6bc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -48,6 +48,38 @@ import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.util.ArrayImplicits._
+/**
+ * A minimal UDT backed by IntegerType, used by SPARK-55897 tests.
+ */
+@SQLUserDefinedType(udt = classOf[TestIntUDT])
+private case class TestIntWrapper(value: Int)
+
+private class TestIntUDT extends UserDefinedType[TestIntWrapper] {
+ override def sqlType: DataType = IntegerType
+ override def serialize(obj: TestIntWrapper): Any = obj.value
+ override def userClass: Class[TestIntWrapper] = classOf[TestIntWrapper]
+ override def deserialize(datum: Any): TestIntWrapper = datum match {
+ case v: Int => TestIntWrapper(v)
+ }
+}
+
+/**
+ * A minimal UDT backed by StructType, used by SPARK-55897 tests.
+ */
+@SQLUserDefinedType(udt = classOf[TestStructWrapperUDT])
+private case class TestStructWrapper(x: Int, y: Long)
+
+private class TestStructWrapperUDT extends UserDefinedType[TestStructWrapper] {
+ override def sqlType: DataType = new StructType()
+ .add("x", IntegerType)
+ .add("y", LongType)
+ override def serialize(obj: TestStructWrapper): Any = InternalRow(obj.x,
obj.y)
+ override def userClass: Class[TestStructWrapper] = classOf[TestStructWrapper]
+ override def deserialize(datum: Any): TestStructWrapper = datum match {
+ case row: InternalRow => TestStructWrapper(row.getInt(0), row.getLong(1))
+ }
+}
+
@ExtendedSQLTest
class ColumnarBatchSuite extends SparkFunSuite {
@@ -2060,4 +2092,93 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
}
}
+
+ testVector(
+ "SPARK-55897: ColumnarRow.get with primitive-backed UDT",
+ 10,
+ new StructType().add("name", StringType).add("udt_field", IntegerType)) {
column =>
+ column.getChild(0).putByteArray(0, "hello".getBytes)
+ column.getChild(1).putInt(0, 42)
+
+ val row = column.getStruct(0)
+ assert(row.get(1, new TestIntUDT()) === 42)
+ }
+
+ testVector(
+ "SPARK-55897: ColumnarRow.get with struct-backed UDT",
+ 10,
+ new StructType()
+ .add("id", IntegerType)
+ .add("nested", new StructType().add("x", IntegerType).add("y",
LongType))) { column =>
+ column.getChild(0).putInt(0, 1)
+ column.getChild(1).getChild(0).putInt(0, 10)
+ column.getChild(1).getChild(1).putLong(0, 20L)
+
+ val row = column.getStruct(0)
+ val nested = row.get(1, new
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+ assert(nested.getInt(0) === 10)
+ assert(nested.getLong(1) === 20L)
+ }
+
+ testVector(
+ "SPARK-55897: ColumnarArray.get with primitive-backed UDT",
+ 10,
+ new ArrayType(IntegerType, false)) { column =>
+ val data = column.arrayData()
+ data.putInt(0, 10)
+ data.putInt(1, 20)
+ column.putArray(0, 0, 2)
+
+ val arr = column.getArray(0)
+ assert(arr.get(0, new TestIntUDT()) === 10)
+ assert(arr.get(1, new TestIntUDT()) === 20)
+ }
+
+ testVector(
+ "SPARK-55897: ColumnarArray.get with struct-backed UDT",
+ 10,
+ new ArrayType(new StructType().add("x", IntegerType).add("y", LongType),
false)) { column =>
+ val data = column.arrayData()
+ data.getChild(0).putInt(0, 100)
+ data.getChild(1).putLong(0, 200L)
+ column.putArray(0, 0, 1)
+
+ val arr = column.getArray(0)
+ val row = arr.get(0, new
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+ assert(row.getInt(0) === 100)
+ assert(row.getLong(1) === 200L)
+ }
+
+ test("SPARK-55897: ColumnarBatchRow.get with primitive-backed UDT") {
+ Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode =>
+ val col = allocate(10, IntegerType, memMode)
+ try {
+ col.putInt(0, 99)
+ val batchRow = new ColumnarBatchRow(Array(col))
+ batchRow.rowId = 0
+ assert(batchRow.get(0, new TestIntUDT()) === 99)
+ } finally {
+ col.close()
+ }
+ }
+ }
+
+ test("SPARK-55897: ColumnarBatchRow.get with struct-backed UDT") {
+ Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode =>
+ val col = allocate(10,
+ new StructType().add("x", IntegerType).add("y", LongType), memMode)
+ try {
+ col.getChild(0).putInt(0, 5)
+ col.getChild(1).putLong(0, 15L)
+ val batchRow = new ColumnarBatchRow(Array(col))
+ batchRow.rowId = 0
+
+ val row = batchRow.get(0, new
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+ assert(row.getInt(0) === 5)
+ assert(row.getLong(1) === 15L)
+ } finally {
+ col.close()
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]