zeroshade commented on code in PR #40090:
URL: https://github.com/apache/arrow/pull/40090#discussion_r1491781757


##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -37,34 +38,50 @@ import (
 )
 
 type Rows struct {
-       schema        *arrow.Schema
-       records       []arrow.Record
-       currentRecord int
-       currentRow    int
+       schema           *arrow.Schema
+       recordChan       chan arrow.Record
+       currentRecord    arrow.Record
+       currentRecordMux sync.Mutex
+       currentRow       uint64
+       initializedChan  chan bool
+       streamError      error
+}
+
+func newRows() *Rows {
+       return &Rows{
+               recordChan:      make(chan arrow.Record, 1),

Review Comment:
   This buffer size should probably be configurable. Depending on the server in 
use and how the consumer is operating, getting better performance and 
backpressure may require tuning the size of the buffer.



##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -37,34 +38,50 @@ import (
 )
 
 type Rows struct {
-       schema        *arrow.Schema
-       records       []arrow.Record
-       currentRecord int
-       currentRow    int
+       schema           *arrow.Schema
+       recordChan       chan arrow.Record
+       currentRecord    arrow.Record
+       currentRecordMux sync.Mutex
+       currentRow       uint64
+       initializedChan  chan bool
+       streamError      error
+}
+
+func newRows() *Rows {
+       return &Rows{
+               recordChan:      make(chan arrow.Record, 1),
+               initializedChan: make(chan bool),
+       }
 }
 
 // Columns returns the names of the columns.
 func (r *Rows) Columns() []string {
-       if len(r.records) == 0 {
+       if r.schema == nil {
                return nil
        }
 
        // All records have the same columns
-       var cols []string
-       for _, c := range r.schema.Fields() {
-               cols = append(cols, c.Name)
+       cols := make([]string, len(r.schema.Fields()))
+       for i, c := range r.schema.Fields() {
+               cols[i] = c.Name
        }
 
        return cols
 }
 
+func (r *Rows) releaseRecord() {
+       r.currentRecordMux.Lock()
+       defer r.currentRecordMux.Unlock()
+
+       if r.currentRecord != nil {
+               r.currentRecord.Release()
+               r.currentRecord = nil
+       }
+}
+
 // Close closes the rows iterator.
 func (r *Rows) Close() error {

Review Comment:
   What happens if `Close` is called before all the records have been 
retrieved? How do we cancel the record reader?



##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context, 
query string, args []driv
                return nil, err
        }
 
-       rows := Rows{}
-       for _, endpoint := range info.Endpoint {
-               schema, records, err := readEndpoint(ctx, c.client, endpoint)
-               if err != nil {
-                       return &rows, err
-               }
-               if rows.schema == nil {
-                       rows.schema = schema
-               }
-               rows.records = append(rows.records, records...)
-       }
+       rows := newRows()
+       go rows.streamRecordset(ctx, c.client, info.Endpoint)

Review Comment:
   This should probably use `context.WithCancel` on the context and store the 
cancel function, so that `Rows` can call it in `Close` and tell the streaming 
goroutine that it can end early.



##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -79,28 +96,37 @@ func (r *Rows) Close() error {
 // should be taken when closing Rows not to modify
 // a buffer held in dest.
 func (r *Rows) Next(dest []driver.Value) error {
-       if r.currentRecord >= len(r.records) {
-               return io.EOF
-       }
-       record := r.records[r.currentRecord]
+       if r.currentRecord == nil || int64(r.currentRow) >= 
r.currentRecord.NumRows() {
+               if r.streamError != nil {
+                       return r.streamError
+               }
+
+               r.releaseRecord()
 
-       if int64(r.currentRow) >= record.NumRows() {
-               return ErrOutOfRange
+               // Get the next record from the channel
+               var ok bool
+               if r.currentRecord, ok = <-r.recordChan; !ok {
+                       return io.EOF // Channel closed, no more records
+               }

Review Comment:
   Same thing here: if we are going to have a mutex, it needs to be locked 
before this.



##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -79,28 +96,37 @@ func (r *Rows) Close() error {
 // should be taken when closing Rows not to modify
 // a buffer held in dest.
 func (r *Rows) Next(dest []driver.Value) error {
-       if r.currentRecord >= len(r.records) {
-               return io.EOF
-       }
-       record := r.records[r.currentRecord]
+       if r.currentRecord == nil || int64(r.currentRow) >= 
r.currentRecord.NumRows() {
+               if r.streamError != nil {
+                       return r.streamError
+               }
+
+               r.releaseRecord()
 
-       if int64(r.currentRow) >= record.NumRows() {
-               return ErrOutOfRange
+               // Get the next record from the channel
+               var ok bool
+               if r.currentRecord, ok = <-r.recordChan; !ok {
+                       return io.EOF // Channel closed, no more records
+               }
+
+               r.currentRow = 0
+
+               // safety double-check
+               if r.currentRecord == nil || int64(r.currentRow) >= 
r.currentRecord.NumRows() {
+                       return io.EOF // Channel closed, no more records
+               }
        }
 
-       for i, arr := range record.Columns() {
-               v, err := fromArrowType(arr, r.currentRow)
+       for i, col := range r.currentRecord.Columns() {
+               v, err := fromArrowType(col, int(r.currentRow))
                if err != nil {
                        return err
                }
+
                dest[i] = v
        }

Review Comment:
   Again, same: either drop the mutex or lock it before this.



##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context, 
query string, args []driv
                return nil, err
        }
 
-       rows := Rows{}
-       for _, endpoint := range info.Endpoint {
-               schema, records, err := readEndpoint(ctx, c.client, endpoint)
-               if err != nil {
-                       return &rows, err
-               }
-               if rows.schema == nil {
-                       rows.schema = schema
-               }
-               rows.records = append(rows.records, records...)
-       }
+       rows := newRows()
+       go rows.streamRecordset(ctx, c.client, info.Endpoint)
 
-       return &rows, nil
+       <-rows.initializedChan
 
+       return rows, nil
 }
 
-func readEndpoint(ctx context.Context, client *flightsql.Client, endpoint 
*flight.FlightEndpoint) (*arrow.Schema, []arrow.Record, error) {
-       reader, err := client.DoGet(ctx, endpoint.GetTicket())
-       if err != nil {
-               return nil, nil, fmt.Errorf("getting ticket failed: %w", err)
-       }
-       defer reader.Release()
+func (r *Rows) streamRecordset(ctx context.Context, c *flightsql.Client, 
endpoints []*flight.FlightEndpoint) {
+       defer close(r.recordChan)
 
-       schema := reader.Schema()
-       var records []arrow.Record
-       for reader.Next() {
-               if record := reader.Record(); record.NumRows() > 0 {
-                       record.Retain()
-                       records = append(records, record)
-               }
-       }
+       initializeOnceOnly := &sync.Once{}
 
-       if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
-               return nil, nil, err
-       }
+       defer func() { // in case of error, init anyway
+               initializeOnceOnly.Do(func() { r.initializedChan <- true })
+       }()
+
+       // reads each endpoint
+       for _, endpoint := range endpoints {
+               func() { // with a func() is possible to {defer 
reader.Release()}
+                       reader, err := c.DoGet(ctx, endpoint.GetTicket())
+                       if err != nil {
+                               r.streamError = fmt.Errorf("getting ticket 
failed: %w", err)
+                               return
+                       }
+
+                       defer reader.Release()
+
+                       r.schema = reader.Schema()
+
+                       // reads each record into a blocking channel
+                       for reader.Next() {
+                               record := reader.Record()
+                               record.Retain()
+
+                               if record.NumRows() < 1 {
+                                       record.Release()
+                                       continue
+                               }
 
-       return schema, records, nil
+                               select {
+                               case r.recordChan <- record:
+                                       go initializeOnceOnly.Do(func() { 
r.initializedChan <- true })
+
+                               case <-ctx.Done():
+                                       r.releaseRecord()
+                                       r.streamError = fmt.Errorf("stream 
recordset context timed out")

Review Comment:
   Should use `ctx.Err()` instead: the context could have been cancelled or 
ended for some other reason, not necessarily timed out.



##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -37,34 +38,50 @@ import (
 )
 
 type Rows struct {
-       schema        *arrow.Schema
-       records       []arrow.Record
-       currentRecord int
-       currentRow    int
+       schema           *arrow.Schema
+       recordChan       chan arrow.Record
+       currentRecord    arrow.Record
+       currentRecordMux sync.Mutex
+       currentRow       uint64
+       initializedChan  chan bool
+       streamError      error
+}
+
+func newRows() *Rows {
+       return &Rows{
+               recordChan:      make(chan arrow.Record, 1),
+               initializedChan: make(chan bool),
+       }
 }
 
 // Columns returns the names of the columns.
 func (r *Rows) Columns() []string {
-       if len(r.records) == 0 {
+       if r.schema == nil {
                return nil
        }
 
        // All records have the same columns
-       var cols []string
-       for _, c := range r.schema.Fields() {
-               cols = append(cols, c.Name)
+       cols := make([]string, len(r.schema.Fields()))
+       for i, c := range r.schema.Fields() {
+               cols[i] = c.Name
        }
 
        return cols
 }
 
+func (r *Rows) releaseRecord() {
+       r.currentRecordMux.Lock()
+       defer r.currentRecordMux.Unlock()
+
+       if r.currentRecord != nil {
+               r.currentRecord.Release()
+               r.currentRecord = nil
+       }
+}
+
 // Close closes the rows iterator.
 func (r *Rows) Close() error {
-       for _, rec := range r.records {
-               rec.Release()
-       }
-       r.currentRecord = 0
-       r.currentRow = 0
+       r.releaseRecord()

Review Comment:
   Don't we still need `r.currentRow = 0`?



##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -79,28 +96,37 @@ func (r *Rows) Close() error {
 // should be taken when closing Rows not to modify
 // a buffer held in dest.
 func (r *Rows) Next(dest []driver.Value) error {
-       if r.currentRecord >= len(r.records) {
-               return io.EOF
-       }
-       record := r.records[r.currentRecord]
+       if r.currentRecord == nil || int64(r.currentRow) >= 
r.currentRecord.NumRows() {
+               if r.streamError != nil {
+                       return r.streamError
+               }

Review Comment:
   If we are going to have a mutex, it needs to be locked before these usages 
of `r.currentRecord`. But I still don't think we need a mutex for this. We can 
just pull the next record from the channel as we finish the current record. 
There wouldn't be any concurrent access to `r.currentRecord`.



##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -37,34 +38,50 @@ import (
 )
 
 type Rows struct {
-       schema        *arrow.Schema
-       records       []arrow.Record
-       currentRecord int
-       currentRow    int
+       schema           *arrow.Schema
+       recordChan       chan arrow.Record
+       currentRecord    arrow.Record
+       currentRecordMux sync.Mutex

Review Comment:
   do we actually need this mutex? I thought that the `Rows` object isn't used 
by multiple goroutines concurrently?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to