damondouglas commented on code in PR #30492: URL: https://github.com/apache/beam/pull/30492#discussion_r1618025133
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -55,15 +55,16 @@ type link struct {
 // account, but all serialization boundaries remain since the pcollections
 // would continue to get serialized.
 type stage struct {
-	ID           string
-	transforms   []string
-	primaryInput string          // PCollection used as the parallel input.
-	outputs      []link          // PCollections that must escape this stage.
-	sideInputs   []engine.LinkID // Non-parallel input PCollections and their consumers
-	internalCols []string        // PCollections that escape. Used for precise coder sending.
-	envID        string
-	stateful     bool
-	hasTimers    []string
+	ID                   string
+	transforms           []string
+	primaryInput         string          // PCollection used as the parallel input.
+	outputs              []link          // PCollections that must escape this stage.
+	sideInputs           []engine.LinkID // Non-parallel input PCollections and their consumers
+	internalCols         []string        // PCollections that escape. Used for precise coder sending.
+	envID                string
+	stateful             bool
+	hasTimers            []string
+	processingTimeTimers map[string]bool
Review Comment:
   Is https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L843 the reason we only keep a map of processing-time timers and not one of event-time timers?

##########
sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go:
##########
@@ -221,3 +221,46 @@ func TestTestStream(t *testing.T) {
 		}
 	}
 }
+
+// TestProcessingTime is the suite for validating behaviors around ProcessingTime.
+// Separate from the TestStream, Timers, and Triggers tests due to the unique nature
+// of the time domain.
+func TestProcessingTime(t *testing.T) {
+	initRunner(t)
+
+	tests := []struct {
+		pipeline func(s beam.Scope)
+	}{
+		{pipeline: primitives.TimersProcessingTimeTestStream_Infinity},
+		{pipeline: primitives.TimersProcessingTime_Bounded},
+		{pipeline: primitives.TimersProcessingTime_Unbounded},
+	}
+
+	configs := []struct {
+		name                              string
+		OneElementPerKey, OneKeyPerBundle bool
+	}{
+		{"Greedy", false, false},
+		{"AllElementsPerKey", false, true},
+		{"OneElementPerKey", true, false},
+		// {"OneElementPerBundle", true, true}, // Reveals flaky behavior
Review Comment:
   Not PR-blocking, and I'm not sure whether this could become a problem for the AllElementsPerKey and OneElementPerKey cases in the future.

   I commented out this flaky OneElementPerBundle case and inserted log statements after https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L197 and after https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L210. The key associated with the panic at https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L213 never appeared in the output of either log statement. I haven't yet figured out why, but I wanted to relay my findings.

##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -305,8 +312,13 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
 		for {
 			em.refreshCond.L.Lock()
+			// Check if processing time has advanced before the wait loop.
+			emNow := em.ProcessingTimeNow()
+			ptRefreshed := em.processTimeEvents.AdvanceTo(emNow)
+			em.watermarkRefreshes.merge(ptRefreshed)
+
 			// If there are no watermark refreshes available, we wait until there are.
-			for len(em.watermarkRefreshes) == 0 {
+			for len(em.watermarkRefreshes)+len(ptRefreshed) == 0 {
 				// TODO Add processing time event condition instead of piggybacking on watermarks?
Review Comment:
   Not PR-blocking, but I'm curious what "processing time event condition" means, especially with respect to the "instead of" in the TODO.