damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848330724


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -301,6 +308,24 @@ func (f *SplittableDoFn) RestrictionT() reflect.Type {
        return f.CreateInitialRestrictionFn().Ret[0].T
 }
 
+// IsWatermarkEstimating returns whether the DoFn implements a custom 
watermark estimator.
+func (f *SplittableDoFn) IsWatermarkEstimating() bool {
+       // Validation already passed, so if one SDF method is present they 
should
+       // all be present.

Review Comment:
   Whoops, good catch



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +950,64 @@ func validateSdfElementT(fn *Fn, name string, method 
*funcx.Fn, num int) error {
        return nil
 }
 
+// validateIsWatermarkEstimating returns true if watermark estimator methods 
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the 
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+       var isWatermarkEstimating bool
+       if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+               isWatermarkEstimating = true
+       }
+       if !isSdf && isWatermarkEstimating {
+               return false, errors.Errorf("Watermark estimation method %v is 
defined on non-splittable DoFn. Watermark"+
+                       "estimation is only valid on splittable DoFns", 
createWatermarkEstimatorName)
+       }
+       return isWatermarkEstimating, nil
+}
+
+// validateWatermarkSig validates that all watermark related functions are 
valid
+func validateWatermarkSig(fn *Fn) error {
+       paramRange := map[string][]int{

Review Comment:
   Yeah, there will definitely be more added in the future (I actually already 
have those changes locally 😄)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to