[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=170009&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170009 ]
ASF GitHub Bot logged work on BEAM-5265: ---------------------------------------- Author: ASF GitHub Bot Created on: 27/Nov/18 21:48 Start Date: 27/Nov/18 21:48 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#discussion_r236859849 ########## File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ########## @@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() { } /** - * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}. + * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn} + * on appropriate time domains. */ @Test - public void testOnTimerCalled() { + @Category(NeedsRunner.class) + public void testOnTimerCalledWithGlobalWindow() { + + // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock + Instant currentProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + Instant currentEventTime = new Instant(42); + + TestStream<KV<String, String>> testStream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) + .advanceWatermarkTo(currentEventTime) + .addElements(TimestampedValue.of(KV.of("anyKey", "anyValue"), new Instant(99))) + .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1)) + .advanceWatermarkToInfinity(); + WindowFn<?, GlobalWindow> windowFn = new GlobalWindows(); - DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder()); - DoFnRunner<String, String> runner = - new SimpleDoFnRunner<>( - null, - fn, - NullSideInputReader.empty(), - null, - null, - Collections.emptyList(), - mockStepContext, - null, - Collections.emptyMap(), - WindowingStrategy.of(windowFn)); + DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers<>(windowFn.windowCoder()); - Instant 
currentTime = new Instant(42); - Duration offset = Duration.millis(37); + PCollection<TimerData> output = + pipeline + .apply(testStream) + .apply(Window.into(new GlobalWindows())) + .apply(ParDo.of(fn)) + .setCoder(TimerInternals.TimerDataCoder.of(windowFn.windowCoder())); - // Mocking is not easily compatible with annotation analysis, so we manually record - // the method call. - runner.onTimer( - DoFnWithTimers.TIMER_ID, - GlobalWindow.INSTANCE, - currentTime.plus(offset), - TimeDomain.EVENT_TIME); - - assertThat( - fn.onTimerInvocations, - contains( + PAssert.that(output) + .containsInAnyOrder( TimerData.of( - DoFnWithTimers.TIMER_ID, + DoFnWithTimers.PROCESSING_TIMER_ID, StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE), - currentTime.plus(offset), - TimeDomain.EVENT_TIME))); + currentProcessingTime.plus(DoFnWithTimers.TIMER_OFFSET).plus(1), + TimeDomain.PROCESSING_TIME), + TimerData.of( + DoFnWithTimers.EVENT_TIMER_ID, + StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE), + currentEventTime.plus(DoFnWithTimers.TIMER_OFFSET), + TimeDomain.EVENT_TIME)); + + pipeline.run(); + } + + /** + * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn} + * on appropriate time domains. With {@link IntervalWindow}, we check behavior of emitted events + * when time is inside and outside of window boundaries. 
+ */ + @Test + @Category(NeedsRunner.class) + public void testOnTimerCalledWithIntervalWindow() { + + // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock + Instant baseTime = new Instant(0); + + Duration windowDuration = Duration.standardHours(1); + Duration windowLateness = Duration.standardMinutes(1); + IntervalWindow window = new IntervalWindow(baseTime, windowDuration); + FixedWindows windowFn = FixedWindows.of(windowDuration); + DoFnWithTimers<IntervalWindow> fn = new DoFnWithTimers<>(windowFn.windowCoder()); + + TimestampedValue<KV<String, String>> event = + TimestampedValue.of(KV.of("anyKey", "anyValue"), window.start()); + + TestStream<KV<String, String>> testStream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) + // watermark in window, processing time far behind + .advanceWatermarkTo(window.start()) + .addElements(event) + .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1)) + .advanceWatermarkTo(window.start().plus(DoFnWithTimers.TIMER_OFFSET).plus(1)) + + // watermark and processing time within window + .advanceProcessingTime( + Duration.millis(Math.abs(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()))) + .addElements(event) + .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1)) + .advanceWatermarkTo( + window.start().plus(2 * DoFnWithTimers.TIMER_OFFSET.getMillis()).plus(2)) + + // watermark in window, processing time ahead of window.end() but within lateness + .advanceProcessingTime(windowDuration) + .addElements(event) + .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1)) + .advanceWatermarkTo( + window.start().plus(3 * DoFnWithTimers.TIMER_OFFSET.getMillis()).plus(3)) + + // watermark in window, processing time is out of window's allowed lateness Review comment: I think a more isolated test case might be easier. You want processing time to be way beyond the allowed lateness. 
Then, when you emit elements, I think `outputWithTimestamp` should actually crash saying that the timestamp was out of bounds. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 170009) Time Spent: 3h 40m (was: 3.5h) > Can not test Timer with processing time domain > ---------------------------------------------- > > Key: BEAM-5265 > URL: https://issues.apache.org/jira/browse/BEAM-5265 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-direct > Reporter: Jozef Vilcek > Assignee: Jozef Vilcek > Priority: Major > Time Spent: 3h 40m > Remaining Estimate: 0h > > I have a stateful DoFn which has a timer in the PROCESSING_TIME domain. While > writing tests, I noticed that it does not react to `advanceProcessingTime()` > on the test stream. The problem seems to be here: > [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L260] > I can only tell that patching this place works for direct runner tests. I am not > sure about the broader impact on other runners, since it is in `runner-core` -- This message was sent by Atlassian JIRA (v7.6.3#76005)