[ https://issues.apache.org/jira/browse/SPARK-56999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yicong Huang updated SPARK-56999:
---------------------------------
    Description: 
When the output Arrow batches produced by a `mapInArrow` UDF do not exactly 
match the declared output schema, the failure surfaces in the JVM as an opaque 
error from `ArrowColumnVector$ArrowVectorAccessor` (for example, calling 
`getInt` on a `LongAccessor`):

  [UNSUPPORTED_CALL.WITHOUT_SUGGESTION] Cannot call the method "getInt" of the class "org.apache.spark.sql.vectorized.ArrowColumnVector$ArrowVectorAccessor". SQLSTATE: 0A000

Repro:

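{code:python}
import pyarrow as pa
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

def double_x(iter_batches):
    for batch in iter_batches:
        df = batch.to_pandas()
        df["x"] = df["x"] * 2  # still int64 after the pandas roundtrip
        yield pa.RecordBatch.from_pandas(df[["x"]])

src = spark.createDataFrame([(1,), (2,), (3,)], ["x"])  # x is long (int64)
# Declared output type is IntegerType (int32), which does not match the int64 batches.
out = src.mapInArrow(double_x, schema=StructType([StructField("x", IntegerType())]))
out.show()  # fails with the UNSUPPORTED_CALL error above
{code}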

`createDataFrame` infers `x` as `LongType` and the pandas roundtrip preserves 
int64, but the declared schema is int32. The JVM therefore picks a 
`LongAccessor` for the incoming `BigIntVector`, and the outer scan, which 
expects `IntegerType`, calls `getInt` on it.

Root cause: the `SQL_MAP_ARROW_ITER_UDF` branch in `python/pyspark/worker.py` 
does not call `ArrowBatchTransformer.enforce_schema` on the output batches, 
while every sibling Arrow eval-type branch does. The defensive 
`spark.sql.execution.arrow.pyspark.validateSchema.enabled` check is disabled 
by default (`false`), so it does not catch this either.

Fix: derive the return Arrow schema from the UDF's declared output type and 
pipe each output batch through `ArrowBatchTransformer.enforce_schema(batch, 
return_schema, reorder_by_name=runner_conf.assign_cols_by_name)` before 
`wrap_struct`. This mirrors the reorder semantics of `SQL_GROUPED_MAP_ARROW_UDF` 
and lets pyarrow perform int64 -> int32 casts (and handle similar mismatches) 
through the default `arrow_cast=True`, while `safecheck=True` still catches 
overflow.

> Coerce mapInArrow output batches to declared Arrow schema
> ---------------------------------------------------------
>
>                 Key: SPARK-56999
>                 URL: https://issues.apache.org/jira/browse/SPARK-56999
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 5.0.0
>            Reporter: Yicong Huang
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
