[ https://issues.apache.org/jira/browse/SPARK-56999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-56999:
-----------------------------------
Labels: pull-request-available (was: )
> Coerce mapInArrow output batches to declared Arrow schema
> ---------------------------------------------------------
>
> Key: SPARK-56999
> URL: https://issues.apache.org/jira/browse/SPARK-56999
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 5.0.0
> Reporter: Yicong Huang
> Priority: Major
> Labels: pull-request-available
>
> When the output Arrow batches produced by a `mapInArrow` UDF do not exactly
> match the declared output schema, the failure surfaces in the JVM as an
> opaque error from `ArrowColumnVector$ArrowVectorAccessor` (for example
> calling `getInt` on a `LongAccessor`):
> [UNSUPPORTED_CALL.WITHOUT_SUGGESTION] Cannot call the method "getInt" of
> the class
> "org.apache.spark.sql.vectorized.ArrowColumnVector$ArrowVectorAccessor".
> SQLSTATE: 0A000
> Repro:
> {code:python}
> import pyarrow as pa
> from pyspark.sql.types import StructType, StructField, IntegerType
>
> def double_x(iter_batches):
>     for batch in iter_batches:
>         df = batch.to_pandas()
>         df["x"] = df["x"] * 2
>         yield pa.RecordBatch.from_pandas(df[["x"]])
>
> src = spark.createDataFrame([(1,), (2,), (3,)], ["x"])  # x is long (int64)
> out = src.mapInArrow(double_x, schema=StructType([StructField("x", IntegerType())]))
> out.show()
> {code}
> `createDataFrame` infers `x` as `LongType` and the pandas round trip
> preserves int64, but the declared schema is int32. The JVM therefore picks
> `LongAccessor` for the incoming `BigIntVector`, while the outer scan,
> following the declared schema, calls `getInt` on it.
> Root cause: the `SQL_MAP_ARROW_ITER_UDF` branch in `python/pyspark/worker.py`
> does not call `ArrowBatchTransformer.enforce_schema` on the output batches,
> while every sibling Arrow eval-type branch does. The defensive
> `spark.sql.execution.arrow.pyspark.validateSchema.enabled` check defaults to
> false and so does not catch this either.
> Fix: derive the return Arrow schema from the UDF's declared output type and
> pipe each output batch through `ArrowBatchTransformer.enforce_schema(batch,
> return_schema, reorder_by_name=runner_conf.assign_cols_by_name)` before
> `wrap_struct`. This mirrors `SQL_GROUPED_MAP_ARROW_UDF` for the reorder
> semantics and lets pyarrow cast int64 -> int32 (and similar mismatches) via
> the default `arrow_cast=True`, with `safecheck=True` still catching overflow.