lukecwik commented on a change in pull request #15135:
URL: https://github.com/apache/beam/pull/15135#discussion_r665720963
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4892,348 @@ public void onTimer2(
return PDone.in(input.getPipeline());
}
}
+
+ // @Test
+ // @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ // public void testSetAndClearProcessingTimeTimer() {
+ //
+ // final String timerId = "processing-timer";
+ //
+ // DoFn<KV<String, Integer>, Integer> fn =
+ // new DoFn<KV<String, Integer>, Integer>() {
+ //
+ // @TimerId(timerId)
+ // private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+ //
+ // @ProcessElement
+ // public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer>
+ // r) {
+ // timer.offset(Duration.standardSeconds(1)).setRelative();
+ // timer.clear();
+ // r.output(3);
+ // }
+ //
+ // @OnTimer(timerId)
+ // public void onTimer(TimeDomain timeDomain,
OutputReceiver<Integer> r) {
+ // r.output(42);
+ // }
+ // };
+ //
+ // TestStream<KV<String, Integer>> stream =
+ // TestStream.create(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of()))
+ // .addElements(KV.of("hello", 37))
+ // .advanceProcessingTime(
+ // Duration.millis(
+ // DateTimeUtils.currentTimeMillis() / 1000 *
1000) // round to seconds
+ // .plus(Duration.standardMinutes(2)))
+ // .advanceWatermarkToInfinity();
+ //
+ // PCollection<Integer> output =
pipeline.apply(stream).apply(ParDo.of(fn));
+ // PAssert.that(output).containsInAnyOrder(3);
+ // pipeline.run();
+ // }
+ //
+ // @Test
+ // @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ // public void testSetAndClearEventTimeTimer() {
+ // final String timerId = "event-timer";
+ //
+ // DoFn<KV<String, Integer>, Integer> fn =
+ // new DoFn<KV<String, Integer>, Integer>() {
+ //
+ // @TimerId(timerId)
+ // private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+ //
+ // @ProcessElement
+ // public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer>
+ // r) {
+ // timer.offset(Duration.standardSeconds(1)).setRelative();
+ // timer.clear();
+ // r.output(3);
+ // }
+ //
+ // @OnTimer(timerId)
+ // public void onTimer(OutputReceiver<Integer> r) {
+ // r.output(42);
+ // }
+ // };
+ //
+ // TestStream<KV<String, Integer>> stream =
+ // TestStream.create(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of()))
+ // .advanceWatermarkTo(new Instant(0))
+ // .addElements(KV.of("hello", 37))
+ // .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ // .advanceWatermarkToInfinity();
+ //
+ // PCollection<Integer> output =
pipeline.apply(stream).apply(ParDo.of(fn));
+ // PAssert.that(output).containsInAnyOrder(3);
+ // pipeline.run();
+ // }
+ //
+ // @Test
+ // @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ // public void testClearUnsetProcessingTimeTimer() {
+ // final String timerId = "processing-timer";
+ //
+ // DoFn<KV<String, Integer>, Integer> fn =
+ // new DoFn<KV<String, Integer>, Integer>() {
+ //
+ // @TimerId(timerId)
+ // private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+ //
+ // @ProcessElement
+ // public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer>
+ // r) {
+ // timer.clear();
+ // r.output(3);
+ // }
+ //
+ // @OnTimer(timerId)
+ // public void onTimer(TimeDomain timeDomain,
OutputReceiver<Integer> r) {
+ // r.output(42);
+ // }
+ // };
+ //
+ // TestStream<KV<String, Integer>> stream =
+ // TestStream.create(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of()))
+ // .addElements(KV.of("hello", 37))
+ // .advanceProcessingTime(
+ // Duration.millis(
+ // DateTimeUtils.currentTimeMillis() / 1000 *
1000) // round to seconds
+ // .plus(Duration.standardMinutes(4)))
+ // .advanceWatermarkToInfinity();
+ //
+ // PCollection<Integer> output =
pipeline.apply(stream).apply(ParDo.of(fn));
+ // PAssert.that(output).containsInAnyOrder(3);
+ // pipeline.run();
+ // }
+ //
+ // @Test
+ // @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ // public void testClearUnsetEventTimeTimer() {
+ // final String timerId = "event-timer";
+ //
+ // DoFn<KV<String, Integer>, Integer> fn =
+ // new DoFn<KV<String, Integer>, Integer>() {
+ //
+ // @TimerId(timerId)
+ // private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+ //
+ // @ProcessElement
+ // public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer>
+ // r) {
+ // timer.clear();
+ // r.output(3);
+ // }
+ //
+ // @OnTimer(timerId)
+ // public void onTimer(OutputReceiver<Integer> r) {
+ // r.output(42);
+ // }
+ // };
+ //
+ // TestStream<KV<String, Integer>> stream =
+ // TestStream.create(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of()))
+ // .advanceWatermarkTo(new Instant(0))
+ // .addElements(KV.of("hello", 37))
+ // .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ // .advanceWatermarkToInfinity();
+ //
+ // PCollection<Integer> output =
pipeline.apply(stream).apply(ParDo.of(fn));
+ // PAssert.that(output).containsInAnyOrder(3);
+ // pipeline.run();
+ // }
+ //
+ // @Test
+ // @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ // public void testClearProcessingTimeTimer() {
+ // final String timerId = "processing-timer";
+ // final String clearTimerId = "clear-timer";
+ //
+ // DoFn<KV<String, Integer>, Integer> fn =
+ // new DoFn<KV<String, Integer>, Integer>() {
+ //
+ // @TimerId(timerId)
+ // private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+ //
+ // @TimerId(clearTimerId)
+ // private final TimerSpec clearTimerSpec =
+ // TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+ //
+ // @ProcessElement
+ // public void processElement(
+ // @TimerId(timerId) Timer timer,
+ // @TimerId(clearTimerId) Timer clearTimer,
+ // OutputReceiver<Integer> r) {
+ // timer.offset(Duration.standardSeconds(1)).setRelative();
+ //
clearTimer.offset(Duration.standardSeconds(2)).setRelative();
+ //
+ // r.output(3);
+ // }
+ //
+ // @OnTimer(timerId)
+ // public void onTimer(
+ // OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer
clearTimer) {
+ // System.err.println("onTimer");
+ // r.output(42);
+ // clearTimer.clear();
+ // }
+ //
+ // // This should never fire since we clear the timer in the
earlier timer.
+ // @OnTimer(clearTimerId)
+ // public void clearTimer(OutputReceiver<Integer> r) {
+ // System.err.println("clearTimer");
+ // r.output(43);
+ // }
+ // };
+ //
+ // TestStream<KV<String, Integer>> stream =
+ // TestStream.create(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of()))
+ // .addElements(KV.of("hello", 37))
+ // .advanceProcessingTime(
+ // Duration.millis(
+ // DateTimeUtils.currentTimeMillis() / 1000 *
1000) // round to seconds
+ // .plus(Duration.standardMinutes(2)))
+ // .advanceProcessingTime(
+ // Duration.millis(
+ // DateTimeUtils.currentTimeMillis() / 1000 *
1000) // round to seconds
+ // .plus(Duration.standardMinutes(4)))
+ // .advanceWatermarkToInfinity();
+ //
+ // PCollection<Integer> output =
pipeline.apply(stream).apply(ParDo.of(fn));
+ // PAssert.that(output).containsInAnyOrder(3, 42);
+ // pipeline.run();
+ // }
+
+ // @Test
+ // @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ // public void testClearEventTimeTimer() {
+ // final String timerId = "event-timer";
+ // final String clearTimerId = "clear-timer";
+ //
+ // DoFn<KV<String, Integer>, Integer> fn =
+ // new DoFn<KV<String, Integer>, Integer>() {
+ //
+ // @TimerId(timerId)
+ // private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+ //
+ // @TimerId(clearTimerId)
+ // private final TimerSpec clearSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+ //
+ // @ProcessElement
+ // public void processElement(
+ // @TimerId(timerId) Timer timer,
+ // @TimerId(clearTimerId) Timer clearTimer,
+ // OutputReceiver<Integer> r) {
+ // timer.offset(Duration.standardSeconds(1)).setRelative();
+ //
clearTimer.offset(Duration.standardSeconds(2)).setRelative();
+ //
+ // r.output(3);
+ // }
+ //
+ // @OnTimer(timerId)
+ // public void onTimer(
+ // OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer
clearTimer) {
+ // r.output(42);
+ // clearTimer.clear();
+ // }
+ //
+ // // This should never fire since we clear the timer in the
earlier timer.
+ // @OnTimer(clearTimerId)
+ // public void clearTimer(OutputReceiver<Integer> r) {
+ // r.output(43);
+ // }
+ // };
+ //
+ // TestStream<KV<String, Integer>> stream =
+ // TestStream.create(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of()))
+ // .advanceWatermarkTo(new Instant(0))
+ // .addElements(KV.of("hello", 37))
+ // .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ // .advanceWatermarkToInfinity();
+ //
+ // PCollection<Integer> output =
pipeline.apply(stream).apply(ParDo.of(fn));
+ // PAssert.that(output).containsInAnyOrder(3, 42);
+ // pipeline.run();
+ // }
+ //
+ // @Test
+ // @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ // public void testSetProcessingTimerAfterClear() {
+ // final String timerId = "processing-timer";
+ //
+ // DoFn<KV<String, Integer>, Integer> fn =
+ // new DoFn<KV<String, Integer>, Integer>() {
+ //
+ // @TimerId(timerId)
+ // private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+ //
+ // @ProcessElement
+ // public void processElement(
+ // @Element KV<String, Integer> e,
+ // @TimerId(timerId) Timer timer,
+ // OutputReceiver<Integer> r) {
+ // timer.clear();
+ // timer.offset(Duration.standardSeconds(1)).setRelative();
+ // r.output(3);
+ // }
+ //
+ // @OnTimer(timerId)
+ // public void onTimer(TimeDomain timeDomain,
OutputReceiver<Integer> r) {
+ // r.output(42);
+ // }
+ // };
+ //
+ // TestStream<KV<String, Integer>> stream =
+ // TestStream.create(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of()))
+ // .addElements(KV.of("hello", 37), KV.of("hello", 38))
+ // .advanceProcessingTime(
+ // Duration.millis(
+ // DateTimeUtils.currentTimeMillis() / 1000 *
1000) // round to seconds
+ // .plus(Duration.standardMinutes(2)))
+ // .advanceWatermarkToInfinity();
+ //
+ // PCollection<Integer> output =
pipeline.apply(stream).apply(ParDo.of(fn));
+ // PAssert.that(output).containsInAnyOrder(3, 3, 42);
+ // pipeline.run();
+ // }
+ //
+ // @Test
+ // @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ // public void testSetEventTimerAfterClear() {
+ // final String timerId = "event-timer";
+ //
+ // DoFn<KV<String, Integer>, Integer> fn =
+ // new DoFn<KV<String, Integer>, Integer>() {
+ //
+ // @TimerId(timerId)
+ // private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+ //
+ // @ProcessElement
+ // public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer>
Review comment:
Did you mean to leave these large sections commented out?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]