[ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=159766&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159766
 ]

ASF GitHub Bot logged work on BEAM-5265:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Oct/18 20:46
            Start Date: 28/Oct/18 20:46
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on a change in pull request 
#6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time 
domain
URL: https://github.com/apache/beam/pull/6305#discussion_r228764446
 
 

 ##########
 File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 ##########
 @@ -198,44 +218,185 @@ public void 
testFinishBundleExceptionsWrappedAsUserCodeException() {
   }
 
   /**
-   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the 
underlying {@link DoFn}.
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the 
underlying {@link DoFn}
+   * on appropriate time domains.
    */
   @Test
-  public void testOnTimerCalled() {
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithGlobalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by 
TestClock
+    Instant currentProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    Instant currentEventTime = new Instant(42);
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()))
+            .advanceWatermarkTo(currentEventTime)
+            .addElements(TimestampedValue.of(KV.of("anyKey", "anyValue"), new 
Instant(99)))
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkToInfinity();
+
     WindowFn<?, GlobalWindow> windowFn = new GlobalWindows();
-    DoFnWithTimers<GlobalWindow> fn = new 
DoFnWithTimers(windowFn.windowCoder());
-    DoFnRunner<String, String> runner =
-        new SimpleDoFnRunner<>(
-            null,
-            fn,
-            NullSideInputReader.empty(),
-            null,
-            null,
-            Collections.emptyList(),
-            mockStepContext,
-            null,
-            Collections.emptyMap(),
-            WindowingStrategy.of(windowFn));
+    DoFnWithTimers<GlobalWindow> fn = new 
DoFnWithTimers<>(windowFn.windowCoder());
 
-    Instant currentTime = new Instant(42);
-    Duration offset = Duration.millis(37);
+    PCollection<TimerData> output =
+        pipeline
+            .apply(testStream)
+            .apply(Window.into(new GlobalWindows()))
+            .apply(ParDo.of(fn))
+            
.setCoder(TimerInternals.TimerDataCoder.of(windowFn.windowCoder()));
 
-    // Mocking is not easily compatible with annotation analysis, so we 
manually record
-    // the method call.
-    runner.onTimer(
-        DoFnWithTimers.TIMER_ID,
-        GlobalWindow.INSTANCE,
-        currentTime.plus(offset),
-        TimeDomain.EVENT_TIME);
-
-    assertThat(
-        fn.onTimerInvocations,
-        contains(
+    PAssert.that(output)
+        .containsInAnyOrder(
             TimerData.of(
-                DoFnWithTimers.TIMER_ID,
+                DoFnWithTimers.PROCESSING_TIMER_ID,
                 StateNamespaces.window(windowFn.windowCoder(), 
GlobalWindow.INSTANCE),
-                currentTime.plus(offset),
-                TimeDomain.EVENT_TIME)));
+                
currentProcessingTime.plus(DoFnWithTimers.TIMER_OFFSET).plus(1),
+                TimeDomain.PROCESSING_TIME),
+            TimerData.of(
+                DoFnWithTimers.EVENT_TIMER_ID,
+                StateNamespaces.window(windowFn.windowCoder(), 
GlobalWindow.INSTANCE),
+                currentEventTime.plus(DoFnWithTimers.TIMER_OFFSET),
+                TimeDomain.EVENT_TIME));
+
+    pipeline.run();
+  }
+
+  /**
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the 
underlying {@link DoFn}
+   * on appropriate time domains. With {@link IntervalWindow}, we check 
behavior of emitted events
+   * when time is inside and outside of window boundaries.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithIntervalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by 
TestClock
+    Instant baseTime = new Instant(0);
+
+    Duration windowDuration = Duration.standardHours(1);
+    Duration windowLateness = Duration.standardMinutes(1);
+    IntervalWindow window = new IntervalWindow(baseTime, windowDuration);
+    FixedWindows windowFn = FixedWindows.of(windowDuration);
+    DoFnWithTimers<IntervalWindow> fn = new 
DoFnWithTimers<>(windowFn.windowCoder());
+
+    TimestampedValue<KV<String, String>> event =
+        TimestampedValue.of(KV.of("anyKey", "anyValue"), window.start());
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()))
+            // watermark in window, processing time far behind
+            .advanceWatermarkTo(window.start())
+            .addElements(event)
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            
.advanceWatermarkTo(window.start().plus(DoFnWithTimers.TIMER_OFFSET).plus(1))
+
+            // watermark and processing time within window
+            .advanceProcessingTime(
+                
Duration.millis(Math.abs(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())))
+            .addElements(event)
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkTo(
+                window.start().plus(2 * 
DoFnWithTimers.TIMER_OFFSET.getMillis()).plus(2))
+
+            // watermark in window, processing time ahead of window.end() but 
within lateness
+            .advanceProcessingTime(windowDuration)
+            .addElements(event)
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkTo(
+                window.start().plus(3 * 
DoFnWithTimers.TIMER_OFFSET.getMillis()).plus(3))
+
+            // watermark in window, processing time  is out of window's 
allowed lateness
 
 Review comment:
   If this doesn't cause a crash, then we have a bug!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 159766)
    Time Spent: 3h  (was: 2h 50m)

> Can not test Timer with processing time domain
> ----------------------------------------------
>
>                 Key: BEAM-5265
>                 URL: https://issues.apache.org/jira/browse/BEAM-5265
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-direct
>            Reporter: Jozef Vilcek
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> I have a stateful DoFn which has a timer in the PROCESSING_TIME domain. While 
> writing tests, I noticed that it does not react to `advanceProcessingTime()` 
> on the test stream. The problem seems to be here:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L260]
> I can only say that patching this place works for direct runner tests. I am 
> not sure about the broader impact on other runners, since it is in `runner-core`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to