[
https://issues.apache.org/jira/browse/SPARK-52780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18065992#comment-18065992
]
Callum Dempsey Leach commented on SPARK-52780:
----------------------------------------------
This is ready for review. See GitHub for the latest changes.
> Expose Local Iterator in Spark Connect Go Client (For Streaming Rows)
> ---------------------------------------------------------------------
>
> Key: SPARK-52780
> URL: https://issues.apache.org/jira/browse/SPARK-52780
> Project: Spark
> Issue Type: New Feature
> Components: Connect
> Affects Versions: 4.0.0
> Reporter: Callum Dempsey Leach
> Priority: Major
> Labels: pull-request-available
> Original Estimate: 120h
> Remaining Estimate: 120h
>
> The current Spark Connect Go client implementation fetches all DataFrame rows
> at once using the `Collect()` method, which limits its practicality for
> handling very large datasets (GBs/TBs). In the Scala Spark Connect client, a
> `toLocalIterator()` method allows incremental streaming of rows, which is
> advantageous for efficiently processing large result sets from external
> clients or services. In my case, I have an API gateway which polls Spark
> Connect, letting me distribute the results of my OLAP queries as a stream on
> fairly lightweight instance sizes even when multiple GB of rows are returned.
> Super useful!
>
> Originally I wanted to develop this gateway using the Spark Connect Go
> client, but found it doesn't expose an equivalent of the `toLocalIterator()`
> method in the Spark Connect Scala DataFrame interface, which returns an
> iterator that streams rows incrementally as the Arrow batches arrive. This is
> quite explicit in `client/sql/dataframe.go`:
>
> {code:go}
> func (df *dataFrameImpl) Collect() ([]Row, error) {
>     responseClient, err := df.sparkSession.executePlan(df.createPlan())
>     if err != nil {
>         return nil, fmt.Errorf("failed to execute plan: %w", err)
>     }
>
>     var schema *StructType
>     var allRows []Row
>
>     for {
>         response, err := responseClient.Recv()
>         if err != nil {
>             if errors.Is(err, io.EOF) {
>                 return allRows, nil
>             }
>             return nil, fmt.Errorf("failed to receive plan execution response: %w", err)
>         }
>
>         dataType := response.GetSchema()
>         if dataType != nil {
>             schema = convertProtoDataTypeToStructType(dataType)
>             continue
>         }
>
>         arrowBatch := response.GetArrowBatch()
>         if arrowBatch == nil {
>             continue
>         }
>
>         rowBatch, err := readArrowBatchData(arrowBatch.Data, schema)
>         if err != nil {
>             return nil, err
>         }
>
>         if allRows == nil {
>             allRows = make([]Row, 0, len(rowBatch))
>         }
>         allRows = append(allRows, rowBatch...)
>     }
> }
> {code}
>
> The parts are already there for us to define a clear `ToLocalIterator()`
> implementation. I'd probably avoid using channels so the call site can decide
> for itself whether it wants asynchronous behaviour, and implement something
> like this, which tries to gracefully handle the Arrow resources out of the
> box:
>
> {code:go}
> package sql
>
> import (
>     "errors"
>     "fmt"
>     "io"
> )
>
> // RowIterator streams rows batch-by-batch, keeping only the current
> // batch in memory (roughly what Spark's Scala `toLocalIterator()` does
> // under the hood).
> //
> // * At any time we hold one `[]Row` converted via the existing
> //   `readArrowBatchData` helper.
> // * When the slice is exhausted we fetch the next Arrow batch and
> //   replace it. The row we return is the one under the cursor into the
> //   current []Row.
> type RowIterator interface {
>     HasNext() bool
>     Next() (Row, error)
>     Close() error
> }
>
> type rowIteratorImpl struct {
>     responseClient proto.SparkConnectService_ExecutePlanClient
>     schema         *StructType
>     batch          []Row // current batch
>     rowIdx         int   // index inside current batch
>     exhausted      bool
> }
>
> func (it *rowIteratorImpl) HasNext() bool {
>     if it.exhausted {
>         return false
>     }
>     if it.rowIdx < len(it.batch) {
>         return true
>     }
>     // current batch consumed -> try to pull next batch
>     return it.fetchNextBatch() == nil && len(it.batch) > 0
> }
>
> func (it *rowIteratorImpl) Next() (Row, error) {
>     if !it.HasNext() {
>         return nil, io.EOF
>     }
>     r := it.batch[it.rowIdx]
>     it.rowIdx++
>     return r, nil
> }
>
> func (it *rowIteratorImpl) Close() error {
>     return it.responseClient.CloseSend()
> }
>
> // fetchNextBatch reads Arrow messages until we convert a non-empty batch.
> func (it *rowIteratorImpl) fetchNextBatch() error {
>     it.batch = nil
>     it.rowIdx = 0
>
>     for {
>         resp, err := it.responseClient.Recv()
>         if err != nil {
>             if errors.Is(err, io.EOF) {
>                 it.exhausted = true
>                 return io.EOF
>             }
>             return fmt.Errorf("receive execute-plan response: %w", err)
>         }
>
>         // Schema message comes first (once)
>         if sch := resp.GetSchema(); sch != nil && it.schema == nil {
>             it.schema = convertProtoDataTypeToStructType(sch)
>             continue
>         }
>
>         // Arrow batch
>         if batch := resp.GetArrowBatch(); batch != nil {
>             rows, err := readArrowBatchData(batch.Data, it.schema)
>             if err != nil {
>                 return err
>             }
>             // Skip empty batches just in case
>             if len(rows) == 0 {
>                 continue
>             }
>             it.batch = rows
>             return nil
>         }
>     }
> }
>
> // ToLocalIterator returns a streaming RowIterator for the DataFrame.
> func (df *dataFrameImpl) ToLocalIterator() (RowIterator, error) {
>     client, err := df.sparkSession.executePlan(df.createPlan())
>     if err != nil {
>         return nil, fmt.Errorf("execute plan: %w", err)
>     }
>     return &rowIteratorImpl{responseClient: client}, nil
> }
> {code}
>
> I'm not sold on the `ToLocalIterator` naming convention, because it obscures
> the idea that we strictly avoid pulling all the data into memory before
> enumerating it (something Arrow helps with a great deal). I'd prefer
> `ToStreamingIterator`.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]