reuvenlax commented on a change in pull request #15135:
URL: https://github.com/apache/beam/pull/15135#discussion_r665781931



##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4892,348 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    //    public void testSetAndClearProcessingTimeTimer() {
+    //
+    //      final String timerId = "processing-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
+    // r) {
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              timer.clear();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(TimeDomain timeDomain, 
OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(2)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    //    public void testSetAndClearEventTimeTimer() {
+    //      final String timerId = "event-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
+    // r) {
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              timer.clear();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
+    //              .advanceWatermarkTo(new Instant(0))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    //    public void testClearUnsetProcessingTimeTimer() {
+    //      final String timerId = "processing-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
+    // r) {
+    //              timer.clear();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(TimeDomain timeDomain, 
OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(4)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    //    public void testClearUnsetEventTimeTimer() {
+    //      final String timerId = "event-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
+    // r) {
+    //              timer.clear();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
+    //              .advanceWatermarkTo(new Instant(0))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    //    public void testClearProcessingTimeTimer() {
+    //      final String timerId = "processing-timer";
+    //      final String clearTimerId = "clear-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @TimerId(clearTimerId)
+    //            private final TimerSpec clearTimerSpec =
+    // TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(
+    //                @TimerId(timerId) Timer timer,
+    //                @TimerId(clearTimerId) Timer clearTimer,
+    //                OutputReceiver<Integer> r) {
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              
clearTimer.offset(Duration.standardSeconds(2)).setRelative();
+    //
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(
+    //                OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer 
clearTimer) {
+    //              System.err.println("onTimer");
+    //              r.output(42);
+    //              clearTimer.clear();
+    //            }
+    //
+    //            // This should never fire since we clear the timer in the 
earlier timer.
+    //            @OnTimer(clearTimerId)
+    //            public void clearTimer(OutputReceiver<Integer> r) {
+    //              System.err.println("clearTimer");
+    //              r.output(43);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(2)))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(4)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3, 42);
+    //      pipeline.run();
+    //    }
+
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    //    public void testClearEventTimeTimer() {
+    //      final String timerId = "event-timer";
+    //      final String clearTimerId = "clear-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @TimerId(clearTimerId)
+    //            private final TimerSpec clearSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(
+    //                @TimerId(timerId) Timer timer,
+    //                @TimerId(clearTimerId) Timer clearTimer,
+    //                OutputReceiver<Integer> r) {
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              
clearTimer.offset(Duration.standardSeconds(2)).setRelative();
+    //
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(
+    //                OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer 
clearTimer) {
+    //              r.output(42);
+    //              clearTimer.clear();
+    //            }
+    //
+    //            // This should never fire since we clear the timer in the 
earlier timer.
+    //            @OnTimer(clearTimerId)
+    //            public void clearTimer(OutputReceiver<Integer> r) {
+    //              r.output(43);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
+    //              .advanceWatermarkTo(new Instant(0))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3, 42);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    //    public void testSetProcessingTimerAfterClear() {
+    //      final String timerId = "processing-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(
+    //                @Element KV<String, Integer> e,
+    //                @TimerId(timerId) Timer timer,
+    //                OutputReceiver<Integer> r) {
+    //              timer.clear();
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(TimeDomain timeDomain, 
OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
+    //              .addElements(KV.of("hello", 37), KV.of("hello", 38))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(2)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3, 3, 42);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    //    public void testSetEventTimerAfterClear() {
+    //      final String timerId = "event-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>

Review comment:
       No I didn't. thanks for catching!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to