autophagy commented on code in PR #28326:
URL: https://github.com/apache/flink/pull/28326#discussion_r3382050240
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row
partitionKey) {
stateManager.clearStateForKey(stateName, partitionKey);
}
+ //
-------------------------------------------------------------------------
+ // Watermark & Timer API
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Sets the watermark for all tables to the given {@link LocalDateTime}
and fires eligible
+ * timers.
+ */
+ public void setWatermark(LocalDateTime watermark) throws Exception {
+ checkNotNull(watermark, "watermark must not be null");
+ setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+ }
+
+ /** Sets the watermark for all tables to the given {@link Instant} and
fires eligible timers. */
+ public void setWatermark(Instant watermark) throws Exception {
+ checkNotNull(watermark, "watermark must not be null");
+ setWatermarkMillis(watermark.toEpochMilli());
+ }
+
+ /**
+ * Sets the watermark for a specific table to the given {@link
LocalDateTime} and fires eligible
+ * timers.
+ */
+ public void setWatermarkForTable(String tableArgument, LocalDateTime
watermark)
+ throws Exception {
+ checkNotNull(watermark, "watermark must not be null");
+ setWatermarkForTableMillis(tableArgument,
DateTimeUtils.toTimestampMillis(watermark));
+ }
+
+ /**
+ * Sets the watermark for a specific table to the given {@link Instant}
and fires eligible
+ * timers.
+ */
+ public void setWatermarkForTable(String tableArgument, Instant watermark)
throws Exception {
+ checkNotNull(watermark, "watermark must not be null");
+ setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+ }
+
+ /** Returns all timers (both pending and fired), sorted by timestamp then
name. */
+ public List<Timer> getTimers() {
+ return Stream.concat(
+ timerManager.getPendingTimers().stream(),
+ timerManager.getFiredTimers().stream())
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
+ /** Returns all pending (not yet fired) timers, sorted by timestamp then
name. */
+ public List<Timer> getPendingTimers() {
+ return timerManager.getPendingTimers();
+ }
+
+ /** Returns all pending timers with the given name. */
+ public List<Timer> getPendingTimers(String timerName) {
+ return timerManager.getPendingTimers().stream()
+ .filter(t -> timerName.equals(t.getName()))
+ .collect(Collectors.toList());
+ }
+
+ /** Returns all timers that have fired, in the order they fired. */
+ public List<Timer> getFiredTimers() {
+ return timerManager.getFiredTimers();
+ }
+
+ /** Returns all fired timers with the given name. */
+ public List<Timer> getFiredTimers(String timerName) {
+ return timerManager.getFiredTimers().stream()
+ .filter(t -> timerName.equals(t.getName()))
+ .collect(Collectors.toList());
+ }
+
+ /** Clears the fired timer history. */
+ public void clearFiredTimers() {
+ timerManager.clearFiredTimers();
+ }
+
+ private void setWatermarkMillis(long millis) throws Exception {
+ checkState(isOpen, "Harness is not open");
+ for (TableArgumentInfo tableArg :
ArgumentInfo.filterTableArguments(arguments)) {
+ timerManager.setTableWatermark(tableArg.name, millis);
+ }
+ timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+ }
+
+ private void setWatermarkForTableMillis(String tableArgument, long millis)
throws Exception {
+ checkState(isOpen, "Harness is not open");
+ checkNotNull(tableArgument, "tableArgument must not be null");
+ checkArgument(
+ argumentsByName.get(tableArgument) instanceof
TableArgumentInfo,
+ "Unknown or non-table argument: %s",
+ tableArgument);
+ timerManager.setTableWatermark(tableArgument, millis);
+ timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+ }
+
+ private void fireTimer(Timer timer) throws Exception {
+ if (onTimer == null) {
+ throw new IllegalStateException(
+ "Timer fired but no onTimer() method is defined in "
+ + function.getClass().getSimpleName());
+ }
+
+ currentInvocation = InvocationContext.forTimer(timer);
+
+ try {
+ Map<String, Object> stateMap =
stateManager.loadStateForKey(timer.partitionKey);
+
+ List<StateArgumentInfo> stateArgs =
ArgumentInfo.filterStateArguments(arguments);
+ Object[] methodArgs = new Object[stateArgs.size()];
+ for (int i = 0; i < stateArgs.size(); i++) {
+ methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+ }
+
+ onTimer.invoke(function, new TestOnTimerContext(stateMap),
methodArgs);
+ stateManager.updateStateForKey(timer.partitionKey, stateMap);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof Exception) {
+ throw (Exception) cause;
+ }
+ throw new RuntimeException("onTimer() invocation failed", e);
+ } finally {
+ currentInvocation = null;
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Context implementations
+ //
-------------------------------------------------------------------------
+
+ private class TestContext implements ProcessTableFunction.Context {
+ final Map<String, Object> stateMap;
+
+ TestContext(Map<String, Object> stateMap) {
+ this.stateMap = stateMap;
+ }
+
+ @Override
+ public <TimeType> ProcessTableFunction.TimeContext<TimeType>
timeContext(
+ Class<TimeType> conversionClass) {
+ return new TestTimeContext<>(conversionClass);
+ }
+
+ @Override
+ public TableSemantics tableSemanticsFor(String argName) {
+ ArgumentInfo argInfo = argumentsByName.get(argName);
+ if (argInfo == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Argument '%s' not found. Available arguments:
%s",
+ argName, argumentsByName.keySet()));
+ }
+ if (!(argInfo instanceof TableArgumentInfo)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Argument '%s' is not a table argument (type:
%s)",
+ argName, argInfo.getClass().getSimpleName()));
+ }
+ TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+ int[] partitionIndices = getPartitionColumnIndices(tableArg);
+ int timeColumn =
+ onTimeColumnName != null
+ ?
getFieldNames(tableArg.dataType).indexOf(onTimeColumnName)
+ : -1;
+ return new TestHarnessTableSemantics(tableArg.dataType,
partitionIndices, timeColumn);
+ }
+
+ @Override
+ public void clearState(String stateName) {
+ stateMap.remove(stateName);
+ }
+
+ @Override
+ public void clearAllState() {
+ stateMap.clear();
+ }
+
+ @Override
+ public void clearAllTimers() {
+ timerManager.clearAll(currentInvocation.partitionKey);
+ }
+
+ @Override
+ public void clearAll() {
+ stateMap.clear();
+ timerManager.clearAll(currentInvocation.partitionKey);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ }
+
+ private class TestTimeContext<TimeType> implements
ProcessTableFunction.TimeContext<TimeType> {
+ private final Class<TimeType> conversionClass;
+
+ TestTimeContext(Class<TimeType> conversionClass) {
+ this.conversionClass = conversionClass;
+ }
+
+ @Override
+ public TimeType time() {
+ InvocationContext ctx = currentInvocation;
+ if (ctx.isTimerInvocation()) {
+ return fromMillis(ctx.firingTimer.timestamp);
+ }
+ if (ctx.isEvalInvocation() && onTimeColumnName != null) {
+ ArgumentInfo argInfo =
argumentsByName.get(ctx.tableArgumentName);
+ if (argInfo instanceof TableArgumentInfo) {
+ TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+ if
(!getFieldNames(tableArg.dataType).contains(onTimeColumnName)) {
+ return null;
+ }
+ }
+ Object timeValue = ctx.row.getField(onTimeColumnName);
+ if (timeValue == null) {
+ return null;
+ }
+ return fromMillis(toMillis(timeValue));
+ }
+ return null;
+ }
+
+ @Override
+ public TimeType tableWatermark() {
+ InvocationContext ctx = currentInvocation;
+ if (!ctx.isEvalInvocation()) {
+ return null;
+ }
+ Long wm = timerManager.getWatermarkForTable(ctx.tableArgumentName);
+ return wm != null ? fromMillis(wm) : null;
+ }
+
+ @Override
+ public TimeType currentWatermark() {
+ Long wm = timerManager.getGlobalWatermark();
+ return wm != null ? fromMillis(wm) : null;
+ }
+
+ @Override
+ public void registerOnTime(String name, TimeType time) {
+ checkTimersEnabled();
+ checkNotNull(name, "Timer name must not be null");
+ checkNotNull(time, "Timer timestamp must not be null");
+ timerManager.register(currentInvocation.partitionKey,
toMillis(time), name);
+ }
+
+ @Override
+ public void registerOnTime(TimeType time) {
+ checkTimersEnabled();
+ checkNotNull(time, "Timer timestamp must not be null");
+ timerManager.register(currentInvocation.partitionKey,
toMillis(time), null);
+ }
+
+ @Override
+ public void clearTimer(String name) {
+ checkNotNull(name, "Timer name must not be null");
+ timerManager.clearByName(currentInvocation.partitionKey, name);
+ }
+
+ @Override
+ public void clearTimer(TimeType time) {
+ checkNotNull(time, "Timer timestamp must not be null");
+ timerManager.clearByTimestamp(currentInvocation.partitionKey,
toMillis(time));
+ }
+
+ @Override
+ public void clearAllTimers() {
+ timerManager.clearAll(currentInvocation.partitionKey);
+ }
+
+ private void checkTimersEnabled() {
+ boolean enabled =
+ ArgumentInfo.filterTableArguments(arguments).stream()
+ .anyMatch(
+ t ->
+ t.isSetSemantic
+ && t.prependStrategy
+ !=
OutputPrependStrategy.ALL_COLUMNS);
+ if (!enabled) {
+ throw new TableRuntimeException(
+ "Timers are not supported in the current PTF
declaration. "
+ + "Note that only PTFs that take set semantic
tables support timers. "
+ + "Also timers are not available for advanced
traits such as "
+ + "supporting pass-through columns or
updates.");
+ }
+ }
+
+ private TimeType fromMillis(long millis) {
+ return convertFromMillis(millis, conversionClass);
+ }
+
+ private long toMillis(Object time) {
+ if (time instanceof Long) {
+ return (Long) time;
+ } else if (time instanceof Instant) {
+ return ((Instant) time).toEpochMilli();
+ } else if (time instanceof LocalDateTime) {
+ return DateTimeUtils.toTimestampMillis((LocalDateTime) time);
+ } else if (time instanceof java.sql.Timestamp) {
+ return ((java.sql.Timestamp) time).getTime();
+ }
+ throw new IllegalArgumentException(
+ "Unsupported time type: " +
time.getClass().getSimpleName());
+ }
+ }
+
+ private class TestOnTimerContext extends TestContext
+ implements ProcessTableFunction.OnTimerContext {
+ TestOnTimerContext(Map<String, Object> stateMap) {
+ super(stateMap);
+ }
+
+ @Override
+ public String currentTimer() {
+ if (currentInvocation.isTimerInvocation()) {
+ return currentInvocation.firingTimer.getName();
+ }
+ return null;
+ }
+ }
+
+ private static int[] getPartitionColumnIndices(TableArgumentInfo arg) {
+ if (arg.partitionColumnNames == null ||
arg.partitionColumnNames.length == 0) {
+ return new int[0];
+ }
+ List<String> fieldNames = getFieldNames(arg.dataType);
+ int[] indices = new int[arg.partitionColumnNames.length];
+ for (int i = 0; i < arg.partitionColumnNames.length; i++) {
+ String colName = arg.partitionColumnNames[i];
+ int index = fieldNames.indexOf(colName);
+ if (index < 0) {
+ throw new IllegalStateException(
+ "Partition column '"
+ + colName
+ + "' not found in table argument. "
+ + "Available fields: "
+ + fieldNames);
+ }
+ indices[i] = index;
+ }
+ return indices;
+ }
+
+ @Nullable
+ private Class<?> resolveRowtimeConversionClass(List<TableArgumentInfo>
tableArguments) {
Review Comment:
Looking at live, it looks like the planner rejects this. It goes over all
the columns types for the `on_time` attributes, gets their `LogicalTypeRoot`
and rejects it if they are different
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]