This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c3ceae4523cb [SPARK-55366][SQL][PYTHON] Remove `errorOnDuplicatedFieldNames` from Python UDFs
c3ceae4523cb is described below
commit c3ceae4523cb5b996b594f7ee811042c3f72037e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Feb 6 07:34:32 2026 +0900
[SPARK-55366][SQL][PYTHON] Remove `errorOnDuplicatedFieldNames` from Python UDFs
### What changes were proposed in this pull request?
Remove `errorOnDuplicatedFieldNames` from Python UDFs
### Why are the changes needed?
To make the logic clearer. The `errorOnDuplicatedFieldNames` flag was introduced in
https://github.com/apache/spark/commit/305aa4a89efe02f517f82039225a99b31b20146f
for `DataFrame.toPandas`, but it is always `true` in Python UDFs, so duplicated
field names are never allowed there (see the sketch below).
Removing this always-true variable reduces confusion when refactoring the UDF code.
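For illustration, a minimal sketch of the check that is now explicit. The schema below is hypothetical, and `ArrowUtils` is `private[sql]`, so this only mirrors how the Arrow-based runners use it internally:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils

// Hypothetical struct with two fields named "a"; such schemas were already
// rejected when errorOnDuplicatedFieldNames was hard-coded to true.
val duplicated = StructType(Seq(
  StructField("a", IntegerType),
  StructField("a", StringType)))

// The recursive helper added in this commit fails fast before any Arrow
// schema is built (see ArrowUtils.failDuplicatedFieldNames in the diff).
ArrowUtils.failDuplicatedFieldNames(duplicated)  // throws duplicatedFieldNameInArrowStructError
```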
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54153 from zhengruifeng/fail_duplicate_names.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/util/ArrowUtils.scala | 37 ++++++++++++++++++++++
.../sql/execution/arrow/ArrowWriterWrapper.scala | 4 +--
.../sql/execution/python/ArrowPythonRunner.scala | 4 +--
.../execution/python/ArrowPythonUDTFRunner.scala | 4 +--
.../python/CoGroupedArrowPythonRunner.scala | 7 ++--
.../sql/execution/python/PythonArrowInput.scala | 8 ++---
.../ApplyInPandasWithStatePythonRunner.scala | 3 +-
.../TransformWithStateInPySparkPythonRunner.scala | 3 +-
8 files changed, 54 insertions(+), 16 deletions(-)
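As a hedged sketch of the resulting call pattern (mirroring the change in `PythonArrowInput.scala` below; the example schema and timezone are hypothetical, and `ArrowUtils` is `private[sql]`):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils

// Hypothetical input schema and session timezone.
val schema = StructType(Seq(StructField("name", StringType), StructField("city", StringType)))
val timeZoneId = "UTC"

// The duplicate-name check is now a separate, explicit step, and the new
// toArrowSchema overload no longer takes errorOnDuplicatedFieldNames.
ArrowUtils.failDuplicatedFieldNames(schema)
val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId, largeVarTypes = false)
```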
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
index dd3aad8ef598..e6fa93af64de 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
@@ -324,6 +324,43 @@ private[sql] object ArrowUtils {
}.asJava)
}
+ /**
+ * Maps schema from Spark to Arrow. NOTE: timeZoneId required for TimestampType in StructType
+ */
+ def toArrowSchema(schema: StructType, timeZoneId: String, largeVarTypes: Boolean): Schema = {
+ new Schema(schema.map { field =>
+ toArrowField(
+ field.name,
+ field.dataType,
+ field.nullable,
+ timeZoneId,
+ largeVarTypes,
+ field.metadata)
+ }.asJava)
+ }
+
+ /**
+ * Check the schema and fail once a struct type contains duplicated field names.
+ */
+ def failDuplicatedFieldNames(dt: DataType): Unit = {
+ dt match {
+ case st: StructType =>
+ if (st.names.toSet.size != st.names.length) {
+ throw ExecutionErrors.duplicatedFieldNameInArrowStructError(
+ st.names.toImmutableArraySeq)
+ }
+ st.fields.foreach { field => failDuplicatedFieldNames(field.dataType) }
+ case arr: ArrayType =>
+ failDuplicatedFieldNames(arr.elementType)
+ case map: MapType =>
+ failDuplicatedFieldNames(map.keyType)
+ failDuplicatedFieldNames(map.valueType)
+ case udt: UserDefinedType[_] =>
+ failDuplicatedFieldNames(udt.sqlType)
+ case _ =>
+ }
+ }
+
def fromArrowSchema(schema: Schema): StructType = {
StructType(schema.getFields.asScala.map { field =>
StructField(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriterWrapper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriterWrapper.scala
index c04bae07f67d..0008c1319ec9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriterWrapper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriterWrapper.scala
@@ -69,12 +69,10 @@ object ArrowWriterWrapper {
schema: StructType,
timeZoneId: String,
allocatorOwner: String,
- errorOnDuplicatedFieldNames: Boolean,
largeVarTypes: Boolean,
dataOut: DataOutputStream,
context: TaskContext): ArrowWriterWrapper = {
- val arrowSchema =
- ArrowUtils.toArrowSchema(schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
+ val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId, largeVarTypes)
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
s"stdout writer for $allocatorOwner", 0, Long.MaxValue)
val root = VectorSchemaRoot.create(arrowSchema, allocator)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index a5536621c531..94354e815ad3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.python.EvalPythonExec.ArgumentMetadata
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
@@ -42,6 +43,7 @@ abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
with PythonArrowInput[IN]
with PythonArrowOutput[OUT] {
+ ArrowUtils.failDuplicatedFieldNames(schema)
override val envVars: util.Map[String, String] = {
val envVars = new util.HashMap(funcs.head._1.funcs.head.envVars)
@@ -62,8 +64,6 @@ abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
override val killWorkerOnFlushFailure: Boolean =
SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
- override val errorOnDuplicatedFieldNames: Boolean = true
-
override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
override val simplifiedTraceback: Boolean =
SQLConf.get.pysparkSimplifiedTraceback
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index e5c7be2f4070..818a05cbdd66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.python.EvalPythonExec.ArgumentMetadata
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
/**
@@ -48,6 +49,7 @@ class ArrowPythonUDTFRunner(
jobArtifactUUID, pythonMetrics)
with BatchedPythonArrowInput
with BasicPythonArrowOutput {
+ ArrowUtils.failDuplicatedFieldNames(schema)
override protected def runnerConf: Map[String, String] = super.runnerConf ++
pythonRunnerConf
@@ -88,8 +90,6 @@ class ArrowPythonUDTFRunner(
override val killWorkerOnFlushFailure: Boolean =
SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
- override val errorOnDuplicatedFieldNames: Boolean = true
-
override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
override val simplifiedTraceback: Boolean =
SQLConf.get.pysparkSimplifiedTraceback
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index d1fbbdba7131..df108187c9f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.arrow.ArrowWriterWrapper
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
/**
@@ -53,6 +54,8 @@ class CoGroupedArrowPythonRunner(
(Iterator[InternalRow], Iterator[InternalRow]), ColumnarBatch](
funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
with BasicPythonArrowOutput {
+ ArrowUtils.failDuplicatedFieldNames(leftSchema)
+ ArrowUtils.failDuplicatedFieldNames(rightSchema)
override protected def runnerConf: Map[String, String] = super.runnerConf ++
pythonRunnerConf
@@ -154,7 +157,7 @@ class CoGroupedArrowPythonRunner(
if (nextBatchInLeftGroup != null) {
if (leftGroupArrowWriter == null) {
leftGroupArrowWriter =
ArrowWriterWrapper.createAndStartArrowWriter(leftSchema,
- timeZoneId, pythonExec + " (left)", errorOnDuplicatedFieldNames = true,
+ timeZoneId, pythonExec + " (left)",
largeVarTypes, dataOut, context)
// Set the unloader with compression after creating the writer
leftGroupArrowWriter.unloader =
createUnloader(leftGroupArrowWriter.root)
@@ -177,7 +180,7 @@ class CoGroupedArrowPythonRunner(
} else if (nextBatchInRightGroup != null) {
if (rightGroupArrowWriter == null) {
rightGroupArrowWriter =
ArrowWriterWrapper.createAndStartArrowWriter(rightSchema,
- timeZoneId, pythonExec + " (right)", errorOnDuplicatedFieldNames = true,
+ timeZoneId, pythonExec + " (right)",
largeVarTypes, dataOut, context)
// Set the unloader with compression after creating the writer
rightGroupArrowWriter.unloader =
createUnloader(rightGroupArrowWriter.root)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
index 58a48b1815e1..2b200294803d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
@@ -47,8 +47,6 @@ private[python] trait PythonArrowInput[IN] { self:
BasePythonRunner[IN, _] =>
protected def timeZoneId: String
- protected def errorOnDuplicatedFieldNames: Boolean
-
protected def largeVarTypes: Boolean
protected def pythonMetrics: Map[String, SQLMetric]
@@ -65,8 +63,8 @@ private[python] trait PythonArrowInput[IN] { self:
BasePythonRunner[IN, _] =>
ArrowUtils.rootAllocator.newChildAllocator(s"stdout writer for $pythonExec", 0, Long.MaxValue)
protected lazy val root: VectorSchemaRoot = {
- val arrowSchema = ArrowUtils.toArrowSchema(
- schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
+ ArrowUtils.failDuplicatedFieldNames(schema)
+ val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId, largeVarTypes)
VectorSchemaRoot.create(arrowSchema, allocator)
}
@@ -288,7 +286,7 @@ private[python] trait GroupedPythonArrowInput { self:
RowInputArrowPythonRunner
assert(writer == null || writer.isClosed)
writer = ArrowWriterWrapper.createAndStartArrowWriter(
schema, timeZoneId, pythonExec,
- errorOnDuplicatedFieldNames, largeVarTypes, dataOut, context)
+ largeVarTypes, dataOut, context)
// Set the unloader with compression after creating the writer
writer.unloader = new VectorUnloader(writer.root, true,
self.codec, true)
nextBatchStart = inputIterator.next()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 89d8e425fd2b..786c5fc408dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -41,6 +41,7 @@ import
org.apache.spark.sql.execution.python.streaming.ApplyInPandasWithStateWri
import org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroupswithstate.GroupStateImpl
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
@@ -69,6 +70,7 @@ class ApplyInPandasWithStatePythonRunner(
funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
with PythonArrowInput[InType]
with PythonArrowOutput[OutType] {
+ ArrowUtils.failDuplicatedFieldNames(inputSchema)
override val pythonExec: String =
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(
@@ -85,7 +87,6 @@ class ApplyInPandasWithStatePythonRunner(
private val sqlConf = SQLConf.get
override val schema: StructType = inputSchema.add("__state",
STATE_METADATA_SCHEMA)
- override val errorOnDuplicatedFieldNames: Boolean = true
override val hideTraceback: Boolean = sqlConf.pysparkHideTraceback
override val simplifiedTraceback: Boolean =
sqlConf.pysparkSimplifiedTraceback
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
index 05771d38cd84..33fe0cdfee3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
@@ -38,6 +38,7 @@ import
org.apache.spark.sql.execution.python.streaming.TransformWithStateInPySpa
import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.{DriverStatefulProcessorHandleImpl,
StatefulProcessorHandleImpl}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ThreadUtils
@@ -233,6 +234,7 @@ abstract class
TransformWithStateInPySparkPythonBaseRunner[I](
with BasicPythonArrowOutput
with TransformWithStateInPySparkPythonRunnerUtils
with Logging {
+ ArrowUtils.failDuplicatedFieldNames(schema)
protected val sqlConf = SQLConf.get
protected val arrowMaxRecordsPerBatch = sqlConf.arrowMaxRecordsPerBatch
@@ -251,7 +253,6 @@ abstract class
TransformWithStateInPySparkPythonBaseRunner[I](
(if (isUnixDomainSock) stateServerSocketPath else
stateServerSocketPort.toString)
)
- override protected val errorOnDuplicatedFieldNames: Boolean = true
override protected val largeVarTypes: Boolean = sqlConf.arrowUseLargeVarTypes
override def compute(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]