gonzojive commented on issue #21817: URL: https://github.com/apache/beam/issues/21817#issuecomment-1160105331
Taking a look at [`datasource.go`](https://sourcegraph.com/github.com/apache/beam@de5c56a5b8a8a030e7e67323a696d52495e37f7f/-/blob/sdks/go/pkg/beam/core/runtime/exec/datasource.go?L168:21): ```go func (n *DataSource) Process(ctx context.Context) error { //... var valReStreams []ReStream for _, cv := range cvs { values, err := n.makeReStream(ctx, pe, cv, &dataReaderCounted) if err != nil { return err } valReStreams = append(valReStreams, values) } if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil { return err } } ``` Does `values, err := n.makeReStream(ctx, pe, cv, &dataReaderCounted)` load all of the values in the iterator? It seems to: ```go func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) { // TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes? size, err := coder.DecodeInt32(bcr.reader) if err != nil { return nil, errors.Wrap(err, "stream size decoding failed") } switch { case size >= 0: // Single chunk streams are fully read in and buffered in memory. buf := make([]FullValue, 0, size) buf, err = readStreamToBuffer(cv, bcr, int64(size), buf) if err != nil { return nil, err } return &FixedReStream{Buf: buf}, nil // ... }} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
