youngoli commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848100363


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +950,64 @@ func validateSdfElementT(fn *Fn, name string, method 
*funcx.Fn, num int) error {
        return nil
 }
 
+// validateIsWatermarkEstimating returns true if watermark estimator methods 
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the 
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+       var isWatermarkEstimating bool
+       if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+               isWatermarkEstimating = true
+       }
+       if !isSdf && isWatermarkEstimating {
+               return false, errors.Errorf("Watermark estimation method %v is 
defined on non-splittable DoFn. Watermark"+
+                       "estimation is only valid on splittable DoFns", 
createWatermarkEstimatorName)
+       }
+       return isWatermarkEstimating, nil
+}
+
+// validateWatermarkSig validates that all watermark related functions are 
valid
+func validateWatermarkSig(fn *Fn) error {
+       paramRange := map[string][]int{

Review Comment:
   Nit: While I see that you're future-proofing this validation, I think 
simplifying it would make it much more legible (that is, instead of having a 
map and everything, since we only have one method with 0 params we can just 
store that in a variable like `paramsNum := 0`.
   
   I'd make an exception if you're already sure that additional methods or 
params will be added, like if those are already in the design doc. If that's 
the case then future-proofing is fine and you can disregard this nit.



##########
sdks/go/pkg/beam/core/graph/fn_test.go:
##########
@@ -227,6 +228,59 @@ func TestNewDoFnSdf(t *testing.T) {
        })
 }
 
+func TestNewDoFnWatermarkEstimating(t *testing.T) {
+       t.Run("valid", func(t *testing.T) {
+               tests := []struct {
+                       dfn  interface{}
+                       main mainInputs
+               }{
+                       {dfn: &GoodWatermarkEstimating{}, main: MainSingle},
+               }
+
+               for _, test := range tests {
+                       t.Run(reflect.TypeOf(test.dfn).String(), func(t 
*testing.T) {
+                               // Valid DoFns should pass validation with and 
without KV info.
+                               if _, err := NewDoFn(test.dfn); err != nil {
+                                       t.Fatalf("NewDoFn with Watermark 
Estimation failed: %v", err)
+                               }
+                               if _, err := NewDoFn(test.dfn, 
NumMainInputs(test.main)); err != nil {
+                                       t.Fatalf("NewDoFn(NumMainInputs(%v)) 
with Watermark Estimation failed: %v", test.main, err)
+                               }
+                       })
+               }
+       })
+       t.Run("invalid", func(t *testing.T) {
+               tests := []struct {
+                       dfn interface{}
+               }{
+                       {dfn: &BadWatermarkEstimatingNonSdf{}},
+                       {dfn: 
&BadWatermarkEstimatingCreateWatermarkEstimatorReturnType{}},
+               }
+               for _, test := range tests {
+                       t.Run(reflect.TypeOf(test.dfn).String(), func(t 
*testing.T) {
+                               if cfn, err := NewDoFn(test.dfn); err != nil {
+                                       t.Logf("NewDoFn with SDF failed as 
expected:\n%v", err)
+                               } else {
+                                       t.Errorf("NewDoFn(%v) = %v, want 
failure", cfn.Name(), cfn)
+                               }
+                               // If validation fails with unknown main 
inputs, then it should
+                               // always fail for any known number of main 
inputs, so test them
+                               // all. Error messages won't necessarily match.
+                               if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainSingle)); err != nil {
+                                       
t.Logf("NewDoFn(NumMainInputs(MainSingle)) with SDF failed as expected:\n%v", 
err)
+                               } else {
+                                       t.Errorf("NewDoFn(%v, 
NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+                               }
+                               if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainKv)); err != nil {

Review Comment:
   Nit: This KV section can be removed since the watermark estimator validation 
code isn't affected in any way by whether the main input is a KV or not.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -301,6 +308,24 @@ func (f *SplittableDoFn) RestrictionT() reflect.Type {
        return f.CreateInitialRestrictionFn().Ret[0].T
 }
 
+// IsWatermarkEstimating returns whether the DoFn implements a custom 
watermark estimator.
+func (f *SplittableDoFn) IsWatermarkEstimating() bool {
+       // Validation already passed, so if one SDF method is present they 
should
+       // all be present.

Review Comment:
   Nit: This looks like a copy-pasted comment.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions) 
ProcessElement(_ context.Context,
                }
        }
 
+       if n.cweInv != nil {
+               n.PDo.we = n.cweInv.Invoke()

Review Comment:
   Creating a watermark estimator doesn't need to be done for each element, 
does it? If not, this should go in StartBundle.



##########
sdks/go/pkg/beam/core/runtime/exec/plan.go:
##########
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "google.golang.org/protobuf/types/known/timestamppb"

Review Comment:
   Is timestamppb the type we already represent watermarks as internally? From 
a cursory look it feels weird to represent it with a protobuf type, but I'm 
also not sure what we currently use so I just want to check.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -664,6 +686,21 @@ func (n *ProcessSizedElementsAndRestrictions) GetInputId() 
string {
        return indexToInputId(0)
 }
 
+// GetOutputWatermark gets the current output watermark of the splittable unit
+// if one is defined, or returns nil otherwise.
+func (n *ProcessSizedElementsAndRestrictions) GetOutputWatermark() 
map[string]*timestamppb.Timestamp {
+       if n.PDo.we != nil {
+               ow := timestamppb.New(n.PDo.we.CurrentWatermark())
+               owMap := make(map[string]*timestamppb.Timestamp)
+               for _, out := range n.outputs {
+                       owMap[out] = ow

Review Comment:
   Maybe it's because I'm not well-versed in how watermarks work, but why store 
output watermarks in a map like this if we set the same value for every single 
output? Why not just store the single timestamp? Is it because of some upcoming 
work that's going to build on this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to