Biruk Tesfaye created SPARK-57275:
-------------------------------------
Summary: Spark Connect Python client throws on valid Arrow IPC streams containing multiple RecordBatches
Key: SPARK-57275
URL: https://issues.apache.org/jira/browse/SPARK-57275
Project: Spark
Issue Type: Bug
Components: Connect
Affects Versions: 4.0.0
Reporter: Biruk Tesfaye
The Arrow IPC streaming format wraps a result as
{{[Schema][RecordBatch]*[EOS]}}, so a single message can carry multiple
RecordBatches, and {{pa.ipc.open_stream(...)}} parses all of them. The
server's {{arrow_batch.row_count}} is the total number of rows across every
RecordBatch in the message, but the Spark Connect Python client validates the
row count inside the per-batch loop:
{code:python}
for batch in reader:
    num_records_in_batch += batch.num_rows
    if num_records_in_batch != b.arrow_batch.row_count:  # checked too early
        raise SparkConnectException(...)
    num_records += num_records_in_batch  # also double-counts
{code}
When a message contains more than one RecordBatch, the check fires after the
first batch, before the stream is fully consumed, and throws:
{{SparkConnectException: Expected N rows in arrow batch but got M. (M < N)}}
*Impact*: Any code path that produces multi-RecordBatch IPC streams (e.g.
Arrow-native IPC buffer compression) fails to fetch results, even though the
payload is well-formed and parseable by PyArrow.
*Fix*: Count each RecordBatch once ({{num_records += batch.num_rows}}) and
validate {{row_count}} only after the IPC stream is fully consumed. (The Scala
client in {{SparkResult.scala}} already validates after the loop and is unaffected.)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)