damondouglas commented on code in PR #30492:
URL: https://github.com/apache/beam/pull/30492#discussion_r1618025133


##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -55,15 +55,16 @@ type link struct {
 // account, but all serialization boundaries remain since the pcollections
 // would continue to get serialized.
 type stage struct {
-       ID           string
-       transforms   []string
-       primaryInput string          // PCollection used as the parallel input.
-       outputs      []link          // PCollections that must escape this stage.
-       sideInputs   []engine.LinkID // Non-parallel input PCollections and their consumers
-       internalCols []string        // PCollections that escape. Used for precise coder sending.
-       envID        string
-       stateful     bool
-       hasTimers    []string
+       ID                   string
+       transforms           []string
+       primaryInput         string          // PCollection used as the parallel input.
+       outputs              []link          // PCollections that must escape this stage.
+       sideInputs           []engine.LinkID // Non-parallel input PCollections and their consumers
+       internalCols         []string        // PCollections that escape. Used for precise coder sending.
+       envID                string
+       stateful             bool
+       hasTimers            []string
+       processingTimeTimers map[string]bool

Review Comment:
   Is https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L843 the reason why we only have a map of processing-time timers and not of event-time timers?



##########
sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go:
##########
@@ -221,3 +221,46 @@ func TestTestStream(t *testing.T) {
                }
        }
 }
+
+// TestProcessingTime is the suite for validating behaviors around ProcessingTime.
+// Separate from the TestStream, Timers, and Triggers tests due to the unique nature
+// of the time domain.
+func TestProcessingTime(t *testing.T) {
+       initRunner(t)
+
+       tests := []struct {
+               pipeline func(s beam.Scope)
+       }{
+               {pipeline: primitives.TimersProcessingTimeTestStream_Infinity},
+               {pipeline: primitives.TimersProcessingTime_Bounded},
+               {pipeline: primitives.TimersProcessingTime_Unbounded},
+       }
+
+       configs := []struct {
+               name                              string
+               OneElementPerKey, OneKeyPerBundle bool
+       }{
+               {"Greedy", false, false},
+               {"AllElementsPerKey", false, true},
+               {"OneElementPerKey", true, false},
+               // {"OneElementPerBundle", true, true}, // Reveals flaky behavior

Review Comment:
   Not PR-blocking, and I'm not sure whether this might become a problem for the AllElementsPerKey and OneElementPerKey cases in the future. I commented out this flaky OneElementPerBundle case and inserted log statements after https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L197 and after https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L210. I observed that the key associated with the panic at https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L213 never appeared in those logged steps. I haven't yet figured out why, but wanted to relay my findings.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -305,8 +312,13 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
 
                for {
                        em.refreshCond.L.Lock()
+                       // Check if processing time has advanced before the wait loop.
+                       emNow := em.ProcessingTimeNow()
+                       ptRefreshed := em.processTimeEvents.AdvanceTo(emNow)
+                       em.watermarkRefreshes.merge(ptRefreshed)
+
                        // If there are no watermark refreshes available, we wait until there are.
-                       for len(em.watermarkRefreshes) == 0 {
+                       for len(em.watermarkRefreshes)+len(ptRefreshed) == 0 { // TODO Add processing time event condition instead of piggybacking on watermarks?

Review Comment:
   Non-PR-blocking, but I'm curious what "processing time event condition" means, especially with respect to the "instead of" in the TODO statement.



-- 
This is an automated message from the Apache Git Service.