mgross-ebner commented on code in PR #40090:
URL: https://github.com/apache/arrow/pull/40090#discussion_r1492268878


##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context, 
query string, args []driv
                return nil, err
        }
 
-       rows := Rows{}
-       for _, endpoint := range info.Endpoint {
-               schema, records, err := readEndpoint(ctx, c.client, endpoint)
-               if err != nil {
-                       return &rows, err
-               }
-               if rows.schema == nil {
-                       rows.schema = schema
-               }
-               rows.records = append(rows.records, records...)
-       }
+       rows := newRows()
+       go rows.streamRecordset(ctx, c.client, info.Endpoint)
 
-       return &rows, nil
+       <-rows.initializedChan
 
+       return rows, nil
 }
 
-func readEndpoint(ctx context.Context, client *flightsql.Client, endpoint 
*flight.FlightEndpoint) (*arrow.Schema, []arrow.Record, error) {
-       reader, err := client.DoGet(ctx, endpoint.GetTicket())
-       if err != nil {
-               return nil, nil, fmt.Errorf("getting ticket failed: %w", err)
-       }
-       defer reader.Release()
+func (r *Rows) streamRecordset(ctx context.Context, c *flightsql.Client, 
endpoints []*flight.FlightEndpoint) {
+       defer close(r.recordChan)
 
-       schema := reader.Schema()
-       var records []arrow.Record
-       for reader.Next() {
-               if record := reader.Record(); record.NumRows() > 0 {
-                       record.Retain()
-                       records = append(records, record)
-               }
-       }
+       initializeOnceOnly := &sync.Once{}
 
-       if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
-               return nil, nil, err
-       }
+       defer func() { // in case of error, init anyway
+               initializeOnceOnly.Do(func() { r.initializedChan <- true })
+       }()
+
+       // reads each endpoint
+       for _, endpoint := range endpoints {
+               func() { // with a func() is possible to {defer 
reader.Release()}
+                       reader, err := c.DoGet(ctx, endpoint.GetTicket())
+                       if err != nil {
+                               r.streamError = fmt.Errorf("getting ticket 
failed: %w", err)
+                               return
+                       }
+
+                       defer reader.Release()
+
+                       r.schema = reader.Schema()
+
+                       // reads each record into a blocking channel
+                       for reader.Next() {
+                               record := reader.Record()
+                               record.Retain()
+
+                               if record.NumRows() < 1 {
+                                       record.Release()
+                                       continue
+                               }
 
-       return schema, records, nil
+                               select {
+                               case r.recordChan <- record:
+                                       go initializeOnceOnly.Do(func() { 
r.initializedChan <- true })
+
+                               case <-ctx.Done():
+                                       r.releaseRecord()
+                                       r.streamError = fmt.Errorf("stream 
recordset context timed out")

Review Comment:
   @zeroshade 
   
   Yes, we conducted an evaluation of the ADBC driver but ultimately chose to 
forgo it in favor of adapting the Arrow driver. Initially, we encountered 
issues integrating the driver with our Dremio deployment. We have yet to 
ascertain whether the problem lies with our deployment, with Dremio, or with 
the driver itself. A more significant consideration is our understanding that 
the ADBC driver utilizes Arrow data types. Our application is extensive and 
includes numerous structs, so adopting Arrow data types would necessitate 
considerable refactoring.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to