autophagy commented on code in PR #28326:
URL: https://github.com/apache/flink/pull/28326#discussion_r3475684066
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -2240,6 +2240,19 @@ void testWatermarkAdvancesWithoutTimers() throws
Exception {
}
}
+ @Test
+ void testWatermarkAdvancesWithoutOnTimeColumn() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<value
INT>"))
+ .build()) {
+
+ harness.processElement(Row.of(42));
+ harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 10));
Review Comment:
Yep, this is something that the PTF framework supports! There's even a test
PTF for it in the live tests:
```
public static class OptionalOnTimeFunction extends
AppendProcessTableFunctionBase {
public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row
r) {
final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
collectEvalEvent(timeCtx, r);
Long wm = timeCtx.currentWatermark();
// Register at wm+1 to always target the immediate next
watermark: the timer fires
// exactly once per watermark advance, and each new row
re-registers the timer for the
// following watermark step, demonstrating repeated timer
re-registration.
long timer = wm == null || wm < 0 ? 1 : wm + 1;
collectCreateTimer(timeCtx, "t", timer);
}
public void onTimer(OnTimerContext ctx) {
final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
collectOnTimerEvent(ctx);
}
}
```
The harness should already handle this correctly but I dont think we have an
explicit test case for it - i'll add one!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]