[ https://issues.apache.org/jira/browse/SPARK-52780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18065992#comment-18065992 ]

Callum Dempsey Leach commented on SPARK-52780:
----------------------------------------------

This is ready for review. See GitHub for the latest changes.

> Expose Local Iterator in Spark Connect Go Client (For Streaming Rows)
> ---------------------------------------------------------------------
>
>                 Key: SPARK-52780
>                 URL: https://issues.apache.org/jira/browse/SPARK-52780
>             Project: Spark
>          Issue Type: New Feature
>          Components: Connect
>    Affects Versions: 4.0.0
>            Reporter: Callum Dempsey Leach
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> The current Spark Connect Go client fetches all DataFrame rows at once via 
> the `Collect()` method, which limits its practicality for very large result 
> sets (GBs/TBs). The Scala Spark Connect client offers a `toLocalIterator()` 
> method that streams rows incrementally, which lets external clients or 
> services process large result sets efficiently. In my case, I have an API 
> gateway that polls Spark Connect, letting me distribute the results of my 
> OLAP queries as a stream from fairly lightweight instances even when 
> multiple GB of rows are returned. Super useful!
>  
> Originally I wanted to build this gateway on the Spark Connect Go client, 
> but found it doesn't expose an equivalent of the Scala DataFrame's 
> `toLocalIterator()`, which returns an iterator that streams rows 
> incrementally as the Arrow batches arrive. The Go client is quite explicit 
> about collecting everything in `client/sql/dataframe.go`:
>  
> {code:go}
> func (df *dataFrameImpl) Collect() ([]Row, error) {
>     responseClient, err := df.sparkSession.executePlan(df.createPlan())
>     if err != nil {
>         return nil, fmt.Errorf("failed to execute plan: %w", err)
>     }
>  
>     var schema *StructType
>     var allRows []Row
>  
>     for {
>         response, err := responseClient.Recv()
>         if err != nil {
>             if errors.Is(err, io.EOF) {
>                 return allRows, nil
>             }
>             return nil, fmt.Errorf("failed to receive plan execution response: %w", err)
>         }
>  
>         dataType := response.GetSchema()
>         if dataType != nil {
>             schema = convertProtoDataTypeToStructType(dataType)
>             continue
>         }
>  
>         arrowBatch := response.GetArrowBatch()
>         if arrowBatch == nil {
>             continue
>         }
>  
>         rowBatch, err := readArrowBatchData(arrowBatch.Data, schema)
>         if err != nil {
>             return nil, err
>         }
>  
>         if allRows == nil {
>             allRows = make([]Row, 0, len(rowBatch))
>         }
>         allRows = append(allRows, rowBatch...)
>     }
> }
> {code}
>  
> The parts are already there for us to define a clear `ToLocalIterator()` 
> implementation. I'd avoid using channels so the call site can decide for 
> itself whether it wants asynchronous behaviour, and implement something 
> like the following, which tries to handle the Arrow resources gracefully 
> out of the box:
>  
> {code:go}
> package sql
>  
> import (
>     "errors"
>     "fmt"
>     "io"
> )
>  
> // RowIterator streams rows batch-by-batch, keeping only the
> // current batch in memory (roughly what Spark's Scala
> // `toLocalIterator()` does under the hood).
> //
> //   • At any time we hold one `[]Row` converted via the existing
> //     `readArrowBatchData` helper.
> //   • When the slice is exhausted we fetch the next Arrow batch
> //     and replace it; `rowIdx` is the cursor into the current batch.
> type RowIterator interface {
>     HasNext() bool
>     Next() (Row, error)
>     Close() error
> }
>  
> type rowIteratorImpl struct {
>     responseClient proto.SparkConnectService_ExecutePlanClient
>     schema         *StructType
>     batch          []Row // current batch
>     rowIdx         int   // index inside current batch
>     exhausted      bool
> }
>  
> func (it *rowIteratorImpl) HasNext() bool {
>     if it.exhausted {
>         return false
>     }
>     if it.rowIdx < len(it.batch) {
>         return true
>     }
>     // Current batch consumed -> try to pull the next batch.
>     return it.fetchNextBatch() == nil && len(it.batch) > 0
> }
>  
> func (it *rowIteratorImpl) Next() (Row, error) {
>     if !it.HasNext() {
>         return nil, io.EOF
>     }
>     r := it.batch[it.rowIdx]
>     it.rowIdx++
>     return r, nil
> }
>  
> func (it *rowIteratorImpl) Close() error {
>     return it.responseClient.CloseSend()
> }
>  
> // fetchNextBatch reads Arrow messages until we convert a non-empty batch.
> func (it *rowIteratorImpl) fetchNextBatch() error {
>     it.batch = nil
>     it.rowIdx = 0
>  
>     for {
>         resp, err := it.responseClient.Recv()
>         if err != nil {
>             if errors.Is(err, io.EOF) {
>                 it.exhausted = true
>                 return io.EOF
>             }
>             return fmt.Errorf("receive execute-plan response: %w", err)
>         }
>  
>         // The schema message comes first (once).
>         if sch := resp.GetSchema(); sch != nil && it.schema == nil {
>             it.schema = convertProtoDataTypeToStructType(sch)
>             continue
>         }
>  
>         // Arrow batch
>         if batch := resp.GetArrowBatch(); batch != nil {
>             rows, err := readArrowBatchData(batch.Data, it.schema)
>             if err != nil {
>                 return err
>             }
>             // Skip empty batches just in case.
>             if len(rows) == 0 {
>                 continue
>             }
>             it.batch = rows
>             return nil
>         }
>     }
> }
>  
> // ToLocalIterator returns a streaming RowIterator for the DataFrame.
> func (df *dataFrameImpl) ToLocalIterator() (RowIterator, error) {
>     client, err := df.sparkSession.executePlan(df.createPlan())
>     if err != nil {
>         return nil, fmt.Errorf("execute plan: %w", err)
>     }
>     return &rowIteratorImpl{responseClient: client}, nil
> }
> {code}
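>  
> For illustration only, here's a self-contained sketch of how a caller would 
> drive the `HasNext()`/`Next()`/`Close()` contract. The `stubIterator`, the 
> `Row` alias, and the in-memory batch replay are all hypothetical stand-ins 
> for the real gRPC stream; the point is that only one batch is ever resident:
>  
> {code:go}
> package main
>  
> import (
>     "fmt"
>     "io"
> )
>  
> // Row stands in for the client's Row type; []any keeps the stub self-contained.
> type Row = []any
>  
> // RowIterator mirrors the interface proposed above.
> type RowIterator interface {
>     HasNext() bool
>     Next() (Row, error)
>     Close() error
> }
>  
> // stubIterator replays pre-built batches, mimicking how the real
> // implementation would pull successive Arrow batches from the stream.
> type stubIterator struct {
>     batches [][]Row // pending "Arrow batches" not yet consumed
>     batch   []Row   // current batch
>     rowIdx  int     // cursor into the current batch
> }
>  
> func (it *stubIterator) HasNext() bool {
>     if it.rowIdx < len(it.batch) {
>         return true
>     }
>     // Current batch consumed: advance to the next non-empty one.
>     for len(it.batches) > 0 {
>         it.batch, it.batches = it.batches[0], it.batches[1:]
>         it.rowIdx = 0
>         if len(it.batch) > 0 {
>             return true
>         }
>     }
>     return false
> }
>  
> func (it *stubIterator) Next() (Row, error) {
>     if !it.HasNext() {
>         return nil, io.EOF
>     }
>     r := it.batch[it.rowIdx]
>     it.rowIdx++
>     return r, nil
> }
>  
> func (it *stubIterator) Close() error { return nil }
>  
> func main() {
>     it := &stubIterator{batches: [][]Row{
>         {{"a", 1}, {"b", 2}},
>         {}, // empty batches are skipped, as in fetchNextBatch
>         {{"c", 3}},
>     }}
>     defer it.Close()
>     for it.HasNext() {
>         row, err := it.Next()
>         if err != nil {
>             panic(err)
>         }
>         fmt.Println(row...)
>     }
> }
> {code}
>  
> The same loop shape works unchanged against the real iterator, since the 
> batching is hidden behind `HasNext()`/`Next()`.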
>  
> I'm not sold on the `ToLocalIterator` naming convention, because it 
> obscures the idea that we strictly avoid pulling all the data into memory 
> before enumerating it (which Arrow helps with a great deal); I'd prefer 
> `ToStreamingIterator`. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
