lostluck commented on code in PR #32029:
URL: https://github.com/apache/beam/pull/32029#discussion_r1697730564


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -211,6 +211,16 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side
        for _, input := range inputIDs {
                em.consumers[input] = append(em.consumers[input], ss.ID)
        }
+
+       // In very rare cases, we can have a stage without any inputs, such as a flatten.
+       // In that case, there's nothing that will start the watermark refresh cycle,
+       // so we must do it here.
+       if len(inputIDs) == 0 {

Review Comment:
   Effectively yes, but basically it's due to flatten being special: all other
stages must have a single parallel input PCollection. Flattens may have any
number, which apparently includes 0.
   
   In this case, by construction, the flatten has no inputs to begin with.
   
   Impulses and TestStream are also special: Impulses are required to kick off
SDK-side processing. TestStream is just weird.
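
To illustrate the point about zero-input flattens, here is a minimal, self-contained sketch. It is not the prism code: `manager`, `addStage`, `onData`, and the `refresh` set are hypothetical names invented for this example, and the real `ElementManager` tracks considerably more state. It only shows why a stage that never appears in the consumers map has to be queued for a watermark refresh when it is added.

```go
package main

import "fmt"

// manager is a hypothetical, heavily simplified stand-in for an element manager.
type manager struct {
	consumers map[string][]string // PCollection ID -> IDs of the stages that consume it
	refresh   map[string]bool     // stages queued for a watermark refresh
}

func newManager() *manager {
	return &manager{
		consumers: map[string][]string{},
		refresh:   map[string]bool{},
	}
}

// addStage registers the stage as a consumer of each of its inputs.
// A stage with no inputs never appears in the consumers map, so no
// incoming data can ever queue it. It is queued here instead.
func (m *manager) addStage(id string, inputIDs []string) {
	for _, in := range inputIDs {
		m.consumers[in] = append(m.consumers[in], id)
	}
	if len(inputIDs) == 0 {
		m.refresh[id] = true
	}
}

// onData simulates data arriving on a PCollection: every consumer of
// that PCollection is queued for a watermark refresh.
func (m *manager) onData(pcol string) {
	for _, id := range m.consumers[pcol] {
		m.refresh[id] = true
	}
}

func main() {
	m := newManager()
	m.addStage("parDo", []string{"pc1"})
	m.addStage("emptyFlatten", nil)

	// Only the zero-input stage is queued before any data arrives.
	fmt.Println(m.refresh["emptyFlatten"], m.refresh["parDo"]) // true false

	m.onData("pc1")
	fmt.Println(m.refresh["parDo"]) // true
}
```

In this sketch, a stage with inputs is only queued once data arrives on one of them, while the empty flatten is queued at registration because nothing else would ever trigger it.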



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
