zeroshade commented on code in PR #40090:
URL: https://github.com/apache/arrow/pull/40090#discussion_r1492715291
##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context,
query string, args []driv
return nil, err
}
- rows := Rows{}
- for _, endpoint := range info.Endpoint {
- schema, records, err := readEndpoint(ctx, c.client, endpoint)
- if err != nil {
- return &rows, err
- }
- if rows.schema == nil {
- rows.schema = schema
- }
- rows.records = append(rows.records, records...)
- }
+ rows := newRows()
+ go rows.streamRecordset(ctx, c.client, info.Endpoint)
- return &rows, nil
+ <-rows.initializedChan
+ return rows, nil
}
-func readEndpoint(ctx context.Context, client *flightsql.Client, endpoint
*flight.FlightEndpoint) (*arrow.Schema, []arrow.Record, error) {
- reader, err := client.DoGet(ctx, endpoint.GetTicket())
- if err != nil {
- return nil, nil, fmt.Errorf("getting ticket failed: %w", err)
- }
- defer reader.Release()
+func (r *Rows) streamRecordset(ctx context.Context, c *flightsql.Client,
endpoints []*flight.FlightEndpoint) {
+ defer close(r.recordChan)
- schema := reader.Schema()
- var records []arrow.Record
- for reader.Next() {
- if record := reader.Record(); record.NumRows() > 0 {
- record.Retain()
- records = append(records, record)
- }
- }
+ initializeOnceOnly := &sync.Once{}
- if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
- return nil, nil, err
- }
+ defer func() { // in case of error, init anyway
+ initializeOnceOnly.Do(func() { r.initializedChan <- true })
+ }()
+
+ // reads each endpoint
+ for _, endpoint := range endpoints {
+ func() { // with a func() is possible to {defer
reader.Release()}
+ reader, err := c.DoGet(ctx, endpoint.GetTicket())
+ if err != nil {
+ r.streamError = fmt.Errorf("getting ticket
failed: %w", err)
+ return
+ }
+
+ defer reader.Release()
+
+ r.schema = reader.Schema()
+
+ // reads each record into a blocking channel
+ for reader.Next() {
+ record := reader.Record()
+ record.Retain()
+
+ if record.NumRows() < 1 {
+ record.Release()
+ continue
+ }
- return schema, records, nil
+ select {
+ case r.recordChan <- record:
+ go initializeOnceOnly.Do(func() {
r.initializedChan <- true })
+
+ case <-ctx.Done():
+ r.releaseRecord()
+ r.streamError = fmt.Errorf("stream
recordset context timed out")
Review Comment:
> columns of the decimal type were arrow.Decimal instead of a float64.
On the one hand, `decimal128.Num` does have a `ToFloat` function, but this
is a good point. We should probably add a configuration option that automatically
converts decimals to float64 for you.
> If I recall correctly, the problem was that the endpoint uri, which is
used in the cache lru, was trying to fetch data from 0.0.0.0:32010 and not the
url behind the ingress.
Interesting. In theory, it should be properly following the URIs provided in
the endpoints, or reusing the client's existing connection. I'll keep an eye out
for the issue being filed so I can take a look into it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]