[ 
https://issues.apache.org/jira/browse/BEAM-7726?focusedWorklogId=279096&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-279096
 ]

ASF GitHub Bot logged work on BEAM-7726:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jul/19 16:06
            Start Date: 18/Jul/19 16:06
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on pull request #9080: [BEAM-7726] Implement State Backed Iterables in Go SDK
URL: https://github.com/apache/beam/pull/9080#discussion_r304999826
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
 ##########
 @@ -72,117 +79,129 @@ func (n *DataSource) Process(ctx context.Context) error {
        c := coder.SkipW(n.Coder)
        wc := MakeWindowDecoder(n.Coder.Window)
 
+       var cp ElementDecoder    // Decoder for the primary element or the key in CoGBKs.
+       var cvs []ElementDecoder // Decoders for each value stream in CoGBKs.
+
        switch {
        case coder.IsCoGBK(c):
-               ck := MakeElementDecoder(c.Components[0])
-               cv := MakeElementDecoder(c.Components[1])
+               cp = MakeElementDecoder(c.Components[0])
 
-               for {
-                       if n.IncrementCountAndCheckSplit(ctx) {
+               // TODO(BEAM-490): Support multiple value streams (coder components) with
+               // with CoGBK.
+               cvs = []ElementDecoder{MakeElementDecoder(c.Components[1])}
+       default:
+               cp = MakeElementDecoder(c)
+       }
+
+       for {
+               if n.IncrementCountAndCheckSplit(ctx) {
+                       return nil
+               }
+               ws, t, err := DecodeWindowedValueHeader(wc, r)
+               if err != nil {
+                       if err == io.EOF {
                                return nil
                        }
-                       ws, t, err := DecodeWindowedValueHeader(wc, r)
-                       if err != nil {
-                               if err == io.EOF {
-                                       return nil
-                               }
-                               return errors.Wrap(err, "source failed")
-                       }
+                       return errors.Wrap(err, "source failed")
+               }
 
-                       // Decode key
+               // Decode key or parallel element.
+               pe, err := cp.Decode(r)
+               if err != nil {
+                       return errors.Wrap(err, "source decode failed")
+               }
+               pe.Timestamp = t
+               pe.Windows = ws
 
-                       key, err := ck.Decode(r)
+               var valReStreams []ReStream
+               for _, cv := range cvs {
+                       values, err := n.makeReStream(ctx, pe, cv, r)
                        if err != nil {
-                               return errors.Wrap(err, "source decode failed")
+                               return err
                        }
-                       key.Timestamp = t
-                       key.Windows = ws
+                       valReStreams = append(valReStreams, values)
+               }
 
-                       // TODO(herohde) 4/30/2017: the State API will be 
handle re-iterations
-                       // and only "small" value streams would be inline. 
Presumably, that
-                       // would entail buffering the whole stream. We do that 
for now.
+               if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
+                       return err
+               }
+       }
+}
 
-                       var buf []FullValue
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, r io.ReadCloser) (ReStream, error) {
+       size, err := coder.DecodeInt32(r)
+       if err != nil {
+               return nil, errors.Wrap(err, "stream size decoding failed")
+       }
 
-                       size, err := coder.DecodeInt32(r)
+       switch {
+       case size >= 0:
+               // Single chunk streams are fully read in and buffered in memory.
+               var buf []FullValue
+               buf, err = readStreamToBuffer(cv, r, int64(size), buf)
+               if err != nil {
+                       return nil, err
+               }
+               return &FixedReStream{Buf: buf}, nil
+       case size == -1: // Shouldn't this be 0?
+               // Multi-chunked stream.
+               var buf []FullValue
+               for {
+                       chunk, err := coder.DecodeVarInt(r)
                        if err != nil {
-                               return errors.Wrap(err, "stream size decoding failed")
+                               return nil, errors.Wrap(err, "stream chunk size decoding failed")
                        }
-
-                       if size > -1 {
-                               // Single chunk stream.
-
-                               // log.Printf("Fixed size=%v", size)
-                               for i := int32(0); i < size; i++ {
-                                       value, err := cv.Decode(r)
-                                       if err != nil {
-                                               return errors.Wrap(err, "stream 
value decode failed")
-                                       }
-                                       buf = append(buf, *value)
+                       // All done, escape out.
+                       switch {
+                       case chunk == 0: // End of stream, return buffer.
+                               return &FixedReStream{Buf: buf}, nil
+                       case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
+                               buf, err = readStreamToBuffer(cv, r, chunk, buf)
+                               if err != nil {
+                                       return nil, err
                                }
-                       } else {
-                               // Multi-chunked stream.
-
-                               for {
-                                       chunk, err := coder.DecodeVarUint64(r)
-                                       if err != nil {
-                                               return errors.Wrap(err, "stream 
chunk size decoding failed")
-                                       }
-
-                                       // log.Printf("Chunk size=%v", chunk)
-
-                                       if chunk == 0 {
-                                               break
-                                       }
-
-                                       for i := uint64(0); i < chunk; i++ {
-                                               value, err := cv.Decode(r)
-                                               if err != nil {
-                                                       return errors.Wrap(err, 
"stream value decode failed")
-                                               }
-                                               buf = append(buf, *value)
-                                       }
+                       case chunk == -1: // State backed iterable!
+                               chunk, err := coder.DecodeVarInt(r)
 
 Review comment:
   That's right: this is the token's length, per the protocol in [link truncated in the original notification].
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 279096)
    Time Spent: 1h 50m  (was: 1h 40m)

> [Go SDK] State Backed Iterables
> -------------------------------
>
>                 Key: BEAM-7726
>                 URL: https://issues.apache.org/jira/browse/BEAM-7726
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>    Affects Versions: Not applicable
>            Reporter: Robert Burke
>            Assignee: Robert Burke
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The Go SDK should support the state-backed iterables protocol, as defined in the proto:
> [https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L644]
>  
> The primary use case is the iterables produced by CoGBKs.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to