[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2019-02-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=193698&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193698
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 02/Feb/19 18:01
Start Date: 02/Feb/19 18:01
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-459985504
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 193698)
Time Spent: 4h  (was: 3h 50m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Assignee: Jozef Vilcek
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> I have a stateful DoFn which has a timer on the PROCESSING_TIME domain. While 
> writing tests, I noticed that it does not react to `advanceProcessingTime()` 
> on the test stream. The problem seems to be here:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L260]
> I can only tell that patching this place works for direct runner tests. I am not 
> sure about the broader impact on other runners, since it is in `runner-core`.
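
The behaviour the report expects can be sketched with a small stand-alone model. This is a toy, not Beam code: `ToyTimerHarness`, `ToyTimer`, and the timer ids are hypothetical names introduced only for illustration, and the model assumes that a timer becomes due once the clock of its own time domain reaches its target. Under that assumption, advancing the fake processing-time clock should be enough to fire a processing-time timer, regardless of where the watermark is.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model, not Beam code: pending timers in two time domains and fake clocks. */
final class ToyTimerHarness {
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  /** A pending timer: hypothetical id, its domain, and the instant it should fire at. */
  static final class ToyTimer {
    final String id;
    final Domain domain;
    final Instant target;

    ToyTimer(String id, Domain domain, Instant target) {
      this.id = id;
      this.domain = domain;
      this.target = target;
    }
  }

  private Instant processingTime = Instant.EPOCH; // fake wall clock
  private Instant watermark = Instant.EPOCH;      // fake event-time progress
  private final List<ToyTimer> pending = new ArrayList<>();
  private final List<String> fired = new ArrayList<>();

  void setTimer(String id, Domain domain, Instant target) {
    pending.add(new ToyTimer(id, domain, target));
  }

  /** Advances the fake processing-time clock, then fires whatever became due. */
  void advanceProcessingTime(Duration amount) {
    processingTime = processingTime.plus(amount);
    fireDueTimers();
  }

  /** Advances the fake watermark, then fires whatever became due. */
  void advanceWatermarkTo(Instant newWatermark) {
    watermark = newWatermark;
    fireDueTimers();
  }

  /** Each timer is compared against the clock of its own domain only. */
  private void fireDueTimers() {
    for (Iterator<ToyTimer> it = pending.iterator(); it.hasNext(); ) {
      ToyTimer timer = it.next();
      Instant now = timer.domain == Domain.PROCESSING_TIME ? processingTime : watermark;
      if (!timer.target.isAfter(now)) {
        fired.add(timer.id);
        it.remove();
      }
    }
  }

  /** Returns a copy of the ids fired so far, in firing order. */
  List<String> fired() {
    return new ArrayList<>(fired);
  }

  public static void main(String[] args) {
    ToyTimerHarness harness = new ToyTimerHarness();
    harness.setTimer("processing", Domain.PROCESSING_TIME, Instant.EPOCH.plusMillis(37));
    harness.advanceWatermarkTo(Instant.EPOCH.plusMillis(1000)); // different domain, no effect
    harness.advanceProcessingTime(Duration.ofMillis(38));       // reaches the timer's target
    System.out.println(harness.fired());
  }
}
```

The point of keeping one clock per domain is that a test can move them independently, which is what the report tries to do with `advanceProcessingTime()`.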



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2019-02-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=196607&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196607
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 09/Feb/19 18:50
Start Date: 09/Feb/19 18:50
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on pull request #6305: [BEAM-5265] 
Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 196607)
Time Spent: 4h 20m  (was: 4h 10m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Assignee: Jozef Vilcek
>Priority: Major
>  Labels: triaged
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2019-02-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=196606&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196606
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 09/Feb/19 18:50
Start Date: 09/Feb/19 18:50
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-462069491
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   
 



Issue Time Tracking
---

Worklog Id: (was: 196606)
Time Spent: 4h 10m  (was: 4h)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Assignee: Jozef Vilcek
>Priority: Major
>  Labels: triaged
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-11-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=168724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168724
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 22/Nov/18 09:34
Start Date: 22/Nov/18 09:34
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on a change in pull request #6305: 
[BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r235655236
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 ##
 @@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
   }
 
   /**
-   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}.
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains.
    */
   @Test
-  public void testOnTimerCalled() {
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithGlobalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock
+    Instant currentProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    Instant currentEventTime = new Instant(42);
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(currentEventTime)
+            .addElements(TimestampedValue.of(KV.of("anyKey", "anyValue"), new Instant(99)))
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkToInfinity();
+
     WindowFn windowFn = new GlobalWindows();
-    DoFnWithTimers fn = new DoFnWithTimers(windowFn.windowCoder());
-    DoFnRunner runner =
-        new SimpleDoFnRunner<>(
-            null,
-            fn,
-            NullSideInputReader.empty(),
-            null,
-            null,
-            Collections.emptyList(),
-            mockStepContext,
-            null,
-            Collections.emptyMap(),
-            WindowingStrategy.of(windowFn));
+    DoFnWithTimers fn = new DoFnWithTimers<>(windowFn.windowCoder());
 
-    Instant currentTime = new Instant(42);
-    Duration offset = Duration.millis(37);
+    PCollection output =
+        pipeline
+            .apply(testStream)
+            .apply(Window.into(new GlobalWindows()))
+            .apply(ParDo.of(fn))
+            .setCoder(TimerInternals.TimerDataCoder.of(windowFn.windowCoder()));
 
-    // Mocking is not easily compatible with annotation analysis, so we manually record
-    // the method call.
-    runner.onTimer(
-        DoFnWithTimers.TIMER_ID,
-        GlobalWindow.INSTANCE,
-        currentTime.plus(offset),
-        TimeDomain.EVENT_TIME);
-
-    assertThat(
-        fn.onTimerInvocations,
-        contains(
+    PAssert.that(output)
+        .containsInAnyOrder(
             TimerData.of(
-                DoFnWithTimers.TIMER_ID,
+                DoFnWithTimers.PROCESSING_TIMER_ID,
                 StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
-                currentTime.plus(offset),
-                TimeDomain.EVENT_TIME)));
+                currentProcessingTime.plus(DoFnWithTimers.TIMER_OFFSET).plus(1),
+                TimeDomain.PROCESSING_TIME),
+            TimerData.of(
+                DoFnWithTimers.EVENT_TIMER_ID,
+                StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
+                currentEventTime.plus(DoFnWithTimers.TIMER_OFFSET),
+                TimeDomain.EVENT_TIME));
+
+    pipeline.run();
+  }
+
+  /**
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains. With {@link IntervalWindow}, we check behavior of emitted events
+   * when time is inside and outside of window boundaries.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithIntervalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock
+    Instant baseTime = new Instant(0);
+
+    Duration windowDuration = Duration.standardHours(1);
+    Duration windowLateness = Duration.standardMinutes(1);
+    IntervalWindow window = new IntervalWindow(baseTime, windowDuration);
+    FixedWindows windowFn = FixedWindows.of(windowDuration);
+    DoFnWithTimers fn = new DoFnWithTimers<>(windowFn.windowCoder());
+
+    TimestampedValue<KV<String, String>> event =
+        TimestampedValue.of(KV.of("anyKey", "anyValue"), window.start());
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            // watermark in window, processing time far behind
+            .advanceWatermarkTo(window.sta

[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-11-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=170009&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170009
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 27/Nov/18 21:48
Start Date: 27/Nov/18 21:48
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on a change in pull request 
#6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time 
domain
URL: https://github.com/apache/beam/pull/6305#discussion_r236859849
 
 


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-12-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=171376&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171376
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 02/Dec/18 15:05
Start Date: 02/Dec/18 15:05
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on a change in pull request #6305: 
[BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r238102116
 
 


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2019-03-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=207058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-207058
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 04/Mar/19 09:58
Start Date: 04/Mar/19 09:58
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #6305: [BEAM-5265] 
Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r261988974
 
 


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=157409&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-157409
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 23/Oct/18 07:31
Start Date: 23/Oct/18 07:31
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-432128811
 
 
   @kennknowles I have updated the tests; hopefully they are on the right path. 
I am sorry for the delay.




Issue Time Tracking
---

Worklog Id: (was: 157409)
Time Spent: 1h 20m  (was: 1h 10m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158275&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158275
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 24/Oct/18 19:23
Start Date: 24/Oct/18 19:23
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-432795256
 
 
   It looks like all of your tests pass (the redness is from other stuff). What 
I mean is that your change to the effective timestamp in processing time will 
actually cause data to be dropped in some cases and to land outside the window 
in others. So you should be able to write a test that fails with your change 
but passes without it. If this doesn't seem like a fun exercise to you, that is 
OK also :-)
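
The concern above can be illustrated with a small stand-alone check. This is a toy, not Beam's lateness logic: `ToyOutputTimestampCheck`, `Verdict`, and the rule inside `classify` are assumptions made only for illustration. The rule treats output as droppable when its timestamp trails the watermark by more than the allowed lateness, and otherwise requires the timestamp to lie inside the window. The one-hour window and one-minute lateness mirror the values in the test diff quoted earlier in this thread.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model, not Beam code: how the timestamp attached to timer output is judged. */
final class ToyOutputTimestampCheck {
  enum Verdict { ON_TIME, OUTSIDE_WINDOW, LATE_DROPPABLE }

  /**
   * Simplified rule (an assumption for illustration): output is droppable when its timestamp
   * trails the watermark by more than the allowed lateness; otherwise it must lie in
   * [windowStart, windowEnd) to count as on time.
   */
  static Verdict classify(
      Instant outputTimestamp,
      Instant windowStart,
      Instant windowEnd,
      Instant watermark,
      Duration allowedLateness) {
    if (outputTimestamp.plus(allowedLateness).isBefore(watermark)) {
      return Verdict.LATE_DROPPABLE;
    }
    if (outputTimestamp.isBefore(windowStart) || !outputTimestamp.isBefore(windowEnd)) {
      return Verdict.OUTSIDE_WINDOW;
    }
    return Verdict.ON_TIME;
  }

  public static void main(String[] args) {
    Instant start = Instant.EPOCH;
    Instant end = start.plus(Duration.ofHours(1));
    Duration lateness = Duration.ofMinutes(1);
    Instant watermark = start.plus(Duration.ofMinutes(30));

    // Timestamp taken from event time, inside the window.
    System.out.println(
        classify(start.plus(Duration.ofMinutes(30)), start, end, watermark, lateness));
    // Timestamp taken from a processing clock far behind the watermark.
    System.out.println(
        classify(start.minus(Duration.ofDays(1)), start, end, watermark, lateness));
    // Timestamp taken from a processing clock already past the end of the window.
    System.out.println(
        classify(start.plus(Duration.ofHours(2)), start, end, watermark, lateness));
  }
}
```

In this model the same output lands in three different categories depending only on which clock supplied its timestamp, which is the kind of difference a regression test for the change would need to pin down.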




Issue Time Tracking
---

Worklog Id: (was: 158275)
Time Spent: 1.5h  (was: 1h 20m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158692
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 25/Oct/18 13:36
Start Date: 25/Oct/18 13:36
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433053648
 
 
   I am not sure if I am up to the fun :) I cannot tell, because I am not clear 
on what the test should look like. Which records can be dropped because of this 
change? My understanding of the internals is rather shallow :/




Issue Time Tracking
---

Worklog Id: (was: 158692)
Time Spent: 1h 40m  (was: 1.5h)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158696
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 25/Oct/18 13:42
Start Date: 25/Oct/18 13:42
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433056007
 
 
   Do you mean that data can now be dropped when it is output from `onTimer` via 
`ctx.output(data)` rather than `ctx.outputWithTimestamp(data, timestamp)`, 
because ctx will now use the correct processing time as its timestamp?




Issue Time Tracking
---

Worklog Id: (was: 158696)
Time Spent: 1h 50m  (was: 1h 40m)

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158697
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 25/Oct/18 13:43
Start Date: 25/Oct/18 13:43
Worklog Time Spent: 10m 
  Work Description: JozoVilcek edited a comment on issue #6305: [BEAM-5265] 
Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433056007
 
 
   Do you mean that data can now be dropped when it is output from `onTimer` via 
`ctx.output(data)` instead of `ctx.outputWithTimestamp(data, timestamp)`, 
because ctx will now use the correct processing time as its timestamp?



Issue Time Tracking
---

Worklog Id: (was: 158697)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158700
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 25/Oct/18 13:48
Start Date: 25/Oct/18 13:48
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433058029
 
 
   The processing time can be arbitrarily far ahead of the watermark, or behind 
it. They are independent, even though in a lot of streaming applications they 
will be near each other.
   
   So if the watermark is behind processing time, you'll get timestamps that 
are outside the window and that is forbidden. If the watermark is ahead of 
processing time (more rare and weird probably, since you would be processing 
future events) then you get dropping.
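   
   The two cases above can be illustrated with a minimal plain-Java sketch. It 
is a toy model under stated assumptions, not Beam code: `TimestampCheck`, 
`Outcome` and `classify` are hypothetical names, timestamps are plain epoch 
millis, and the rules encode only what the comment describes (a timestamp 
outside the window is rejected; a timestamp behind the watermark is late and 
may be dropped).

```java
// Toy model only: all names here are hypothetical and none of this is Beam API.
class TimestampCheck {
  enum Outcome { OK, OUTSIDE_WINDOW, BEHIND_WATERMARK }

  /**
   * Classifies an output timestamp against the window [windowStart, windowEnd)
   * and the current watermark. All values are epoch millis.
   */
  static Outcome classify(long outputTs, long windowStart, long windowEnd, long watermark) {
    // Case 1: the timestamp does not belong to the window at all.
    if (outputTs < windowStart || outputTs >= windowEnd) {
      return Outcome.OUTSIDE_WINDOW;
    }
    // Case 2: the watermark has already passed the timestamp, so the output is late.
    if (outputTs < watermark) {
      return Outcome.BEHIND_WATERMARK;
    }
    return Outcome.OK;
  }

  public static void main(String[] args) {
    long hour = 3_600_000L;
    // Watermark still inside the window, processing time already past the window end.
    System.out.println(classify(hour + 5, 0, hour, 1_000));
    // Watermark ahead of processing time, both inside the window.
    System.out.println(classify(1_000, 0, hour, 2_000));
    // Processing time inside the window and not behind the watermark.
    System.out.println(classify(2_000, 0, hour, 1_000));
  }
}
```

   Using the processing time as `outputTs` in this model shows why the two 
clocks being independent matters: either check can fail depending on which 
clock is ahead.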



Issue Time Tracking
---

Worklog Id: (was: 158700)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158725&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158725
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 25/Oct/18 14:42
Start Date: 25/Oct/18 14:42
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433078519
 
 
   Yes, makes sense.
   So in code it essentially means do a similar test case with IntervalWindow 
not just global and play with processing time being before / after watermark 
and inside / outside window boundary + outside of allowed lateness. Correct?
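   
   One way to enumerate the combinations described above is sketched below in 
plain Java. It is only an illustration: `TimerScenarioMatrix`, `Position`, 
`timeFor` and `scenarios` are hypothetical names, the window is assumed to 
start at 0, and each scenario is just a pair of epoch-millis values that a 
test could feed to a test stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for listing test scenarios; not part of Beam.
class TimerScenarioMatrix {
  enum Position { INSIDE_WINDOW, PAST_WINDOW_END, PAST_ALLOWED_LATENESS }

  /** Picks a representative instant (epoch millis) for a position, window starting at 0. */
  static long timeFor(Position position, long windowEnd, long allowedLateness) {
    switch (position) {
      case INSIDE_WINDOW:
        return windowEnd / 2;
      case PAST_WINDOW_END:
        return windowEnd + allowedLateness / 2;
      default:
        return windowEnd + allowedLateness + 1;
    }
  }

  /** Returns every {watermark, processingTime} pair, watermark position varying slowest. */
  static List<long[]> scenarios(long windowEnd, long allowedLateness) {
    List<long[]> result = new ArrayList<>();
    for (Position watermark : Position.values()) {
      for (Position processingTime : Position.values()) {
        result.add(new long[] {
          timeFor(watermark, windowEnd, allowedLateness),
          timeFor(processingTime, windowEnd, allowedLateness)
        });
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // One-hour window with one minute of allowed lateness.
    for (long[] s : scenarios(3_600_000L, 60_000L)) {
      System.out.println("watermark=" + s[0] + " processingTime=" + s[1]);
    }
  }
}
```

   The nine pairs include processing time both before and after the watermark, 
so that dimension does not need a separate loop.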



Issue Time Tracking
---

Worklog Id: (was: 158725)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158726&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158726
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 25/Oct/18 14:43
Start Date: 25/Oct/18 14:43
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433078861
 
 
   Maybe before / after watermark does not matter that much



Issue Time Tracking
---

Worklog Id: (was: 158726)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=158760&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158760
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 25/Oct/18 16:12
Start Date: 25/Oct/18 16:12
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433112941
 
 
   Yes, that's exactly what I was thinking of



Issue Time Tracking
---

Worklog Id: (was: 158760)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=159728&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159728
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 28/Oct/18 14:32
Start Date: 28/Oct/18 14:32
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on issue #6305: [BEAM-5265] Use 
currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#issuecomment-433710767
 
 
   I am not sure whether the test results are in line with your expectations. 
It seems that processing time being outside the window is fine as long as the 
watermark is within the window and its allowed lateness.
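   
   The observation above can be stated as a small plain-Java model. This is a 
sketch of the reported test behavior only, not of the runner's implementation: 
`WindowOpenCheck`, `windowStillOpen` and `timerOutputKept` are hypothetical 
names, and the exact boundary (strictly less than window end plus lateness) is 
an assumption.

```java
// Toy model of the reported behavior; names and boundary handling are assumptions.
class WindowOpenCheck {
  /** True while the watermark has not passed the window end plus allowed lateness. */
  static boolean windowStillOpen(long watermark, long windowEnd, long allowedLateness) {
    return watermark < windowEnd + allowedLateness;
  }

  /**
   * Models the observation: whether output from a processing-time timer is kept
   * depends on the watermark only. The processingTime argument is deliberately unused.
   */
  static boolean timerOutputKept(
      long processingTime, long watermark, long windowEnd, long allowedLateness) {
    return windowStillOpen(watermark, windowEnd, allowedLateness);
  }

  public static void main(String[] args) {
    long hour = 3_600_000L;
    long minute = 60_000L;
    // Processing time far past the window, watermark still inside it.
    System.out.println(timerOutputKept(10 * hour, hour / 2, hour, minute));
    // Watermark has reached window end plus allowed lateness.
    System.out.println(timerOutputKept(0L, hour + minute, hour, minute));
  }
}
```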



Issue Time Tracking
---

Worklog Id: (was: 159728)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=159766&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159766
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 28/Oct/18 20:46
Start Date: 28/Oct/18 20:46
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on a change in pull request 
#6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time 
domain
URL: https://github.com/apache/beam/pull/6305#discussion_r228764446
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 ##
 @@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
    }
 
    /**
-   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}.
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains.
    */
    @Test
-  public void testOnTimerCalled() {
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithGlobalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock
+    Instant currentProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    Instant currentEventTime = new Instant(42);
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(currentEventTime)
+            .addElements(TimestampedValue.of(KV.of("anyKey", "anyValue"), new Instant(99)))
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkToInfinity();
+
     WindowFn windowFn = new GlobalWindows();
-    DoFnWithTimers fn = new DoFnWithTimers(windowFn.windowCoder());
-    DoFnRunner runner =
-        new SimpleDoFnRunner<>(
-            null,
-            fn,
-            NullSideInputReader.empty(),
-            null,
-            null,
-            Collections.emptyList(),
-            mockStepContext,
-            null,
-            Collections.emptyMap(),
-            WindowingStrategy.of(windowFn));
+    DoFnWithTimers fn = new DoFnWithTimers<>(windowFn.windowCoder());
 
-    Instant currentTime = new Instant(42);
-    Duration offset = Duration.millis(37);
+    PCollection output =
+        pipeline
+            .apply(testStream)
+            .apply(Window.into(new GlobalWindows()))
+            .apply(ParDo.of(fn))
+            .setCoder(TimerInternals.TimerDataCoder.of(windowFn.windowCoder()));
 
-    // Mocking is not easily compatible with annotation analysis, so we manually record
-    // the method call.
-    runner.onTimer(
-        DoFnWithTimers.TIMER_ID,
-        GlobalWindow.INSTANCE,
-        currentTime.plus(offset),
-        TimeDomain.EVENT_TIME);
-
-    assertThat(
-        fn.onTimerInvocations,
-        contains(
+    PAssert.that(output)
+        .containsInAnyOrder(
             TimerData.of(
-                DoFnWithTimers.TIMER_ID,
+                DoFnWithTimers.PROCESSING_TIMER_ID,
                 StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
-                currentTime.plus(offset),
-                TimeDomain.EVENT_TIME)));
+                currentProcessingTime.plus(DoFnWithTimers.TIMER_OFFSET).plus(1),
+                TimeDomain.PROCESSING_TIME),
+            TimerData.of(
+                DoFnWithTimers.EVENT_TIMER_ID,
+                StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
+                currentEventTime.plus(DoFnWithTimers.TIMER_OFFSET),
+                TimeDomain.EVENT_TIME));
+
+    pipeline.run();
+  }
+
+  /**
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains. With {@link IntervalWindow}, we check behavior of emitted events
+   * when time is inside and outside of window boundaries.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithIntervalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is initial value for processing time used done by TestClock
+    Instant baseTime = new Instant(0);
+
+    Duration windowDuration = Duration.standardHours(1);
+    Duration windowLateness = Duration.standardMinutes(1);
+    IntervalWindow window = new IntervalWindow(baseTime, windowDuration);
+    FixedWindows windowFn = FixedWindows.of(windowDuration);
+    DoFnWithTimers fn = new DoFnWithTimers<>(windowFn.windowCoder());
+
+    TimestampedValue<KV<String, String>> event =
+        TimestampedValue.of(KV.of("anyKey", "anyValue"), window.start());
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            // watermark in window, processing time far behind
+            .advanceWatermarkTo(window.s

[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-10-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=159882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159882
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 29/Oct/18 07:29
Start Date: 29/Oct/18 07:29
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on a change in pull request #6305: 
[BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r228820353
 
 


[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

2018-11-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=161888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-161888
 ]

ASF GitHub Bot logged work on BEAM-5265:


Author: ASF GitHub Bot
Created on: 02/Nov/18 07:23
Start Date: 02/Nov/18 07:23
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on a change in pull request #6305: 
[BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r230287609
 
 
