zeroshade commented on code in PR #1805:
URL: https://github.com/apache/arrow-adbc/pull/1805#discussion_r1589324715


##########
go/adbc/driver/snowflake/record_reader.go:
##########
@@ -596,80 +582,103 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
                }
        }()
 
-       group.SetLimit(prefetchConcurrency)
-       group.Go(func() error {
-               defer rr.Release()
-               defer r.Close()
-               if len(batches) > 1 {
-                       defer close(ch)
-               }
-
-               for rr.Next() && ctx.Err() == nil {
-                       rec := rr.Record()
-                       rec, err = recTransform(ctx, rec)
-                       if err != nil {
-                               return err
-                       }
-                       ch <- rec
-               }
-               return rr.Err()
-       })
-
        chs := make([]chan arrow.Record, len(batches))
-       chs[0] = ch
        rdr := &reader{
                refCount: 1,
                chs:      chs,
                err:      nil,
                cancelFn: cancelFn,
-               schema:   schema,
        }
 
-       lastChannelIndex := len(chs) - 1
-       go func() {
-               for i, b := range batches[1:] {
-                       batch, batchIdx := b, i+1
-                       chs[batchIdx] = make(chan arrow.Record, bufferSize)
-                       group.Go(func() error {
-                               // close channels (except the last) so that Next can move on to the next channel properly
-                               if batchIdx != lastChannelIndex {
-                                       defer close(chs[batchIdx])
-                               }
+       if len(batches) > 0 {
+               r, err := batches[0].GetStream(ctx)
+               if err != nil {
+                       return nil, errToAdbcErr(adbc.StatusIO, err)
+               }
 
-                               rdr, err := batch.GetStream(ctx)
-                               if err != nil {
-                                       return err
-                               }
-                               defer rdr.Close()
+               rr, err := ipc.NewReader(r, ipc.WithAllocator(alloc))
+               if err != nil {
+                       return nil, adbc.Error{
+                               Msg:  err.Error(),
+                               Code: adbc.StatusInvalidState,
+                       }
+               }
+
+               var recTransform recordTransformer
+               rdr.schema, recTransform = getTransformer(rr.Schema(), ld, useHighPrecision)
 
-                               rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
+               group.Go(func() error {
+                       defer rr.Release()
+                       defer r.Close()
+                       if len(batches) > 1 {
+                               defer close(ch)
+                       }
+
+                       for rr.Next() && ctx.Err() == nil {
+                               rec := rr.Record()
+                               rec, err = recTransform(ctx, rec)
                                if err != nil {
                                        return err
                                }
-                               defer rr.Release()
+                               ch <- rec
+                       }
+                       return rr.Err()
+               })
+
+               chs[0] = ch
+
+               lastChannelIndex := len(chs) - 1
+               go func() {
+                       for i, b := range batches[1:] {
+                               batch, batchIdx := b, i+1
+                               chs[batchIdx] = make(chan arrow.Record, bufferSize)
+                               group.Go(func() error {
+                                       // close channels (except the last) so that Next can move on to the next channel properly
+                                       if batchIdx != lastChannelIndex {
+                                               defer close(chs[batchIdx])
+                                       }
 
-                               for rr.Next() && ctx.Err() == nil {
-                                       rec := rr.Record()
-                                       rec, err = recTransform(ctx, rec)
+                                       rdr, err := batch.GetStream(ctx)
                                        if err != nil {
                                                return err
                                        }
-                                       chs[batchIdx] <- rec
-                               }
+                                       defer rdr.Close()
 
-                               return rr.Err()
-                       })
-               }
+                                       rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
+                                       if err != nil {
+                                               return err
+                                       }
+                                       defer rr.Release()
 
-               // place this here so that we always clean up, but they can't be in a
-               // separate goroutine. Otherwise we'll have a race condition between
-               // the call to wait and the calls to group.Go to kick off the jobs
-               // to perform the pre-fetching (GH-1283).
-               rdr.err = group.Wait()
-               // don't close the last channel until after the group is finished,
-               // so that Next() can only return after reader.err may have been set
-               close(chs[lastChannelIndex])
-       }()
+                                       for rr.Next() && ctx.Err() == nil {
+                                               rec := rr.Record()
+                                               rec, err = recTransform(ctx, rec)
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               chs[batchIdx] <- rec
+                                       }
+
+                                       return rr.Err()
+                               })
+                       }
+
+                       // place this here so that we always clean up, but they can't be in a
+                       // separate goroutine. Otherwise we'll have a race condition between
+                       // the call to wait and the calls to group.Go to kick off the jobs
+                       // to perform the pre-fetching (GH-1283).
+                       rdr.err = group.Wait()
+                       // don't close the last channel until after the group is finished,
+                       // so that Next() can only return after reader.err may have been set
+                       close(chs[lastChannelIndex])
+               }()
+       } else {
+               schema, err := rowTypesToArrowSchema(ctx, ld, useHighPrecision)
+               if err != nil {
+                       return nil, err
+               }
+               rdr.schema, _ = getTransformer(schema, ld, useHighPrecision)
+       }

Review Comment:
   we could even skip all of the channel creation etc. for this case too most likely



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to