[ 
https://issues.apache.org/jira/browse/BEAM-11104?focusedWorklogId=762604&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762604
 ]

ASF GitHub Bot logged work on BEAM-11104:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/22 22:39
            Start Date: 26/Apr/22 22:39
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17386:
URL: https://github.com/apache/beam/pull/17386#discussion_r859200270


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -320,6 +321,73 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, Count: c, pcol: pcol}
 }
 
+func (n *DataSource) getProcessContinuation() sdf.ProcessContinuation {
+       if u, ok := n.Out.(*ProcessSizedElementsAndRestrictions); ok {

Review Comment:
   Consider adding a comment here noting that current support requires the SDF 
to be immediately after the DataSource node. This is true for most runners 
(it's easiest to reason about), but it's not a requirement of the model. 
Compare how we could theoretically support multiple DataSources in a single 
bundle, but in practice that never happens.
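
   As a rough illustration of the kind of comment meant here, the sketch below uses hypothetical stand-in types (`node`, `sdfNode`, `passThrough`, `dataSource`, `supportsSelfCheckpoint`), not the real exec package types. It only shows why a type assertion on the immediate output misses an SDF that sits further downstream.

```go
package main

import "fmt"

// node is a hypothetical stand-in for an exec plan node; it is not the Beam interface.
type node interface{ name() string }

// sdfNode stands in for *ProcessSizedElementsAndRestrictions.
type sdfNode struct{}

func (*sdfNode) name() string { return "sdf" }

// passThrough is a hypothetical intermediate node between a source and an SDF.
type passThrough struct{ next node }

func (*passThrough) name() string { return "passthrough" }

// dataSource stands in for DataSource; Out is its immediate downstream node.
type dataSource struct{ Out node }

// supportsSelfCheckpoint reports whether an SDF can be reached for checkpointing.
//
// NOTE: current support requires the SDF to be immediately after the data
// source. That holds for most runners but is not a requirement of the model,
// so an SDF placed further downstream is not detected by this assertion.
func (d *dataSource) supportsSelfCheckpoint() bool {
	_, ok := d.Out.(*sdfNode)
	return ok
}

func main() {
	direct := &dataSource{Out: &sdfNode{}}
	indirect := &dataSource{Out: &passThrough{next: &sdfNode{}}}
	fmt.Println(direct.supportsSelfCheckpoint())   // SDF directly after the source
	fmt.Println(indirect.supportsSelfCheckpoint()) // SDF one node further down
}
```

   The doc comment on the method is the part that matters; the rest is scaffolding so the sketch compiles on its own.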



##########
sdks/go/pkg/beam/core/runtime/harness/harness.go:
##########
@@ -401,21 +402,47 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                                c.plans[bdID] = append(c.plans[bdID], plan)
                        }
                }
+
+               // Check if the underlying DoFn self-checkpointed.
+               sr, delay, checkpointed, checkErr := plan.Checkpoint()
+
+               var rRoots []*fnpb.DelayedBundleApplication
+               if checkpointed {
+                       rRoots = make([]*fnpb.DelayedBundleApplication, len(sr.RS))
+                       for i, r := range sr.RS {
+                               rRoots[i] = &fnpb.DelayedBundleApplication{
+                                       Application: &fnpb.BundleApplication{
+                                               TransformId:      sr.TId,
+                                               InputId:          sr.InId,
+                                               Element:          r,
+                                               OutputWatermarks: sr.OW,
+                                       },
+                                       RequestedTimeDelay: durationpb.New(delay),
+                               }
+                       }
+               }
+
                delete(c.active, instID)
                if removed, ok := c.inactive.Insert(instID); ok {
                        delete(c.failed, removed) // Also GC old failed bundles.
                }
                delete(c.metStore, instID)
+
                c.mu.Unlock()
 
                if err != nil {
                        return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err)
                }
 
+               if checkErr != nil {
+                       return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, checkErr)

Review Comment:
   Consider making it explicit that this error happened while checkpointing.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762604)
    Time Spent: 14h 40m  (was: 14.5h)

> [Go SDK] DoFn Self Checkpointing
> --------------------------------
>
>                 Key: BEAM-11104
>                 URL: https://issues.apache.org/jira/browse/BEAM-11104
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Jack McCluskey
>            Priority: P3
>          Time Spent: 14h 40m
>  Remaining Estimate: 0h
>
> Allow SplittableDoFns to self checkpoint.
> Design doc: 
> https://docs.google.com/document/d/1_JbzjY9JR07ZK5v7PcZevUfzHPsqwzfV7W6AouNpMPk/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
