[
https://issues.apache.org/jira/browse/BEAM-11104?focusedWorklogId=755304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755304
]
ASF GitHub Bot logged work on BEAM-11104:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Apr/22 16:38
Start Date: 11/Apr/22 16:38
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847497405
##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws
[]typex.Window, ts typex.EventTime,
if r1 != nil {
return nil, r1.(error)
}
+ if n.outPcIdx >= 0 {
+ n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn,
Continuation: r0.(sdf.ProcessContinuation)}
+ return &n.ret, nil
+ }
n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
return &n.ret, nil
case n.outEtIdx == 0:
+ if n.outPcIdx >= 0 {
+ panic("invoker.ret2: cannot return event time without a value")
+ }
n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Pane: pn}
return &n.ret, nil
default:
Review Comment:
I think we're missing a case where outPcIdx >= 0 and neither outEtIdx nor
outErrIdx is >= 0 (i.e. the case where a single element and a
ProcessContinuation are emitted together).
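A minimal sketch of what the missing branch might look like. The types below are trimmed stand-ins for `exec.FullValue` and `sdf.ProcessContinuation`, and `ret2ElemAndPC` and `resumeFlag` are hypothetical names used only for illustration. The sketch returns an error on a mismatch so it is easy to exercise, whereas the PR's invoker panics.

```go
package main

import "fmt"

// ProcessContinuation is a simplified stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface {
	ShouldResume() bool
}

// resumeFlag is a hypothetical continuation used only in this sketch.
type resumeFlag bool

func (r resumeFlag) ShouldResume() bool { return bool(r) }

// FullValue is a trimmed stand-in for exec.FullValue.
type FullValue struct {
	Elm          interface{}
	Continuation ProcessContinuation
}

// invoker keeps only the index fields discussed in the review.
// A negative index means that return kind is absent from the signature.
type invoker struct {
	outEtIdx  int
	outErrIdx int
	outPcIdx  int
	ret       FullValue
}

// ret2ElemAndPC handles only the (element, ProcessContinuation) pair:
// no event time, no error, continuation in the second position.
func (n *invoker) ret2ElemAndPC(r0, r1 interface{}) (*FullValue, error) {
	if n.outPcIdx != 1 || n.outEtIdx >= 0 || n.outErrIdx >= 0 {
		return nil, fmt.Errorf("ret2ElemAndPC: unsupported return signature")
	}
	pc, ok := r1.(ProcessContinuation)
	if !ok {
		return nil, fmt.Errorf("ret2ElemAndPC: want ProcessContinuation, got %T", r1)
	}
	n.ret = FullValue{Elm: r0, Continuation: pc}
	return &n.ret, nil
}

func main() {
	n := &invoker{outEtIdx: -1, outErrIdx: -1, outPcIdx: 1}
	fv, err := n.ret2ElemAndPC("elem", resumeFlag(true))
	if err != nil {
		panic(err)
	}
	fmt.Println(fv.Elm, fv.Continuation.ShouldResume())
}
```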
##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn
typex.PaneInfo, ws []typex.Wind
}
// ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, r0 interface{}) (*FullValue, error) {
switch {
case n.outErrIdx >= 0:
if r0 != nil {
return nil, r0.(error)
}
return nil, nil
+ case n.outPcIdx >= 0:
+ n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn,
Continuation: r0.(sdf.ProcessContinuation)}
Review Comment:
Should we guard against r0 being nil here and either panic with a nicer
message or treat it as a Stop (I'd probably prefer panic)? Same question below.
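For illustration, the panic option could be factored into a small helper along these lines. This is a sketch under assumptions: `asContinuation`, `tryAsContinuation`, and `stop` are hypothetical names, and `ProcessContinuation` is a simplified stand-in for `sdf.ProcessContinuation`.

```go
package main

import "fmt"

// ProcessContinuation is a simplified stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface {
	ShouldResume() bool
}

// stop is a hypothetical continuation that never asks to resume.
type stop struct{}

func (stop) ShouldResume() bool { return false }

// asContinuation converts a returned value into a ProcessContinuation,
// panicking with a message that names the caller instead of relying on the
// runtime's generic interface-conversion panic.
// Note: a typed nil pointer stored in v is not caught by the nil check.
func asContinuation(caller string, v interface{}) ProcessContinuation {
	if v == nil {
		panic(fmt.Sprintf("%s: ProcessContinuation return value is nil", caller))
	}
	pc, ok := v.(ProcessContinuation)
	if !ok {
		panic(fmt.Sprintf("%s: expected a ProcessContinuation, got %T", caller, v))
	}
	return pc
}

// tryAsContinuation recovers the panic so the message can be inspected.
// It returns the empty string when no panic occurred.
func tryAsContinuation(v interface{}) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	asContinuation("invoker.ret1", v)
	return ""
}

func main() {
	fmt.Println(asContinuation("invoker.ret1", stop{}).ShouldResume())
	fmt.Println(tryAsContinuation(nil))
}
```

Both `ret1` and the later `retN` functions could call the same helper, which keeps the nil handling in one place.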
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions)
ProcessElement(_ context.Context,
defer func() {
<-n.SU
}()
- return n.PDo.processSingleWindow(mainIn)
+ continuation, processResult := n.PDo.processSingleWindow(mainIn)
+ if continuation != nil {
+ n.source.pc = continuation
+ n.source.selfSu = n
Review Comment:
I think I missed the discussion on this in the design doc (maybe it changed
after I first looked, or I just missed it), but I don't love this mechanism
of talking back to the datasource. It creates a backwards dependency and
makes both classes harder to update as a result.
Is there any reason we can't try to infer a
`ProcessSizedElementsAndRestrictions` type on the out element and, if it's
there, check for a ProcessContinuation using some built-in method on
`ProcessSizedElementsAndRestrictions`? That would avoid the circular dependency
here.
https://github.com/apache/beam/blob/9bb766b02fbd371b66221f8d3ed1e1228e7a9588/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L185
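A rough sketch of that direction, with the data source inspecting its downstream node instead of the node writing back into the source. Everything here is a simplified stand-in: `Node`, `dataSource`, `sdfNode`, and the `continuationHolder` interface with its `LastContinuation` method are hypothetical, and the sketch asserts an optional interface rather than the concrete `ProcessSizedElementsAndRestrictions` type.

```go
package main

import "fmt"

// ProcessContinuation is a simplified stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface {
	ShouldResume() bool
}

// resumeFlag is a hypothetical continuation used only in this sketch.
type resumeFlag bool

func (r resumeFlag) ShouldResume() bool { return bool(r) }

// Node is a reduced stand-in for an exec node.
type Node interface {
	ID() int
}

// continuationHolder is a hypothetical optional interface that an SDF
// processing node could implement to expose its latest continuation.
type continuationHolder interface {
	LastContinuation() ProcessContinuation
}

// sdfNode plays the role of ProcessSizedElementsAndRestrictions.
type sdfNode struct {
	pc ProcessContinuation
}

func (s *sdfNode) ID() int                               { return 1 }
func (s *sdfNode) LastContinuation() ProcessContinuation { return s.pc }

// plainNode is a downstream node with no continuation support.
type plainNode struct{}

func (plainNode) ID() int { return 2 }

// dataSource only knows its output node; the dependency points one way.
type dataSource struct {
	Out Node
}

// checkpointRequested reports whether the downstream node, if it is an
// SDF node, recorded a continuation that asks to resume.
func (d *dataSource) checkpointRequested() bool {
	h, ok := d.Out.(continuationHolder)
	if !ok {
		return false
	}
	pc := h.LastContinuation()
	return pc != nil && pc.ShouldResume()
}

func main() {
	src := &dataSource{Out: &sdfNode{pc: resumeFlag(true)}}
	fmt.Println(src.checkpointRequested())
}
```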
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions)
ProcessElement(_ context.Context,
n.rt = rt
n.elm = elm
n.SU <- n
- err := n.PDo.processSingleWindow(&MainInput{Key: wElm,
Values: mainIn.Values, RTracker: rt})
+ _, err := n.PDo.processSingleWindow(&MainInput{Key:
wElm, Values: mainIn.Values, RTracker: rt})
Review Comment:
A comment explaining why we don't need to worry about ProcessContinuation
here might be helpful.
##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws
[]typex.Window, ts typex.EventTime,
// ret3 handles processing of a trio of return values.
func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
switch {
- case n.outErrIdx >= 0:
- if r2 != nil {
- return nil, r2.(error)
+ case n.outEtIdx >= 0:
+ if n.outErrIdx == 2 {
+ if r2 != nil {
+ return nil, r2.(error)
+ }
+ n.ret = FullValue{Windows: ws, Timestamp:
r0.(typex.EventTime), Elm: r1, Pane: pn}
+ return &n.ret, nil
}
- if n.outEtIdx < 0 {
- n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0,
Elm2: r1, Pane: pn}
+ if n.outPcIdx >= 0 {
+ n.ret = FullValue{Windows: ws, Timestamp:
r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation:
r2.(sdf.ProcessContinuation)}
return &n.ret, nil
}
- n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Pane: pn}
- return &n.ret, nil
- case n.outEtIdx == 0:
n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Elm2: r2, Pane: pn}
return &n.ret, nil
default:
- panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+ if n.outErrIdx == 2 {
Review Comment:
I think this reads cleaner if you move it up to a condition in the outer
switch:
```
switch {
case n.outEtIdx >= 0:
	// Do stuff
case n.outErrIdx == 2:
	// Do stuff
default:
	n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
	return &n.ret, nil
}
```
##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
Elm interface{} // Element or KV key.
Elm2 interface{} // KV value, if not invalid
- Timestamp typex.EventTime
- Windows []typex.Window
- Pane typex.PaneInfo
+ Timestamp typex.EventTime
+ Windows []typex.Window
+ Pane typex.PaneInfo
+ Continuation sdf.ProcessContinuation
Review Comment:
Genuine (and possibly dumb) question - I'm not totally clear on how our
encoding works, but will this cause the whole ProcessContinuation object to get
encoded as part of our responses to the runner?
##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws
[]typex.Window, ts typex.EventTime,
// ret3 handles processing of a trio of return values.
func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
switch {
- case n.outErrIdx >= 0:
- if r2 != nil {
- return nil, r2.(error)
+ case n.outEtIdx >= 0:
+ if n.outErrIdx == 2 {
+ if r2 != nil {
+ return nil, r2.(error)
+ }
+ n.ret = FullValue{Windows: ws, Timestamp:
r0.(typex.EventTime), Elm: r1, Pane: pn}
+ return &n.ret, nil
}
- if n.outEtIdx < 0 {
- n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0,
Elm2: r1, Pane: pn}
+ if n.outPcIdx >= 0 {
Review Comment:
There's a lot of mixing of `>=` and `==` conditions in how we check these
indices; we should coalesce on one approach. As a reader, the back and
forth makes it harder to follow. When possible, I'd vote to prefer the exact
`==` check, since validation of signatures should be taken care of on the front
end, and if we don't have the indices right we'll struggle later on anyway.
##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws
[]typex.Window, ts typex.EventTime,
// ret3 handles processing of a trio of return values.
func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
switch {
- case n.outErrIdx >= 0:
- if r2 != nil {
- return nil, r2.(error)
+ case n.outEtIdx >= 0:
+ if n.outErrIdx == 2 {
+ if r2 != nil {
+ return nil, r2.(error)
+ }
+ n.ret = FullValue{Windows: ws, Timestamp:
r0.(typex.EventTime), Elm: r1, Pane: pn}
+ return &n.ret, nil
}
- if n.outEtIdx < 0 {
- n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0,
Elm2: r1, Pane: pn}
+ if n.outPcIdx >= 0 {
+ n.ret = FullValue{Windows: ws, Timestamp:
r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation:
r2.(sdf.ProcessContinuation)}
return &n.ret, nil
}
- n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Pane: pn}
- return &n.ret, nil
- case n.outEtIdx == 0:
n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Elm2: r2, Pane: pn}
return &n.ret, nil
default:
- panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+ if n.outErrIdx == 2 {
Review Comment:
If you don't do that, there's no reason to use a switch instead of a simple
if/else here
##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -284,7 +310,24 @@ func (n *invoker) ret4(pn typex.PaneInfo, ws
[]typex.Window, ts typex.EventTime,
if r3 != nil {
Review Comment:
I don't think this check is valid anymore. Couldn't you have something with
4 returns and no errors (1 event time, 2 KV elements, and 1
ProcessContinuation)?
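One way to picture the fix is to treat the fourth return as an error only when the signature says it is one. The sketch below assumes the first return is always an event time and the next two are a KV pair. The types are trimmed stand-ins for the `exec` and `typex` ones, and `ret4Sketch`, `resumeFlag`, and `sentinelErr` are hypothetical names.

```go
package main

import "fmt"

// ProcessContinuation is a simplified stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface {
	ShouldResume() bool
}

// resumeFlag is a hypothetical continuation used only in this sketch.
type resumeFlag bool

func (r resumeFlag) ShouldResume() bool { return bool(r) }

// sentinelErr is a hypothetical error type for the demonstration.
type sentinelErr string

func (e sentinelErr) Error() string { return string(e) }

// EventTime is a stand-in for typex.EventTime.
type EventTime int64

// FullValue is a trimmed stand-in for exec.FullValue.
type FullValue struct {
	Timestamp    EventTime
	Elm, Elm2    interface{}
	Continuation ProcessContinuation
}

// invoker keeps only the indices needed here; negative means absent.
type invoker struct {
	outErrIdx int
	outPcIdx  int
	ret       FullValue
}

// ret4Sketch handles (EventTime, K, V, error) and
// (EventTime, K, V, ProcessContinuation), choosing by exact index.
func (n *invoker) ret4Sketch(r0, r1, r2, r3 interface{}) (*FullValue, error) {
	et, ok := r0.(EventTime)
	if !ok {
		return nil, fmt.Errorf("ret4Sketch: want EventTime first, got %T", r0)
	}
	switch {
	case n.outErrIdx == 3:
		// Only here is r3 interpreted as an error.
		if r3 != nil {
			err, ok := r3.(error)
			if !ok {
				return nil, fmt.Errorf("ret4Sketch: want error, got %T", r3)
			}
			return nil, err
		}
		n.ret = FullValue{Timestamp: et, Elm: r1, Elm2: r2}
		return &n.ret, nil
	case n.outPcIdx == 3:
		pc, ok := r3.(ProcessContinuation)
		if !ok {
			return nil, fmt.Errorf("ret4Sketch: want ProcessContinuation, got %T", r3)
		}
		n.ret = FullValue{Timestamp: et, Elm: r1, Elm2: r2, Continuation: pc}
		return &n.ret, nil
	default:
		return nil, fmt.Errorf("ret4Sketch: unsupported return signature")
	}
}

func main() {
	n := &invoker{outErrIdx: -1, outPcIdx: 3}
	fv, err := n.ret4Sketch(EventTime(7), "k", "v", resumeFlag(true))
	if err != nil {
		panic(err)
	}
	fmt.Println(fv.Timestamp, fv.Elm, fv.Elm2, fv.Continuation.ShouldResume())
}
```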
Issue Time Tracking
-------------------
Worklog Id: (was: 755304)
Time Spent: 5h 50m (was: 5h 40m)
> [Go SDK] DoFn Self Checkpointing
> --------------------------------
>
> Key: BEAM-11104
> URL: https://issues.apache.org/jira/browse/BEAM-11104
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Priority: P3
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> Allow SplittableDoFns to self checkpoint.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)