zeroshade commented on code in PR #40090:
URL: https://github.com/apache/arrow/pull/40090#discussion_r1491781757
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -37,34 +38,50 @@ import (
)
type Rows struct {
- schema *arrow.Schema
- records []arrow.Record
- currentRecord int
- currentRow int
+ schema *arrow.Schema
+ recordChan chan arrow.Record
+ currentRecord arrow.Record
+ currentRecordMux sync.Mutex
+ currentRow uint64
+ initializedChan chan bool
+ streamError error
+}
+
+func newRows() *Rows {
+ return &Rows{
+ recordChan: make(chan arrow.Record, 1),
Review Comment:
This buffer size should probably be configurable: depending on the server in
use and how the consumer is operating, getting better performance and
backpressure may require tuning the size of the buffer.
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -37,34 +38,50 @@ import (
)
type Rows struct {
- schema *arrow.Schema
- records []arrow.Record
- currentRecord int
- currentRow int
+ schema *arrow.Schema
+ recordChan chan arrow.Record
+ currentRecord arrow.Record
+ currentRecordMux sync.Mutex
+ currentRow uint64
+ initializedChan chan bool
+ streamError error
+}
+
+func newRows() *Rows {
+ return &Rows{
+ recordChan: make(chan arrow.Record, 1),
+ initializedChan: make(chan bool),
+ }
}
// Columns returns the names of the columns.
func (r *Rows) Columns() []string {
- if len(r.records) == 0 {
+ if r.schema == nil {
return nil
}
// All records have the same columns
- var cols []string
- for _, c := range r.schema.Fields() {
- cols = append(cols, c.Name)
+ cols := make([]string, len(r.schema.Fields()))
+ for i, c := range r.schema.Fields() {
+ cols[i] = c.Name
}
return cols
}
+func (r *Rows) releaseRecord() {
+ r.currentRecordMux.Lock()
+ defer r.currentRecordMux.Unlock()
+
+ if r.currentRecord != nil {
+ r.currentRecord.Release()
+ r.currentRecord = nil
+ }
+}
+
// Close closes the rows iterator.
func (r *Rows) Close() error {
Review Comment:
What happens if `Close` is called before all the records have been
retrieved? How do we cancel the record reader?
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context,
query string, args []driv
return nil, err
}
- rows := Rows{}
- for _, endpoint := range info.Endpoint {
- schema, records, err := readEndpoint(ctx, c.client, endpoint)
- if err != nil {
- return &rows, err
- }
- if rows.schema == nil {
- rows.schema = schema
- }
- rows.records = append(rows.records, records...)
- }
+ rows := newRows()
+ go rows.streamRecordset(ctx, c.client, info.Endpoint)
Review Comment:
This should probably use `context.WithCancel` and store the cancel
function, so that `Rows` can call it in `Close` to tell the streaming goroutine
it can end early.
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -79,28 +96,37 @@ func (r *Rows) Close() error {
// should be taken when closing Rows not to modify
// a buffer held in dest.
func (r *Rows) Next(dest []driver.Value) error {
- if r.currentRecord >= len(r.records) {
- return io.EOF
- }
- record := r.records[r.currentRecord]
+ if r.currentRecord == nil || int64(r.currentRow) >=
r.currentRecord.NumRows() {
+ if r.streamError != nil {
+ return r.streamError
+ }
+
+ r.releaseRecord()
- if int64(r.currentRow) >= record.NumRows() {
- return ErrOutOfRange
+ // Get the next record from the channel
+ var ok bool
+ if r.currentRecord, ok = <-r.recordChan; !ok {
+ return io.EOF // Channel closed, no more records
+ }
Review Comment:
Same thing here: if we are going to have a mutex, it needs to be locked
before this.
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -79,28 +96,37 @@ func (r *Rows) Close() error {
// should be taken when closing Rows not to modify
// a buffer held in dest.
func (r *Rows) Next(dest []driver.Value) error {
- if r.currentRecord >= len(r.records) {
- return io.EOF
- }
- record := r.records[r.currentRecord]
+ if r.currentRecord == nil || int64(r.currentRow) >=
r.currentRecord.NumRows() {
+ if r.streamError != nil {
+ return r.streamError
+ }
+
+ r.releaseRecord()
- if int64(r.currentRow) >= record.NumRows() {
- return ErrOutOfRange
+ // Get the next record from the channel
+ var ok bool
+ if r.currentRecord, ok = <-r.recordChan; !ok {
+ return io.EOF // Channel closed, no more records
+ }
+
+ r.currentRow = 0
+
+ // safety double-check
+ if r.currentRecord == nil || int64(r.currentRow) >=
r.currentRecord.NumRows() {
+ return io.EOF // Channel closed, no more records
+ }
}
- for i, arr := range record.Columns() {
- v, err := fromArrowType(arr, r.currentRow)
+ for i, col := range r.currentRecord.Columns() {
+ v, err := fromArrowType(col, int(r.currentRow))
if err != nil {
return err
}
+
dest[i] = v
}
Review Comment:
Again, same issue: either drop the mutex, or lock it before this.
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context,
query string, args []driv
return nil, err
}
- rows := Rows{}
- for _, endpoint := range info.Endpoint {
- schema, records, err := readEndpoint(ctx, c.client, endpoint)
- if err != nil {
- return &rows, err
- }
- if rows.schema == nil {
- rows.schema = schema
- }
- rows.records = append(rows.records, records...)
- }
+ rows := newRows()
+ go rows.streamRecordset(ctx, c.client, info.Endpoint)
- return &rows, nil
+ <-rows.initializedChan
+ return rows, nil
}
-func readEndpoint(ctx context.Context, client *flightsql.Client, endpoint
*flight.FlightEndpoint) (*arrow.Schema, []arrow.Record, error) {
- reader, err := client.DoGet(ctx, endpoint.GetTicket())
- if err != nil {
- return nil, nil, fmt.Errorf("getting ticket failed: %w", err)
- }
- defer reader.Release()
+func (r *Rows) streamRecordset(ctx context.Context, c *flightsql.Client,
endpoints []*flight.FlightEndpoint) {
+ defer close(r.recordChan)
- schema := reader.Schema()
- var records []arrow.Record
- for reader.Next() {
- if record := reader.Record(); record.NumRows() > 0 {
- record.Retain()
- records = append(records, record)
- }
- }
+ initializeOnceOnly := &sync.Once{}
- if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
- return nil, nil, err
- }
+ defer func() { // in case of error, init anyway
+ initializeOnceOnly.Do(func() { r.initializedChan <- true })
+ }()
+
+ // reads each endpoint
+ for _, endpoint := range endpoints {
+ func() { // with a func() is possible to {defer
reader.Release()}
+ reader, err := c.DoGet(ctx, endpoint.GetTicket())
+ if err != nil {
+ r.streamError = fmt.Errorf("getting ticket
failed: %w", err)
+ return
+ }
+
+ defer reader.Release()
+
+ r.schema = reader.Schema()
+
+ // reads each record into a blocking channel
+ for reader.Next() {
+ record := reader.Record()
+ record.Retain()
+
+ if record.NumRows() < 1 {
+ record.Release()
+ continue
+ }
- return schema, records, nil
+ select {
+ case r.recordChan <- record:
+ go initializeOnceOnly.Do(func() {
r.initializedChan <- true })
+
+ case <-ctx.Done():
+ r.releaseRecord()
+ r.streamError = fmt.Errorf("stream
recordset context timed out")
Review Comment:
This should use `ctx.Err()` instead: the context could have been cancelled or
ended for some other reason, not necessarily because it timed out.
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -37,34 +38,50 @@ import (
)
type Rows struct {
- schema *arrow.Schema
- records []arrow.Record
- currentRecord int
- currentRow int
+ schema *arrow.Schema
+ recordChan chan arrow.Record
+ currentRecord arrow.Record
+ currentRecordMux sync.Mutex
+ currentRow uint64
+ initializedChan chan bool
+ streamError error
+}
+
+func newRows() *Rows {
+ return &Rows{
+ recordChan: make(chan arrow.Record, 1),
+ initializedChan: make(chan bool),
+ }
}
// Columns returns the names of the columns.
func (r *Rows) Columns() []string {
- if len(r.records) == 0 {
+ if r.schema == nil {
return nil
}
// All records have the same columns
- var cols []string
- for _, c := range r.schema.Fields() {
- cols = append(cols, c.Name)
+ cols := make([]string, len(r.schema.Fields()))
+ for i, c := range r.schema.Fields() {
+ cols[i] = c.Name
}
return cols
}
+func (r *Rows) releaseRecord() {
+ r.currentRecordMux.Lock()
+ defer r.currentRecordMux.Unlock()
+
+ if r.currentRecord != nil {
+ r.currentRecord.Release()
+ r.currentRecord = nil
+ }
+}
+
// Close closes the rows iterator.
func (r *Rows) Close() error {
- for _, rec := range r.records {
- rec.Release()
- }
- r.currentRecord = 0
- r.currentRow = 0
+ r.releaseRecord()
Review Comment:
Don't we still need `r.currentRow = 0`?
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -79,28 +96,37 @@ func (r *Rows) Close() error {
// should be taken when closing Rows not to modify
// a buffer held in dest.
func (r *Rows) Next(dest []driver.Value) error {
- if r.currentRecord >= len(r.records) {
- return io.EOF
- }
- record := r.records[r.currentRecord]
+ if r.currentRecord == nil || int64(r.currentRow) >=
r.currentRecord.NumRows() {
+ if r.streamError != nil {
+ return r.streamError
+ }
Review Comment:
If we are going to have a mutex, it needs to be locked before these usages
of `r.currentRecord`. But I still don't think we need a mutex for this. We can
just pull the next record from the channel as we finish the current record.
There wouldn't be any concurrent access to `r.currentRecord`.
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -37,34 +38,50 @@ import (
)
type Rows struct {
- schema *arrow.Schema
- records []arrow.Record
- currentRecord int
- currentRow int
+ schema *arrow.Schema
+ recordChan chan arrow.Record
+ currentRecord arrow.Record
+ currentRecordMux sync.Mutex
Review Comment:
Do we actually need this mutex? I thought the `Rows` object isn't used
concurrently by multiple goroutines.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]