damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848331113
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
}
}
+ if n.cweInv != nil {
+ n.PDo.we = n.cweInv.Invoke()
Review Comment:
It actually does: watermark estimation is done at the element/restriction
level. This will matter more once we take in watermark state, which is
created while giving the user access to the element/restriction.
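To illustrate the per-element point, here is a minimal sketch, not the code in this PR. The names `millis`, `estimator`, `processor`, and `newEstimator` are hypothetical stand-ins, and seeding the estimator from the element timestamp is an assumption made only for the example.

```go
package main

import "fmt"

// millis is a hypothetical stand-in for an event-time timestamp in milliseconds.
type millis int64

// estimator is a hypothetical stand-in for a watermark estimator.
type estimator struct{ current millis }

// CurrentWatermark returns the estimator's current watermark.
func (e *estimator) CurrentWatermark() millis { return e.current }

// processor holds the estimator for the element currently being processed.
type processor struct {
	// newEstimator is a hypothetical factory; it is nil when no estimator is defined.
	newEstimator func(elementTime millis) *estimator
	we           *estimator
}

// processElement creates a fresh estimator for each element/restriction pair,
// so estimator state never leaks from one element to the next.
func (p *processor) processElement(elementTime millis) {
	if p.newEstimator == nil {
		return
	}
	p.we = p.newEstimator(elementTime)
}

func main() {
	p := &processor{newEstimator: func(t millis) *estimator {
		return &estimator{current: t}
	}}
	for _, t := range []millis{1000, 2000} {
		p.processElement(t)
		fmt.Println(p.we.CurrentWatermark())
	}
}
```

Because the factory runs inside `processElement`, it can later be handed the element, restriction, or initial watermark state without changing where it is called.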
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -664,6 +686,21 @@ func (n *ProcessSizedElementsAndRestrictions) GetInputId() string {
return indexToInputId(0)
}
+// GetOutputWatermark gets the current output watermark of the splittable unit
+// if one is defined, or returns nil otherwise.
+func (n *ProcessSizedElementsAndRestrictions) GetOutputWatermark() map[string]*timestamppb.Timestamp {
+ if n.PDo.we != nil {
+ ow := timestamppb.New(n.PDo.we.CurrentWatermark())
+ owMap := make(map[string]*timestamppb.Timestamp)
+ for _, out := range n.outputs {
+ owMap[out] = ow
Review Comment:
I can't speak to why this decision was originally made, though I imagine it
makes things significantly easier for the runner, since the input watermark is
an important concept and is defined as `min(all incoming output watermarks)`.
It is worth noting that we're not actually setting _every_ output to this
value; we're only setting the outputs of this transform. At the runner level,
I believe this map is unioned with any other output watermarks that exist, and
the result is used to make triggering/windowing decisions.
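A rough sketch of that relationship, under stated assumptions: `millis`, `outputWatermarks`, and `inputWatermark` are hypothetical names, timestamps are plain millisecond integers rather than `timestamppb.Timestamp`, and the runner-side minimum is my reading of the definition above, not actual runner code.

```go
package main

import "fmt"

// millis is a hypothetical stand-in for an event-time timestamp in milliseconds.
type millis int64

// outputWatermarks reports the same estimated watermark for every output of
// one transform, mirroring the loop in the diff above.
func outputWatermarks(outputs []string, current millis) map[string]millis {
	m := make(map[string]millis, len(outputs))
	for _, out := range outputs {
		m[out] = current
	}
	return m
}

// inputWatermark returns the minimum of the upstream output watermarks.
// The boolean is false when there are no upstream watermarks to combine.
func inputWatermark(upstream []millis) (millis, bool) {
	if len(upstream) == 0 {
		return 0, false
	}
	lowest := upstream[0]
	for _, w := range upstream[1:] {
		if w < lowest {
			lowest = w
		}
	}
	return lowest, true
}

func main() {
	// Outputs of this transform only; other transforms report their own values.
	sdf := outputWatermarks([]string{"out0", "out1"}, 1500)
	other := millis(1200) // watermark reported by some other upstream transform

	in, ok := inputWatermark([]millis{sdf["out0"], other})
	fmt.Println(in, ok)
}
```

The downstream input watermark is held back by the slowest upstream output, which is why each transform only needs to report its own outputs.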
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.