zeroshade commented on code in PR #40090:
URL: https://github.com/apache/arrow/pull/40090#discussion_r1492681859
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context,
query string, args []driv
return nil, err
}
- rows := Rows{}
- for _, endpoint := range info.Endpoint {
- schema, records, err := readEndpoint(ctx, c.client, endpoint)
- if err != nil {
- return &rows, err
- }
- if rows.schema == nil {
- rows.schema = schema
- }
- rows.records = append(rows.records, records...)
- }
+ rows := newRows()
+ go rows.streamRecordset(ctx, c.client, info.Endpoint)
- return &rows, nil
+ <-rows.initializedChan
+ return rows, nil
}
-func readEndpoint(ctx context.Context, client *flightsql.Client, endpoint
*flight.FlightEndpoint) (*arrow.Schema, []arrow.Record, error) {
- reader, err := client.DoGet(ctx, endpoint.GetTicket())
- if err != nil {
- return nil, nil, fmt.Errorf("getting ticket failed: %w", err)
- }
- defer reader.Release()
+func (r *Rows) streamRecordset(ctx context.Context, c *flightsql.Client,
endpoints []*flight.FlightEndpoint) {
+ defer close(r.recordChan)
- schema := reader.Schema()
- var records []arrow.Record
- for reader.Next() {
- if record := reader.Record(); record.NumRows() > 0 {
- record.Retain()
- records = append(records, record)
- }
- }
+ initializeOnceOnly := &sync.Once{}
- if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
- return nil, nil, err
- }
+ defer func() { // in case of error, init anyway
+ initializeOnceOnly.Do(func() { r.initializedChan <- true })
+ }()
+
+ // reads each endpoint
+ for _, endpoint := range endpoints {
+ func() { // with a func() is possible to {defer
reader.Release()}
+ reader, err := c.DoGet(ctx, endpoint.GetTicket())
+ if err != nil {
+ r.streamError = fmt.Errorf("getting ticket
failed: %w", err)
+ return
+ }
+
+ defer reader.Release()
+
+ r.schema = reader.Schema()
+
+ // reads each record into a blocking channel
+ for reader.Next() {
+ record := reader.Record()
+ record.Retain()
+
+ if record.NumRows() < 1 {
+ record.Release()
+ continue
+ }
- return schema, records, nil
+ select {
+ case r.recordChan <- record:
+ go initializeOnceOnly.Do(func() {
r.initializedChan <- true })
+
+ case <-ctx.Done():
+ r.releaseRecord()
+ r.streamError = fmt.Errorf("stream
recordset context timed out")
Review Comment:
@mgross-ebner We use Dremio in CI to test the ADBC driver for compatibility,
so it would definitely be good for us to determine whether the issue is in
the driver or not.
> A more significant consideration is our understanding that the ADBC
> driver utilizes Arrow data types. Our application is extensive and includes
> numerous structs, which would necessitate considerable refactoring.
There is an adapter in the arrow-adbc library that wraps the ADBC driver in Go's
`database/sql` interface, which should let you use it as a drop-in replacement
without any code changes. See the example here:
https://pkg.go.dev/github.com/apache/arrow-adbc/go/adbc/sqldriver/flightsql#example-package
That should avoid the need for refactoring, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org