[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=193698&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193698 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 02/Feb/19 18:01
Start Date: 02/Feb/19 18:01
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-459985504

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 193698)
Time Spent: 4h (was: 3h 50m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-direct
> Reporter: Jozef Vilcek
> Assignee: Jozef Vilcek
> Priority: Major
> Time Spent: 4h
> Remaining Estimate: 0h
>
> I have a stateful DoFn which has a timer on PROCESSING_TIME domain. While
> writing tests, I noticed that it does not react to `advanceProcessingTime()`
> on tests stream. Problem seems to be here:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L260]
> I can only tell that patching this place works for direct runner tests. Not
> sure about broader impact on other runners since it is in `runner-core`

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=196607&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196607 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 09/Feb/19 18:50
Start Date: 09/Feb/19 18:50
Worklog Time Spent: 10m
Work Description: stale[bot] commented on pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305

Issue Time Tracking
---

Worklog Id: (was: 196607)
Time Spent: 4h 20m (was: 4h 10m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-direct
> Reporter: Jozef Vilcek
> Assignee: Jozef Vilcek
> Priority: Major
> Labels: triaged
> Time Spent: 4h 20m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=196606&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196606 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 09/Feb/19 18:50
Start Date: 09/Feb/19 18:50
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-462069491

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

Issue Time Tracking
---

Worklog Id: (was: 196606)
Time Spent: 4h 10m (was: 4h)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-direct
> Reporter: Jozef Vilcek
> Assignee: Jozef Vilcek
> Priority: Major
> Labels: triaged
> Time Spent: 4h 10m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=168724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168724 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 22/Nov/18 09:34
Start Date: 22/Nov/18 09:34
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on a change in pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r235655236

## File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ##

@@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
   }
 
   /**
-   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}.
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains.
    */
   @Test
-  public void testOnTimerCalled() {
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithGlobalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock
+    Instant currentProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    Instant currentEventTime = new Instant(42);
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(currentEventTime)
+            .addElements(TimestampedValue.of(KV.of("anyKey", "anyValue"), new Instant(99)))
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkToInfinity();
+
     WindowFn windowFn = new GlobalWindows();
-    DoFnWithTimers fn = new DoFnWithTimers(windowFn.windowCoder());
-    DoFnRunner runner =
-        new SimpleDoFnRunner<>(
-            null,
-            fn,
-            NullSideInputReader.empty(),
-            null,
-            null,
-            Collections.emptyList(),
-            mockStepContext,
-            null,
-            Collections.emptyMap(),
-            WindowingStrategy.of(windowFn));
+    DoFnWithTimers fn = new DoFnWithTimers<>(windowFn.windowCoder());
 
-    Instant currentTime = new Instant(42);
-    Duration offset = Duration.millis(37);
+    PCollection output =
+        pipeline
+            .apply(testStream)
+            .apply(Window.into(new GlobalWindows()))
+            .apply(ParDo.of(fn))
+            .setCoder(TimerInternals.TimerDataCoder.of(windowFn.windowCoder()));
 
-    // Mocking is not easily compatible with annotation analysis, so we manually record
-    // the method call.
-    runner.onTimer(
-        DoFnWithTimers.TIMER_ID,
-        GlobalWindow.INSTANCE,
-        currentTime.plus(offset),
-        TimeDomain.EVENT_TIME);
-
-    assertThat(
-        fn.onTimerInvocations,
-        contains(
+    PAssert.that(output)
+        .containsInAnyOrder(
             TimerData.of(
-                DoFnWithTimers.TIMER_ID,
+                DoFnWithTimers.PROCESSING_TIMER_ID,
                 StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
-                currentTime.plus(offset),
-                TimeDomain.EVENT_TIME)));
+                currentProcessingTime.plus(DoFnWithTimers.TIMER_OFFSET).plus(1),
+                TimeDomain.PROCESSING_TIME),
+            TimerData.of(
+                DoFnWithTimers.EVENT_TIMER_ID,
+                StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
+                currentEventTime.plus(DoFnWithTimers.TIMER_OFFSET),
+                TimeDomain.EVENT_TIME));
+
+    pipeline.run();
+  }
+
+  /**
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains. With {@link IntervalWindow}, we check behavior of emitted events
+   * when time is inside and outside of window boundaries.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithIntervalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock
+    Instant baseTime = new Instant(0);
+
+    Duration windowDuration = Duration.standardHours(1);
+    Duration windowLateness = Duration.standardMinutes(1);
+    IntervalWindow window = new IntervalWindow(baseTime, windowDuration);
+    FixedWindows windowFn = FixedWindows.of(windowDuration);
+    DoFnWithTimers fn = new DoFnWithTimers<>(windowFn.windowCoder());
+
+    TimestampedValue<KV<String, String>> event =
+        TimestampedValue.of(KV.of("anyKey", "anyValue"), window.start());
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            // watermark in window, processing time far behind
+            .advanceWatermarkTo(window.sta
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=170009&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170009 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 27/Nov/18 21:48
Start Date: 27/Nov/18 21:48
Worklog Time Spent: 10m
Work Description: kennknowles commented on a change in pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r236859849

## File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ##

@@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=171376&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171376 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 02/Dec/18 15:05
Start Date: 02/Dec/18 15:05
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on a change in pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r238102116

## File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ##

@@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=207058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-207058 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 04/Mar/19 09:58
Start Date: 04/Mar/19 09:58
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r261988974

## File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ##

@@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=157409&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-157409 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 23/Oct/18 07:31
Start Date: 23/Oct/18 07:31
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-432128811

@kennknowles I have updated tests, hopefully on the right path. I am sorry for the delay.

Issue Time Tracking
---

Worklog Id: (was: 157409)
Time Spent: 1h 20m (was: 1h 10m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-direct
> Reporter: Jozef Vilcek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158275&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158275 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 24/Oct/18 19:23
Start Date: 24/Oct/18 19:23
Worklog Time Spent: 10m
Work Description: kennknowles commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-432795256

It looks like all of your tests pass (the redness is from other stuff). What I mean is that your change to the effective timestamp in processing time will actually cause data to be dropped sometimes, or to fall outside the window other times. So you should be able to write a test that fails with your change but passes without it.

If this doesn't seem like a fun exercise to you, that is OK also :-)

Issue Time Tracking
---

Worklog Id: (was: 158275)
Time Spent: 1.5h (was: 1h 20m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-direct
> Reporter: Jozef Vilcek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158692 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 25/Oct/18 13:36
Start Date: 25/Oct/18 13:36
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433053648

I am not sure if I am up to the fun :) I cannot tell, because I am not clear on what the test should look like. Which records can be dropped because of this change? My understanding of internals is rather shallow :/

Issue Time Tracking
---

Worklog Id: (was: 158692)
Time Spent: 1h 40m (was: 1.5h)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-direct
> Reporter: Jozef Vilcek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158696 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 25/Oct/18 13:42
Start Date: 25/Oct/18 13:42
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433056007

Do you mean that data can be dropped now when output from `onTimer` via `ctx.output(data)` instead of `ctx.outputWithTimestamp(data, timestamp)` because ctx will now use a correct processing time as its timestamp?

Issue Time Tracking
---

Worklog Id: (was: 158696)
Time Spent: 1h 50m (was: 1h 40m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-direct
> Reporter: Jozef Vilcek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158697 ]

ASF GitHub Bot logged work on BEAM-5265:

Author: ASF GitHub Bot
Created on: 25/Oct/18 13:43
Start Date: 25/Oct/18 13:43
Worklog Time Spent: 10m
Work Description: JozoVilcek edited a comment on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433056007

Do you mean that data can be dropped now when output from `onTimer` via `ctx.output(data)` instead of `ctx.outputWithTimestamp(data, timestamp)` because ctx will now use a correct processing time as its timestamp?

Issue Time Tracking
---

Worklog Id: (was: 158697)
Time Spent: 2h (was: 1h 50m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-direct
> Reporter: Jozef Vilcek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
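
The question in the comment above, about `ctx.output(data)` versus `ctx.outputWithTimestamp(data, timestamp)`, can be pictured with a toy model. This is only a sketch and not Beam code: `ToyTimerContext`, `Stamped`, and `defaultTimestamp` are hypothetical names, and the assumption is simply that an implicit output inherits whatever timestamp the timer callback is running with, while an explicit output carries the timestamp the caller passes.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a timer callback context. Hypothetical; this is not Beam's API. */
final class ToyTimerContext {

  /** A value paired with the timestamp it was emitted with. */
  static final class Stamped {
    final String value;
    final Instant timestamp;

    Stamped(String value, Instant timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // Assumption: the timestamp the callback runs with, e.g. the timer's effective timestamp.
  private final Instant defaultTimestamp;
  private final List<Stamped> emitted = new ArrayList<>();

  ToyTimerContext(Instant defaultTimestamp) {
    this.defaultTimestamp = defaultTimestamp;
  }

  /** Implicit form: the element inherits the context's default timestamp. */
  void output(String value) {
    outputWithTimestamp(value, defaultTimestamp);
  }

  /** Explicit form: the caller chooses the timestamp. */
  void outputWithTimestamp(String value, Instant timestamp) {
    emitted.add(new Stamped(value, timestamp));
  }

  List<Stamped> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    // Pretend the callback's default timestamp was taken from processing time.
    ToyTimerContext ctx = new ToyTimerContext(Instant.ofEpochMilli(5_000));
    ctx.output("implicit");
    ctx.outputWithTimestamp("explicit", Instant.ofEpochMilli(42));
    for (Stamped s : ctx.emitted()) {
      System.out.println(s.value + " @ " + s.timestamp);
    }
  }
}
```

In this model, changing where `defaultTimestamp` comes from changes every element emitted through `output`, while elements emitted through `outputWithTimestamp` are unaffected.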
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158700 ] ASF GitHub Bot logged work on BEAM-5265: Author: ASF GitHub Bot Created on: 25/Oct/18 13:48 Start Date: 25/Oct/18 13:48 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#issuecomment-433058029 The processing time can be arbitrarily far ahead of the watermark, or behind it. They are independent, even though in a lot of streaming applications they will be near each other. So if the watermark is behind processing time, you'll get timestamps that are outside the window and that is forbidden. If the watermark is ahead of processing time (more rare and weird probably, since you would be processing future events) then you get dropping. Issue Time Tracking --- Worklog Id: (was: 158700) Time Spent: 2h 10m (was: 2h)
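The two concerns in the comment above (an output timestamp that falls outside its window, and data arriving after the watermark has passed the window plus its allowed lateness) can be separated in a toy model. The sketch below is plain Java with `java.time` and does not call Beam; the class name `ClockModel`, both method names, and the exact boundary used for window expiry are assumptions made for illustration, and it makes no claim about what a particular runner enforces.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model only; hypothetical names, not Beam code. */
class ClockModel {

    /** True if ts lies in [windowStart, windowEnd). */
    static boolean timestampInWindow(Instant ts, Instant windowStart, Instant windowEnd) {
        return !ts.isBefore(windowStart) && ts.isBefore(windowEnd);
    }

    /**
     * True once the watermark has reached windowEnd + allowedLateness.
     * Assumption: this is the point after which late data for the window
     * would be dropped; the exact boundary is illustrative.
     */
    static boolean windowExpired(Instant watermark, Instant windowEnd, Duration allowedLateness) {
        return !watermark.isBefore(windowEnd.plus(allowedLateness));
    }

    public static void main(String[] args) {
        Instant start = Instant.EPOCH;
        Instant end = start.plus(Duration.ofHours(1));
        Duration lateness = Duration.ofMinutes(1);

        // Case 1: watermark still inside the window, processing time far ahead of it.
        Instant processingTime = end.plus(Duration.ofHours(5));
        Instant watermark = start.plus(Duration.ofMinutes(10));
        System.out.println("stamp in window: "
            + timestampInWindow(processingTime, start, end));
        System.out.println("window expired:  "
            + windowExpired(watermark, end, lateness));

        // Case 2: watermark far ahead of processing time.
        processingTime = start.plus(Duration.ofMinutes(10));
        watermark = end.plus(Duration.ofHours(5));
        System.out.println("stamp in window: "
            + timestampInWindow(processingTime, start, end));
        System.out.println("window expired:  "
            + windowExpired(watermark, end, lateness));
    }
}
```

Keeping the two checks as separate functions reflects the point that the two clocks are independent: either check can fail on its own.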
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158725&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158725 ] ASF GitHub Bot logged work on BEAM-5265: Author: ASF GitHub Bot Created on: 25/Oct/18 14:42 Start Date: 25/Oct/18 14:42 Worklog Time Spent: 10m Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#issuecomment-433078519 Yes, makes sense. So in code it essentially means adding a similar test case with an IntervalWindow, not just the global window, and varying processing time to be before / after the watermark, inside / outside the window boundary, and outside of allowed lateness. Correct? Issue Time Tracking --- Worklog Id: (was: 158725) Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158726&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158726 ] ASF GitHub Bot logged work on BEAM-5265: Author: ASF GitHub Bot Created on: 25/Oct/18 14:43 Start Date: 25/Oct/18 14:43 Worklog Time Spent: 10m Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#issuecomment-433078861 Maybe before / after watermark does not matter that much Issue Time Tracking --- Worklog Id: (was: 158726) Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158760&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158760 ] ASF GitHub Bot logged work on BEAM-5265: Author: ASF GitHub Bot Created on: 25/Oct/18 16:12 Start Date: 25/Oct/18 16:12 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#issuecomment-433112941 Yes, that's exactly what I was thinking of Issue Time Tracking --- Worklog Id: (was: 158760) Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=159728&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159728 ] ASF GitHub Bot logged work on BEAM-5265: Author: ASF GitHub Bot Created on: 28/Oct/18 14:32 Start Date: 28/Oct/18 14:32 Worklog Time Spent: 10m Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#issuecomment-433710767 I am not sure if the results of the tests are in line with your expectations. It seems that processing time being outside the window is fine as long as the watermark is within the window and its allowed lateness. Issue Time Tracking --- Worklog Id: (was: 159728) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=159766&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159766 ] ASF GitHub Bot logged work on BEAM-5265: Author: ASF GitHub Bot Created on: 28/Oct/18 20:46 Start Date: 28/Oct/18 20:46 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#discussion_r228764446
## File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ##
@@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
   }

   /**
-   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}.
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains.
    */
   @Test
-  public void testOnTimerCalled() {
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithGlobalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock
+    Instant currentProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    Instant currentEventTime = new Instant(42);
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(currentEventTime)
+            .addElements(TimestampedValue.of(KV.of("anyKey", "anyValue"), new Instant(99)))
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkToInfinity();
+
     WindowFn windowFn = new GlobalWindows();
-    DoFnWithTimers fn = new DoFnWithTimers(windowFn.windowCoder());
-    DoFnRunner runner =
-        new SimpleDoFnRunner<>(
-            null,
-            fn,
-            NullSideInputReader.empty(),
-            null,
-            null,
-            Collections.emptyList(),
-            mockStepContext,
-            null,
-            Collections.emptyMap(),
-            WindowingStrategy.of(windowFn));
+    DoFnWithTimers fn = new DoFnWithTimers<>(windowFn.windowCoder());

-    Instant currentTime = new Instant(42);
-    Duration offset = Duration.millis(37);
+    PCollection output =
+        pipeline
+            .apply(testStream)
+            .apply(Window.into(new GlobalWindows()))
+            .apply(ParDo.of(fn))
+            .setCoder(TimerInternals.TimerDataCoder.of(windowFn.windowCoder()));

-    // Mocking is not easily compatible with annotation analysis, so we manually record
-    // the method call.
-    runner.onTimer(
-        DoFnWithTimers.TIMER_ID,
-        GlobalWindow.INSTANCE,
-        currentTime.plus(offset),
-        TimeDomain.EVENT_TIME);
-
-    assertThat(
-        fn.onTimerInvocations,
-        contains(
+    PAssert.that(output)
+        .containsInAnyOrder(
             TimerData.of(
-                DoFnWithTimers.TIMER_ID,
+                DoFnWithTimers.PROCESSING_TIMER_ID,
                 StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
-                currentTime.plus(offset),
-                TimeDomain.EVENT_TIME)));
+                currentProcessingTime.plus(DoFnWithTimers.TIMER_OFFSET).plus(1),
+                TimeDomain.PROCESSING_TIME),
+            TimerData.of(
+                DoFnWithTimers.EVENT_TIMER_ID,
+                StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
+                currentEventTime.plus(DoFnWithTimers.TIMER_OFFSET),
+                TimeDomain.EVENT_TIME));
+
+    pipeline.run();
+  }
+
+  /**
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains. With {@link IntervalWindow}, we check behavior of emitted events
+   * when time is inside and outside of window boundaries.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithIntervalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock
+    Instant baseTime = new Instant(0);
+
+    Duration windowDuration = Duration.standardHours(1);
+    Duration windowLateness = Duration.standardMinutes(1);
+    IntervalWindow window = new IntervalWindow(baseTime, windowDuration);
+    FixedWindows windowFn = FixedWindows.of(windowDuration);
+    DoFnWithTimers fn = new DoFnWithTimers<>(windowFn.windowCoder());
+
+    TimestampedValue<KV<String, String>> event =
+        TimestampedValue.of(KV.of("anyKey", "anyValue"), window.start());
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            // watermark in window, processing time far behind
+            .advanceWatermarkTo(window.s
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=159882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159882 ] ASF GitHub Bot logged work on BEAM-5265: Author: ASF GitHub Bot Created on: 29/Oct/18 07:29 Start Date: 29/Oct/18 07:29 Worklog Time Spent: 10m Work Description: JozoVilcek commented on a change in pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#discussion_r228820353 ## File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ## @@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain
[ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=161888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-161888 ] ASF GitHub Bot logged work on BEAM-5265: Author: ASF GitHub Bot Created on: 02/Nov/18 07:23 Start Date: 02/Nov/18 07:23 Worklog Time Spent: 10m Work Description: JozoVilcek commented on a change in pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain URL: https://github.com/apache/beam/pull/6305#discussion_r230287609 ## File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ## @@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {