zeroshade commented on code in PR #1805: URL: https://github.com/apache/arrow-adbc/pull/1805#discussion_r1589324715
########## go/adbc/driver/snowflake/record_reader.go: ########## @@ -596,80 +582,103 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake } }() - group.SetLimit(prefetchConcurrency) - group.Go(func() error { - defer rr.Release() - defer r.Close() - if len(batches) > 1 { - defer close(ch) - } - - for rr.Next() && ctx.Err() == nil { - rec := rr.Record() - rec, err = recTransform(ctx, rec) - if err != nil { - return err - } - ch <- rec - } - return rr.Err() - }) - chs := make([]chan arrow.Record, len(batches)) - chs[0] = ch rdr := &reader{ refCount: 1, chs: chs, err: nil, cancelFn: cancelFn, - schema: schema, } - lastChannelIndex := len(chs) - 1 - go func() { - for i, b := range batches[1:] { - batch, batchIdx := b, i+1 - chs[batchIdx] = make(chan arrow.Record, bufferSize) - group.Go(func() error { - // close channels (except the last) so that Next can move on to the next channel properly - if batchIdx != lastChannelIndex { - defer close(chs[batchIdx]) - } + if len(batches) > 0 { + r, err := batches[0].GetStream(ctx) + if err != nil { + return nil, errToAdbcErr(adbc.StatusIO, err) + } - rdr, err := batch.GetStream(ctx) - if err != nil { - return err - } - defer rdr.Close() + rr, err := ipc.NewReader(r, ipc.WithAllocator(alloc)) + if err != nil { + return nil, adbc.Error{ + Msg: err.Error(), + Code: adbc.StatusInvalidState, + } + } + + var recTransform recordTransformer + rdr.schema, recTransform = getTransformer(rr.Schema(), ld, useHighPrecision) - rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc)) + group.Go(func() error { + defer rr.Release() + defer r.Close() + if len(batches) > 1 { + defer close(ch) + } + + for rr.Next() && ctx.Err() == nil { + rec := rr.Record() + rec, err = recTransform(ctx, rec) if err != nil { return err } - defer rr.Release() + ch <- rec + } + return rr.Err() + }) + + chs[0] = ch + + lastChannelIndex := len(chs) - 1 + go func() { + for i, b := range batches[1:] { + batch, batchIdx := b, i+1 + chs[batchIdx] 
= make(chan arrow.Record, bufferSize) + group.Go(func() error { + // close channels (except the last) so that Next can move on to the next channel properly + if batchIdx != lastChannelIndex { + defer close(chs[batchIdx]) + } - for rr.Next() && ctx.Err() == nil { - rec := rr.Record() - rec, err = recTransform(ctx, rec) + rdr, err := batch.GetStream(ctx) if err != nil { return err } - chs[batchIdx] <- rec - } + defer rdr.Close() - return rr.Err() - }) - } + rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc)) + if err != nil { + return err + } + defer rr.Release() - // place this here so that we always clean up, but they can't be in a - // separate goroutine. Otherwise we'll have a race condition between - // the call to wait and the calls to group.Go to kick off the jobs - // to perform the pre-fetching (GH-1283). - rdr.err = group.Wait() - // don't close the last channel until after the group is finished, - // so that Next() can only return after reader.err may have been set - close(chs[lastChannelIndex]) - }() + for rr.Next() && ctx.Err() == nil { + rec := rr.Record() + rec, err = recTransform(ctx, rec) + if err != nil { + return err + } + chs[batchIdx] <- rec + } + + return rr.Err() + }) + } + + // place this here so that we always clean up, but they can't be in a + // separate goroutine. Otherwise we'll have a race condition between + // the call to wait and the calls to group.Go to kick off the jobs + // to perform the pre-fetching (GH-1283). + rdr.err = group.Wait() + // don't close the last channel until after the group is finished, + // so that Next() can only return after reader.err may have been set + close(chs[lastChannelIndex]) + }() + } else { + schema, err := rowTypesToArrowSchema(ctx, ld, useHighPrecision) + if err != nil { + return nil, err + } + rdr.schema, _ = getTransformer(schema, ld, useHighPrecision) + } Review Comment: we could even skip all of the channel creation etc. 
for this case too, most likely.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org