caldempsey commented on PR #152: URL: https://github.com/apache/spark-connect-go/pull/152#issuecomment-3250721352
I've made the changes we discussed, @grundprinzip @zeroshade. A few thoughts:

- I haven't run a full end-to-end test yet to confirm that we're truly fetching data at client pace without leaks. The channel-based implementation already handled this case well by blocking explicitly, so I've passed the sequences wrapping the business logic in the same way. My main concern is converting the push iterator in `ToRecordIterator` into a `Pull2` upstream. Hopefully this... just works and won't introduce leaks, but I'm still new to iterators in Go (we've been without them for 15 years!), so we'd better test E2E.
- One thing I don't love is [this detail from the docs](https://pkg.go.dev/iter#Pull2): _“It is an error to call next or stop from multiple goroutines simultaneously.”_ That limitation feels a bit restrictive, but it does simplify the model, and I don't see any reason you'd need more than one goroutine doing the consumption here anyway.
- Iterators bring some nice benefits: they are synchronous; they guarantee no data race if the iterator and function body share state (although arguably modifying a data structure while you are iterating over it is slightly cursed and hard to read); they cost a fraction of what channels plus spinning up a goroutine cost; and they support loop-breaking control flow (`return` and `break` inside the loop body). With channels you need to pass in a cancellation channel (or a context, same thing) and manually close it when you break out, whereas here you can just... `for`-loop over the Sequence type gracefully. That's pretty neat!
- But again, I'm worried that this could be a bit of a leaky abstraction as we cross the context boundaries of gRPC, client.go, and the Pull2 type (which I've aliased to make sure we know it's a 'Pull' sequence, as they both share the same type, frustratingly), as I've never used Sequences before today.
- I settled on the name 'Stream' with the Pull2 type: 'Stream' to convey the intent 'We want to stream records from Spark over time', and the type to convey 'We want the client to determine the rate at which Rows get consumed'. But maybe ToLocalIterator with the `Seq2` type (not a `Pull2`) would make more sense. I'm just not exactly sure that's the semantics we want to go for (again, I'm unfamiliar with the exact implications). Hence the note in `dataframe.go`:

```
// StreamRows exposes a pull-based iterator over Arrow record batches from Spark types.RowPull2.
// No rows are fetched from Spark over gRPC until the previous one has been consumed.
// It provides no internal buffering: each Row is produced only when the caller
// requests it, ensuring client back-pressure is respected.
// types.RowPull2 is single use (can only be ranged once).
StreamRows(ctx context.Context) (types.RowPull2, error)
```

So please don't crucify me if this isn't exactly right!!!!!