[ 
https://issues.apache.org/jira/browse/SPARK-57275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-57275:
-----------------------------------
    Labels: pull-request-available  (was: )

> Spark Connect Python client throws on valid Arrow IPC streams containing 
> multiple RecordBatches
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-57275
>                 URL: https://issues.apache.org/jira/browse/SPARK-57275
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect
>    Affects Versions: 4.0.0
>            Reporter: Biruk Tesfaye
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The Arrow IPC streaming format wraps a result as 
> {{[Schema][RecordBatch]*[EOS]}}, so a single message can carry multiple 
> RecordBatches, and {{pa.ipc.open_stream(...)}} parses all of them. The 
> server's {{arrow_batch.row_count}} is the total number of rows across every 
> RecordBatch in the message, but the Spark Connect Python client validates 
> the row count inside the per-batch loop:
>  
> {code:python}
>   for batch in reader:
>       num_records_in_batch += batch.num_rows
>       if num_records_in_batch != b.arrow_batch.row_count:   # checked too early
>           raise SparkConnectException(...)
>       num_records += num_records_in_batch                    # also double-counts
> {code}
> When a message contains more than one RecordBatch, the check fires after the 
> first batch, before the stream is fully consumed, and throws:
>   {{SparkConnectException: Expected N rows in arrow batch but got M.}} (M < N)
> *Impact*: Any code path that produces multi-RecordBatch IPC streams (e.g. 
> Arrow-native IPC buffer compression) fails to fetch results, even though the 
> payload is well-formed and parseable by PyArrow.
>  *Fix*: Count each RecordBatch once (num_records += batch.num_rows) and 
> validate row_count only after the IPC stream is fully consumed. (The Scala 
> client in SparkResult.scala already validates after the loop and is 
> unaffected.)
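
The difference between the two validation strategies described above can be sketched without a Spark or PyArrow dependency. This is only an illustration under stated assumptions, not the actual client code or the patch: `FakeBatch` is a hypothetical stand-in for a PyArrow RecordBatch (only `num_rows` is used), `RowCountMismatch` is a hypothetical stand-in for `SparkConnectException`, and both function names are invented for this example.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class FakeBatch:
    """Hypothetical stand-in for a PyArrow RecordBatch; only num_rows matters here."""
    num_rows: int


class RowCountMismatch(Exception):
    """Hypothetical stand-in for SparkConnectException."""


def count_rows_in_loop(batches: Iterable[FakeBatch], expected_row_count: int) -> int:
    """Mirrors the reported behaviour: the check runs after every batch."""
    num_records = 0
    num_records_in_batch = 0
    for batch in batches:
        num_records_in_batch += batch.num_rows
        # Checked too early: fails as soon as the first of several batches is read.
        if num_records_in_batch != expected_row_count:
            raise RowCountMismatch(
                f"Expected {expected_row_count} rows in arrow batch "
                f"but got {num_records_in_batch}."
            )
        # Adds the running total, so later batches would be double-counted.
        num_records += num_records_in_batch
    return num_records


def count_rows_after_loop(batches: Iterable[FakeBatch], expected_row_count: int) -> int:
    """Sketch of the proposed fix: count each batch once, validate at the end."""
    num_records = 0
    for batch in batches:
        num_records += batch.num_rows
    # Validate only after the whole stream has been consumed.
    if num_records != expected_row_count:
        raise RowCountMismatch(
            f"Expected {expected_row_count} rows in arrow batch but got {num_records}."
        )
    return num_records
```

With a message made of two batches of 3 and 2 rows and a server-reported `row_count` of 5, `count_rows_in_loop` raises after the first batch, while `count_rows_after_loop` returns 5. For a single-batch message both functions agree, which is consistent with the bug only appearing on multi-RecordBatch streams.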



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
