lostluck commented on code in PR #32425:
URL: https://github.com/apache/beam/pull/32425#discussion_r1764226783


##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -206,6 +206,20 @@ func (b *B) Cleanup(wk *W) {
        wk.mu.Unlock()
 }
 
+func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) {
+       resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
+               Request: &fnpb.InstructionRequest_FinalizeBundle{
+                       FinalizeBundle: &fnpb.FinalizeBundleRequest{
+                               InstructionId: b.InstID,
+                       },
+               },
+       })
+       if resp.GetError() != "" {
+               return nil, fmt.Errorf("finalize[%v] error from SDK: %v", b.InstID, resp.GetError())
+       }
+       return resp.GetFinalizeBundle(), nil

Review Comment:
   This message is empty and will likely always be empty. Since this is
internal code, I'd rather not return it at all for now than speculate that we
might want to return it in the future if it gains fields.



##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -278,6 +280,14 @@ progress:
                slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
        }
        em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals)
+       if s.finalize {
+               _, err := b.Finalize(ctx, wk)
+               if err != nil {
+                       slog.Debug("SDK Error from bundle finalization", "bundle", rb, "error", err.Error())
+                       panic(err)

Review Comment:
   At present, this panic is the only reason the tests are failing.
   
   I'd log this with slog at error level instead of debug, since the message
could be useful to users, and a finalization failure isn't expected to fail
the job.


