This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cd67dbf1c0209824a06f764f3a1d4c591441c5c8
Author: reuvenlax <re...@google.com>
AuthorDate: Tue Apr 7 09:50:14 2020 -0700

    Merge pull request #11226: [BEAM-9557] Fix timer window boundary checking
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java |  4 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 91 +++++++++++++++++++++-
 2 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index fa5c695..9cc1b8d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -1190,13 +1190,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
         checkArgument(
             !outputTimestamp.isAfter(target),
-            "Attempted to set an event time timer with an output timestamp of %s that is"
+            "Attempted to set an event-time timer with an output timestamp of %s that is"
                 + " after the timer firing timestamp %s",
             outputTimestamp,
             target);
         checkArgument(
             !target.isAfter(windowExpiry),
-            "Attempted to set an event time timer with a firing timestamp of %s that is"
+            "Attempted to set an event-time timer with a firing timestamp of %s that is"
                 + " after the expiration of window %s",
             target,
             windowExpiry);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index f3a3b03..7a2978b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3465,10 +3465,99 @@ public class ParDoTest implements Serializable {
                 ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) {
               try {
                 timer.set(window.maxTimestamp().plus(1L));
+                fail("Should have failed due to out-of-bounds timer.");
+              } catch (RuntimeException e) {
+                String message = e.getMessage();
+                List<String> expectedSubstrings = Arrays.asList("event-time timer", "expiration");
+                expectedSubstrings.forEach(
+                    str ->
+                        Preconditions.checkState(
+                            message.contains(str),
+                            "Pipeline didn't fail with the expected strings: %s",
+                            expectedSubstrings));
+              }
+            }
+
+            @OnTimer(timerId)
+            public void onTimer() {}
+          };
+
+      pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+      pipeline.run();
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
+    public void testOutOfBoundsEventTimeTimerHold() throws Exception {
+      final String timerId = "foo";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(
+                ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) {
+              try {
+                timer
+                    .withOutputTimestamp(window.maxTimestamp().plus(1L))
+                    .set(window.maxTimestamp());
+                fail("Should have failed due to out-of-bounds timer.");
+              } catch (RuntimeException e) {
+                String message = e.getMessage();
+                List<String> expectedSubstrings =
+                    Arrays.asList("event-time timer", "output timestamp");
+                expectedSubstrings.forEach(
+                    str ->
+                        Preconditions.checkState(
+                            message.contains(str),
+                            "Pipeline didn't fail with the expected strings: %s",
+                            expectedSubstrings));
+              }
+            }
+
+            @OnTimer(timerId)
+            public void onTimer() {}
+          };
+
+      pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+      pipeline.run();
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
+    public void testOutOfBoundsProcessingTimeTimerHold() throws Exception {
+      final String timerId = "foo";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(
+                ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) {
+              try {
+                timer
+                    .withOutputTimestamp(window.maxTimestamp().plus(1L))
+                    .offset(Duration.standardSeconds(1))
+                    .setRelative();
                 fail("Should have failed due to processing time with absolute timer.");
               } catch (RuntimeException e) {
                 String message = e.getMessage();
-                List<String> expectedSubstrings = Arrays.asList("event time timer", "expiration");
+                List<String> expectedSubstrings =
+                    Arrays.asList("processing-time timer", "output timestamp");
                 expectedSubstrings.forEach(
                     str ->
                         Preconditions.checkState(

Reply via email to