[ https://issues.apache.org/jira/browse/BEAM-7726?focusedWorklogId=279096&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-279096 ]
ASF GitHub Bot logged work on BEAM-7726: ---------------------------------------- Author: ASF GitHub Bot Created on: 18/Jul/19 16:06 Start Date: 18/Jul/19 16:06 Worklog Time Spent: 10m Work Description: lostluck commented on pull request #9080: [BEAM-7726] Implement State Backed Iterables in Go SDK URL: https://github.com/apache/beam/pull/9080#discussion_r304999826 ########## File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go ########## @@ -72,117 +79,129 @@ func (n *DataSource) Process(ctx context.Context) error { c := coder.SkipW(n.Coder) wc := MakeWindowDecoder(n.Coder.Window) + var cp ElementDecoder // Decoder for the primary element or the key in CoGBKs. + var cvs []ElementDecoder // Decoders for each value stream in CoGBKs. + switch { case coder.IsCoGBK(c): - ck := MakeElementDecoder(c.Components[0]) - cv := MakeElementDecoder(c.Components[1]) + cp = MakeElementDecoder(c.Components[0]) - for { - if n.IncrementCountAndCheckSplit(ctx) { + // TODO(BEAM-490): Support multiple value streams (coder components) with + // with CoGBK. + cvs = []ElementDecoder{MakeElementDecoder(c.Components[1])} + default: + cp = MakeElementDecoder(c) + } + + for { + if n.IncrementCountAndCheckSplit(ctx) { + return nil + } + ws, t, err := DecodeWindowedValueHeader(wc, r) + if err != nil { + if err == io.EOF { return nil } - ws, t, err := DecodeWindowedValueHeader(wc, r) - if err != nil { - if err == io.EOF { - return nil - } - return errors.Wrap(err, "source failed") - } + return errors.Wrap(err, "source failed") + } - // Decode key + // Decode key or parallel element. 
+ pe, err := cp.Decode(r) + if err != nil { + return errors.Wrap(err, "source decode failed") + } + pe.Timestamp = t + pe.Windows = ws - key, err := ck.Decode(r) + var valReStreams []ReStream + for _, cv := range cvs { + values, err := n.makeReStream(ctx, pe, cv, r) if err != nil { - return errors.Wrap(err, "source decode failed") + return err } - key.Timestamp = t - key.Windows = ws + valReStreams = append(valReStreams, values) + } - // TODO(herohde) 4/30/2017: the State API will be handle re-iterations - // and only "small" value streams would be inline. Presumably, that - // would entail buffering the whole stream. We do that for now. + if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil { + return err + } + } +} - var buf []FullValue +func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, r io.ReadCloser) (ReStream, error) { + size, err := coder.DecodeInt32(r) + if err != nil { + return nil, errors.Wrap(err, "stream size decoding failed") + } - size, err := coder.DecodeInt32(r) + switch { + case size >= 0: + // Single chunk streams are fully read in and buffered in memory. + var buf []FullValue + buf, err = readStreamToBuffer(cv, r, int64(size), buf) + if err != nil { + return nil, err + } + return &FixedReStream{Buf: buf}, nil + case size == -1: // Shouldn't this be 0? + // Multi-chunked stream. + var buf []FullValue + for { + chunk, err := coder.DecodeVarInt(r) if err != nil { - return errors.Wrap(err, "stream size decoding failed") + return nil, errors.Wrap(err, "stream chunk size decoding failed") } - - if size > -1 { - // Single chunk stream. - - // log.Printf("Fixed size=%v", size) - for i := int32(0); i < size; i++ { - value, err := cv.Decode(r) - if err != nil { - return errors.Wrap(err, "stream value decode failed") - } - buf = append(buf, *value) + // All done, escape out. + switch { + case chunk == 0: // End of stream, return buffer. 
+ return &FixedReStream{Buf: buf}, nil + case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them. + buf, err = readStreamToBuffer(cv, r, chunk, buf) + if err != nil { + return nil, err } - } else { - // Multi-chunked stream. - - for { - chunk, err := coder.DecodeVarUint64(r) - if err != nil { - return errors.Wrap(err, "stream chunk size decoding failed") - } - - // log.Printf("Chunk size=%v", chunk) - - if chunk == 0 { - break - } - - for i := uint64(0); i < chunk; i++ { - value, err := cv.Decode(r) - if err != nil { - return errors.Wrap(err, "stream value decode failed") - } - buf = append(buf, *value) - } + case chunk == -1: // State backed iterable! + chunk, err := coder.DecodeVarInt(r) Review comment: That's right, this is the token's length, per the protocol in [remainder of the comment is truncated in the archived message] ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 279096) Time Spent: 1h 50m (was: 1h 40m) > [Go SDK] State Backed Iterables > ------------------------------- > > Key: BEAM-7726 > URL: https://issues.apache.org/jira/browse/BEAM-7726 > Project: Beam > Issue Type: Improvement > Components: sdk-go > Affects Versions: Not applicable > Reporter: Robert Burke > Assignee: Robert Burke > Priority: Major > Fix For: Not applicable > > Time Spent: 1h 50m > Remaining Estimate: 0h > > The Go SDK should support the State backed iterables protocol per the proto. > [https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L644] > > The primary case is for iterables after CoGBKs. -- This message was sent by Atlassian JIRA (v7.6.14#76016)