This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 986ee964858 [Prism] Support BundleFinalization DoFn parameter (#32425)
986ee964858 is described below

commit 986ee9648582470c2ca96016698096b3edd51e42
Author: Damon <damondoug...@users.noreply.github.com>
AuthorDate: Sat Sep 21 10:41:47 2024 -0600

    [Prism] Support BundleFinalization DoFn parameter (#32425)
    
    * Support BundleFinalization DoFn parameter
    
    * Replace beam.Register with register.DoFn2x0
    
    * Add TestParDoBundleFinalizer.* to filters
    
    * Register test funcs
    
    * Add filter to portable runner tests
    
    * Temporarily skip test
    
    * Simplify tests; refactor per PR comments
    
    * Skip tests when not in loopback mode
    
    * Clean up tests; add to coverage
    
    * Fix import ordering
    
    ---------
    
    Co-authored-by: Robert Burke <lostl...@users.noreply.github.com>
---
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  1 +
 sdks/go/pkg/beam/core/typex/special.go             | 12 +++-
 sdks/go/pkg/beam/forward.go                        |  1 +
 .../pkg/beam/runners/prism/internal/handlepardo.go |  6 +-
 .../beam/runners/prism/internal/jobservices/job.go |  1 +
 .../pkg/beam/runners/prism/internal/preprocess.go  |  1 +
 sdks/go/pkg/beam/runners/prism/internal/stage.go   | 12 +++-
 .../runners/prism/internal/unimplemented_test.go   |  1 +
 .../beam/runners/prism/internal/worker/bundle.go   | 11 +++
 sdks/go/test/integration/integration.go            | 15 ++++
 sdks/go/test/integration/primitives/pardo.go       | 80 ++++++++++++++++++++++
 sdks/go/test/integration/primitives/pardo_test.go  | 43 ++++++++++++
 12 files changed, 177 insertions(+), 7 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 65280ef6b93..1e30d425850 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -494,6 +494,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
([]string, error) {
                        m.requirements[URNRequiresSplittableDoFn] = true
                }
                if _, ok := 
edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok {
+                       payload.RequestsFinalization = true
                        m.requirements[URNRequiresBundleFinalization] = true
                }
                if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); 
ok {
diff --git a/sdks/go/pkg/beam/core/typex/special.go 
b/sdks/go/pkg/beam/core/typex/special.go
index edc1249fe76..af36ba92d28 100644
--- a/sdks/go/pkg/beam/core/typex/special.go
+++ b/sdks/go/pkg/beam/core/typex/special.go
@@ -69,8 +69,18 @@ type Window interface {
        Equals(o Window) bool
 }
 
-// BundleFinalization allows registering callbacks to be performed after the 
runner durably persists bundle results.
+// BundleFinalization allows registering callbacks for the runner to invoke 
after the bundle completes and the runner
+// commits the output. Parameter is accessible during DoFn StartBundle, 
ProcessElement, FinishBundle.
+// However, if your DoFn implementation requires BundleFinalization in 
StartBundle or FinishBundle, it is needed in the
+// ProcessElement signature, even if not invoked.
+// Common use cases for BundleFinalization would be to perform work after 
elements in a bundle have been processed.
+// See beam.ParDo for documentation on these DoFn lifecycle methods.
 type BundleFinalization interface {
+
+       // RegisterCallback registers func() for the runner to invoke after the 
runner persists the bundle of processed elements.
+       // The time.Duration configures the callback expiration, after which 
the runner will not invoke func().
+       // Returning error communicates to the runner that bundle finalization 
failed and the runner may choose to attempt
+       // finalization again.
        RegisterCallback(time.Duration, func() error)
 }
 
diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go
index 210c39ab4e4..b2f610b703e 100644
--- a/sdks/go/pkg/beam/forward.go
+++ b/sdks/go/pkg/beam/forward.go
@@ -204,6 +204,7 @@ type Window = typex.Window
 
 // BundleFinalization represents the parameter used to register callbacks to
 // be run once the runner has durably persisted output for a bundle.
+// See typex.BundleFinalization for more details.
 type BundleFinalization = typex.BundleFinalization
 
 // These are the reflect.Type instances of the universal types, which are used
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go 
b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
index 2d3425af33c..13e9b6f1b79 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
@@ -78,11 +78,7 @@ func (h *pardo) PrepareTransform(tid string, t 
*pipepb.PTransform, comps *pipepb
        }
 
        // Lets check for and remove anything that makes things less simple.
-       if pdo.OnWindowExpirationTimerFamilySpec == "" &&
-               !pdo.RequestsFinalization &&
-               !pdo.RequiresStableInput &&
-               !pdo.RequiresTimeSortedInput &&
-               pdo.RestrictionCoderId == "" {
+       if pdo.RestrictionCoderId == "" {
                // Which inputs are Side inputs don't change the graph further,
                // so they're not included here. Any nearly any ParDo can have 
them.
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 6cde48ded9a..1407feafe32 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -44,6 +44,7 @@ import (
 var supportedRequirements = map[string]struct{}{
        urns.RequirementSplittableDoFn:     {},
        urns.RequirementStatefulProcessing: {},
+       urns.RequirementBundleFinalization: {},
 }
 
 // TODO, move back to main package, and key off of executor handlers?
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go 
b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
index ed7f168e36e..7de32f85b7e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -445,6 +445,7 @@ func finalizeStage(stg *stage, comps *pipepb.Components, 
pipelineFacts *fusionFa
                                if err := 
(proto.UnmarshalOptions{}).Unmarshal(t.GetSpec().GetPayload(), pardo); err != 
nil {
                                        return fmt.Errorf("unable to decode 
ParDoPayload for %v", link.Transform)
                                }
+                               stg.finalize = pardo.RequestsFinalization
                                if 
len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec())
 > 0 {
                                        stg.stateful = true
                                }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 24cfc750933..f33754b2ca0 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -20,6 +20,7 @@ import (
        "context"
        "fmt"
        "io"
+       "runtime/debug"
        "sync/atomic"
        "time"
 
@@ -63,6 +64,7 @@ type stage struct {
        sideInputs   []engine.LinkID // Non-parallel input PCollections and 
their consumers
        internalCols []string        // PCollections that escape. Used for 
precise coder sending.
        envID        string
+       finalize     bool
        stateful     bool
        // hasTimers indicates the transform+timerfamily pairs that need to be 
waited on for
        // the stage to be considered complete.
@@ -106,7 +108,7 @@ func (s *stage) Execute(ctx context.Context, j 
*jobservices.Job, wk *worker.W, c
        defer func() {
                // Convert execution panics to errors to fail the bundle.
                if e := recover(); e != nil {
-                       err = fmt.Errorf("panic in stage.Execute bundle 
processing goroutine: %v, stage: %+v", e, s)
+                       err = fmt.Errorf("panic in stage.Execute bundle 
processing goroutine: %v, stage: %+v,stackTrace:\n%s", e, s, debug.Stack())
                }
        }()
        slog.Debug("Execute: starting bundle", "bundle", rb)
@@ -322,6 +324,14 @@ progress:
                slog.Debug("returned empty residual application", "bundle", rb, 
slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
        }
        em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, 
residuals)
+       if s.finalize {
+               _, err := b.Finalize(ctx, wk)
+               if err != nil {
+                       slog.Error("SDK Error from bundle finalization", 
"bundle", rb, "error", err.Error())
+                       panic(err)
+               }
+               slog.Info("finalized bundle", "bundle", rb)
+       }
        b.OutputData = engine.TentativeData{} // Clear the data.
        return nil
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
index 6afb04521af..f8917c72ccd 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
@@ -83,6 +83,7 @@ func TestImplemented(t *testing.T) {
                {pipeline: primitives.Checkpoints},
                {pipeline: primitives.CoGBK},
                {pipeline: primitives.ReshuffleKV},
+               {pipeline: primitives.ParDoProcessElementBundleFinalizer},
 
                // The following have been "allowed" to unblock further 
development
                // But it's not clear these tests truly validate the expected 
behavior
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 50e427ca36f..3ccafdb81e9 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -206,6 +206,17 @@ func (b *B) Cleanup(wk *W) {
        wk.mu.Unlock()
 }
 
+func (b *B) Finalize(ctx context.Context, wk *W) 
(*fnpb.FinalizeBundleResponse, error) {
+       resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
+               Request: &fnpb.InstructionRequest_FinalizeBundle{
+                       FinalizeBundle: &fnpb.FinalizeBundleRequest{
+                               InstructionId: b.InstID,
+                       },
+               },
+       })
+       return resp.GetFinalizeBundle(), nil
+}
+
 // Progress sends a progress request for the given bundle to the passed in 
worker, blocking on the response.
 func (b *B) Progress(ctx context.Context, wk *W) 
(*fnpb.ProcessBundleProgressResponse, error) {
        resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index aec69036eeb..de782daa2d5 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -104,6 +104,9 @@ var directFilters = []string{
        "TestSetState",
        "TestSetStateClear",
        "TestTimers.*", // no timer support for the go direct runner.
+
+       // no support for BundleFinalizer
+       "TestParDoBundleFinalizer.*",
 }
 
 var portableFilters = []string{
@@ -134,6 +137,9 @@ var portableFilters = []string{
 
        // The portable runner does not uniquify timers. (data elements 
re-fired)
        "TestTimers.*",
+
+       // no support for BundleFinalizer
+       "TestParDoBundleFinalizer.*",
 }
 
 var prismFilters = []string{
@@ -190,6 +196,9 @@ var flinkFilters = []string{
 
        "TestTimers_EventTime_Unbounded", // (failure when comparing on side 
inputs (NPE on window lookup))
        "TestTimers_ProcessingTime.*",    // Flink doesn't support processing 
time timers.
+
+       // no support for BundleFinalizer
+       "TestParDoBundleFinalizer.*",
 }
 
 var samzaFilters = []string{
@@ -231,6 +240,9 @@ var samzaFilters = []string{
 
        // Samza does not support state.
        "TestTimers.*",
+
+       // no support for BundleFinalizer
+       "TestParDoBundleFinalizer.*",
 }
 
 var sparkFilters = []string{
@@ -265,6 +277,9 @@ var sparkFilters = []string{
 
        "TestTimers_EventTime_Unbounded",     // Side inputs in executable 
stage not supported.
        "TestTimers_ProcessingTime_Infinity", // Spark doesn't support test 
stream.
+
+       // no support for BundleFinalizer
+       "TestParDoBundleFinalizer.*",
 }
 
 var dataflowFilters = []string{
diff --git a/sdks/go/test/integration/primitives/pardo.go 
b/sdks/go/test/integration/primitives/pardo.go
index 2c2383ea90b..dc59d8f67b8 100644
--- a/sdks/go/test/integration/primitives/pardo.go
+++ b/sdks/go/test/integration/primitives/pardo.go
@@ -18,6 +18,8 @@ package primitives
 import (
        "flag"
        "fmt"
+       "sync/atomic"
+       "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
@@ -32,6 +34,9 @@ func init() {
        register.Function3x2(asymJoinFn)
        register.Function5x0(splitByName)
        register.Function2x0(emitPipelineOptions)
+       register.DoFn2x0[beam.BundleFinalization, 
[]byte]((*processElemBundleFinalizer)(nil))
+       register.DoFn2x0[beam.BundleFinalization, 
[]byte]((*finalizerInFinishBundle)(nil))
+       register.DoFn2x0[beam.BundleFinalization, 
[]byte]((*finalizerInAll)(nil))
 
        register.Iter1[int]()
        register.Iter2[int, int]()
@@ -192,3 +197,78 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
        emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
        emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
 }
+
+var CountInvokeBundleFinalizer atomic.Int32
+
+const (
+       BundleFinalizerStart   = 1
+       BundleFinalizerProcess = 2
+       BundleFinalizerFinish  = 4
+)
+
+// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a 
beam.ParDo0 that processes a DoFn with a
+// beam.BundleFinalization in its ProcessElement method.
+func ParDoProcessElementBundleFinalizer(s beam.Scope) {
+       imp := beam.Impulse(s)
+       beam.ParDo0(s, &processElemBundleFinalizer{}, imp)
+}
+
+type processElemBundleFinalizer struct {
+}
+
+func (fn *processElemBundleFinalizer) ProcessElement(bf 
beam.BundleFinalization, _ []byte) {
+       bf.RegisterCallback(time.Second, func() error {
+               CountInvokeBundleFinalizer.Add(BundleFinalizerProcess)
+               return nil
+       })
+}
+
+// ParDoFinishBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that 
processes a DoFn containing a noop
+// beam.BundleFinalization in its ProcessElement method and a 
beam.BundleFinalization in its FinishBundle method.
+func ParDoFinishBundleFinalizer(s beam.Scope) {
+       imp := beam.Impulse(s)
+       beam.ParDo0(s, &finalizerInFinishBundle{}, imp)
+}
+
+type finalizerInFinishBundle struct{}
+
+// ProcessElement requires beam.BundleFinalization in its method signature in 
order for FinishBundle's
+// beam.BundleFinalization to be invoked.
+func (fn *finalizerInFinishBundle) ProcessElement(_ beam.BundleFinalization, _ 
[]byte) {}
+
+func (fn *finalizerInFinishBundle) FinishBundle(bf beam.BundleFinalization) {
+       bf.RegisterCallback(time.Second, func() error {
+               CountInvokeBundleFinalizer.Add(BundleFinalizerFinish)
+               return nil
+       })
+}
+
+// ParDoFinalizerInAll creates a beam.Pipeline with a beam.ParDo0 that 
processes a DoFn containing a beam.BundleFinalization
+// in all three lifecycle methods StartBundle, ProcessElement, FinishBundle.
+func ParDoFinalizerInAll(s beam.Scope) {
+       imp := beam.Impulse(s)
+       beam.ParDo0(s, &finalizerInAll{}, imp)
+}
+
+type finalizerInAll struct{}
+
+func (fn *finalizerInAll) StartBundle(bf beam.BundleFinalization) {
+       bf.RegisterCallback(time.Second, func() error {
+               CountInvokeBundleFinalizer.Add(BundleFinalizerStart)
+               return nil
+       })
+}
+
+func (fn *finalizerInAll) ProcessElement(bf beam.BundleFinalization, _ []byte) 
{
+       bf.RegisterCallback(time.Second, func() error {
+               CountInvokeBundleFinalizer.Add(BundleFinalizerProcess)
+               return nil
+       })
+}
+
+func (fn *finalizerInAll) FinishBundle(bf beam.BundleFinalization) {
+       bf.RegisterCallback(time.Second, func() error {
+               CountInvokeBundleFinalizer.Add(BundleFinalizerFinish)
+               return nil
+       })
+}
diff --git a/sdks/go/test/integration/primitives/pardo_test.go 
b/sdks/go/test/integration/primitives/pardo_test.go
index d2ad57b350b..aa6cb3de200 100644
--- a/sdks/go/test/integration/primitives/pardo_test.go
+++ b/sdks/go/test/integration/primitives/pardo_test.go
@@ -18,6 +18,8 @@ package primitives
 import (
        "testing"
 
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
        "github.com/apache/beam/sdks/v2/go/test/integration"
 )
@@ -46,3 +48,44 @@ func TestParDoPipelineOptions(t *testing.T) {
        integration.CheckFilters(t)
        ptest.RunAndValidate(t, ParDoPipelineOptions())
 }
+
+func TestParDoBundleFinalizer(t *testing.T) {
+       integration.CheckFilters(t)
+       if !jobopts.IsLoopback() {
+               t.Skip("Only Loopback mode is supported")
+       }
+       for _, tt := range []struct {
+               name       string
+               pipelineFn func(s beam.Scope)
+               want       int32
+       }{
+               {
+                       name:       "InProcessElement",
+                       pipelineFn: ParDoProcessElementBundleFinalizer,
+                       want:       BundleFinalizerProcess,
+               },
+               {
+                       name:       "InFinishBundle",
+                       pipelineFn: ParDoFinishBundleFinalizer,
+                       want:       BundleFinalizerFinish,
+               },
+               {
+                       name:       "InStartProcessFinishBundle",
+                       pipelineFn: ParDoFinalizerInAll,
+                       want:       BundleFinalizerStart + 
BundleFinalizerProcess + BundleFinalizerFinish,
+               },
+       } {
+               t.Run(tt.name, func(t *testing.T) {
+                       CountInvokeBundleFinalizer.Store(0)
+                       p, s := beam.NewPipelineWithRoot()
+                       tt.pipelineFn(s)
+                       _, err := ptest.RunWithMetrics(p)
+                       if err != nil {
+                               t.Fatalf("Failed to execute job: %v", err)
+                       }
+                       if got := CountInvokeBundleFinalizer.Load(); got != 
tt.want {
+                               t.Errorf("BundleFinalization RegisterCallback 
not invoked as expected via proxy counts, got: %v, want: %v", got, tt.want)
+                       }
+               })
+       }
+}

Reply via email to