[ 
https://issues.apache.org/jira/browse/BEAM-11104?focusedWorklogId=755304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755304
 ]

ASF GitHub Bot logged work on BEAM-11104:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Apr/22 16:38
            Start Date: 11/Apr/22 16:38
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847497405


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime,
                if r1 != nil {
                        return nil, r1.(error)
                }
+               if n.outPcIdx >= 0 {
+                       n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, 
Continuation: r0.(sdf.ProcessContinuation)}
+                       return &n.ret, nil
+               }
                n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
                return &n.ret, nil
        case n.outEtIdx == 0:
+               if n.outPcIdx >= 0 {
+                       panic("invoker.ret2: cannot return event time without a 
value")
+               }
                n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Pane: pn}
                return &n.ret, nil
        default:

Review Comment:
   I think we're missing a case where outPcIdx >= 0 and neither outEtIdx nor 
outErrIdx is >= 0 (i.e., the case where a single element and a 
ProcessContinuation are emitted together).



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn 
typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, r0 interface{}) (*FullValue, error) {
        switch {
        case n.outErrIdx >= 0:
                if r0 != nil {
                        return nil, r0.(error)
                }
                return nil, nil
+       case n.outPcIdx >= 0:
+               n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, 
Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Should we guard against r0 being nil here and either panic with a nicer 
exception or treat it as a Stop (I'd probably prefer panic)? Same question below



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions) 
ProcessElement(_ context.Context,
                defer func() {
                        <-n.SU
                }()
-               return n.PDo.processSingleWindow(mainIn)
+               continuation, processResult := n.PDo.processSingleWindow(mainIn)
+               if continuation != nil {
+                       n.source.pc = continuation
+                       n.source.selfSu = n

Review Comment:
   I think I missed the discussion on this in the design doc (I think maybe it 
got changed after I first looked, or I just missed it), but I don't love this 
mechanism of talking back to the datasource. This creates a weird backwards 
dependency and makes both classes harder to update as a result.
   
   Is there any reason we can't try to infer a 
`ProcessSizedElementsAndRestrictions` type on the out element and, if it's 
there, check for a ProcessContinuation using some built-in method on 
`ProcessSizedElementsAndRestrictions`? That would avoid the circular dependency 
here. 
https://github.com/apache/beam/blob/9bb766b02fbd371b66221f8d3ed1e1228e7a9588/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L185



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) 
ProcessElement(_ context.Context,
                        n.rt = rt
                        n.elm = elm
                        n.SU <- n
-                       err := n.PDo.processSingleWindow(&MainInput{Key: wElm, 
Values: mainIn.Values, RTracker: rt})
+                       _, err := n.PDo.processSingleWindow(&MainInput{Key: 
wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   A comment explaining why we don't need to worry about ProcessContinuation 
here might be helpful



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
        switch {
-       case n.outErrIdx >= 0:
-               if r2 != nil {
-                       return nil, r2.(error)
+       case n.outEtIdx >= 0:
+               if n.outErrIdx == 2 {
+                       if r2 != nil {
+                               return nil, r2.(error)
+                       }
+                       n.ret = FullValue{Windows: ws, Timestamp: 
r0.(typex.EventTime), Elm: r1, Pane: pn}
+                       return &n.ret, nil
                }
-               if n.outEtIdx < 0 {
-                       n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, 
Elm2: r1, Pane: pn}
+               if n.outPcIdx >= 0 {
+                       n.ret = FullValue{Windows: ws, Timestamp: 
r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: 
r2.(sdf.ProcessContinuation)}
                        return &n.ret, nil
                }
-               n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Pane: pn}
-               return &n.ret, nil
-       case n.outEtIdx == 0:
                n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Elm2: r2, Pane: pn}
                return &n.ret, nil
        default:
-               panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match 
permitted return values.", r0, r1, r2))
+               if n.outErrIdx == 2 {

Review Comment:
   I think this reads cleaner if you move it up to a condition in the outer 
switch:
   ```
   switch {
   case n.outEtIdx >= 0:
      // Do stuff
   case n.outErrIdx == 2:
      // Do stuff
   default:
      n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: 
pn, Continuation: r2.(sdf.ProcessContinuation)}
      return &n.ret, nil
   }
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
        Elm  interface{} // Element or KV key.
        Elm2 interface{} // KV value, if not invalid
 
-       Timestamp typex.EventTime
-       Windows   []typex.Window
-       Pane      typex.PaneInfo
+       Timestamp    typex.EventTime
+       Windows      []typex.Window
+       Pane         typex.PaneInfo
+       Continuation sdf.ProcessContinuation

Review Comment:
   Genuine (and possibly dumb) question - I'm not totally clear on how our 
encoding works, but will this cause the whole ProcessContinuation object to get 
encoded as part of our responses to the runner?



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
        switch {
-       case n.outErrIdx >= 0:
-               if r2 != nil {
-                       return nil, r2.(error)
+       case n.outEtIdx >= 0:
+               if n.outErrIdx == 2 {
+                       if r2 != nil {
+                               return nil, r2.(error)
+                       }
+                       n.ret = FullValue{Windows: ws, Timestamp: 
r0.(typex.EventTime), Elm: r1, Pane: pn}
+                       return &n.ret, nil
                }
-               if n.outEtIdx < 0 {
-                       n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, 
Elm2: r1, Pane: pn}
+               if n.outPcIdx >= 0 {

Review Comment:
   There's a lot of mixing `>=` conditions and `==` conditions in how we check 
these indices - we should coalesce to one approach. As a reader, the back and 
forth makes it harder to follow. When possible, I'd vote to prefer the exact 
`==` check since validation of signatures should be taken care of on the front 
end, and if we don't have the indices right we'll struggle later on anyway.



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
        switch {
-       case n.outErrIdx >= 0:
-               if r2 != nil {
-                       return nil, r2.(error)
+       case n.outEtIdx >= 0:
+               if n.outErrIdx == 2 {
+                       if r2 != nil {
+                               return nil, r2.(error)
+                       }
+                       n.ret = FullValue{Windows: ws, Timestamp: 
r0.(typex.EventTime), Elm: r1, Pane: pn}
+                       return &n.ret, nil
                }
-               if n.outEtIdx < 0 {
-                       n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, 
Elm2: r1, Pane: pn}
+               if n.outPcIdx >= 0 {
+                       n.ret = FullValue{Windows: ws, Timestamp: 
r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: 
r2.(sdf.ProcessContinuation)}
                        return &n.ret, nil
                }
-               n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Pane: pn}
-               return &n.ret, nil
-       case n.outEtIdx == 0:
                n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Elm2: r2, Pane: pn}
                return &n.ret, nil
        default:
-               panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match 
permitted return values.", r0, r1, r2))
+               if n.outErrIdx == 2 {

Review Comment:
   If you don't do that, there's no reason to use a switch instead of a simple 
if/else here



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -284,7 +310,24 @@ func (n *invoker) ret4(pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime,
        if r3 != nil {

Review Comment:
   I don't think this check is valid anymore. Couldn't you have something with 
4 returns and no errors (1 event time, 2 kv elements, and 1 process 
continuation)?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755304)
    Time Spent: 5h 50m  (was: 5h 40m)

> [Go SDK] DoFn Self Checkpointing
> --------------------------------
>
>                 Key: BEAM-11104
>                 URL: https://issues.apache.org/jira/browse/BEAM-11104
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Priority: P3
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Allow SplittableDoFns to self checkpoint.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to