yuchengxin commented on code in PR #28310:
URL: https://github.com/apache/flink/pull/28310#discussion_r3506111602
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##########
@@ -351,6 +353,94 @@ public static void validateClassForRuntime(
}
}
+ /**
+ * Validates whether an {@link AsyncTableFunction} subclass declares a
usable {@code timeout}
+ * fallback method that mirrors the {@code eval(CompletableFuture, ...)}
signature.
+ *
+ * <p>The detection uses a three-step contract: collect every method
literally named {@code
+ * timeout} via {@link ExtractionUtils#collectMethods}, filter to those
that are both {@code
+ * public} and non-{@code static}, and finally verify the surviving
candidates against the
+ * expected {@code (CompletableFuture<Collection<T>>, argumentClasses)}
signature.
+ *
+ * <ul>
+ * <li>No applicable candidate (none declared, or all
private/static/mis-spelled) — returns
+ * {@code false}; the framework falls back to {@link
+ * java.util.concurrent.TimeoutException} via the default {@code
AsyncFunction#timeout}.
+ * <li>One or more applicable candidates with a matching signature —
returns {@code true};
+ * codegen will emit a {@code fetcher.timeout(...)} dispatch.
+ * <li>One or more applicable candidates but signature mismatch — throws
{@link
+ * ValidationException} eagerly with the FQN, the expected
signature, and every actual
+ * candidate signature so users can locate the offending method
quickly.
+ * </ul>
+ */
+ public static boolean validateAsyncTableFunctionTimeoutClass(
+ Class<? extends UserDefinedFunction> functionClass,
+ Class<?>[] argumentClasses,
+ String functionName) {
+ final List<Method> candidates =
+ ExtractionUtils.collectMethods(functionClass,
ASYNC_TABLE_TIMEOUT);
+ final List<Method> applicable =
+ candidates.stream()
+ .filter(
+ method ->
+
Modifier.isPublic(method.getModifiers())
+ &&
!Modifier.isStatic(method.getModifiers()))
+ .collect(Collectors.toList());
+ if (applicable.isEmpty()) {
+ return false;
+ }
+ // Mirror the eval convention: prepend the implicit CompletableFuture
parameter so the
+ // full expected signature is `timeout(CompletableFuture,
argumentClasses...)`.
+ final Class<?>[] expectedSignature = new
Class<?>[argumentClasses.length + 1];
+ expectedSignature[0] = CompletableFuture.class;
+ System.arraycopy(argumentClasses, 0, expectedSignature, 1,
argumentClasses.length);
+ try {
+ validateClassForRuntime(
+ functionClass,
+ ASYNC_TABLE_TIMEOUT,
+ expectedSignature,
+ void.class,
+ functionName);
+ } catch (ValidationException originalException) {
+ throw new ValidationException(
+ buildTimeoutSignatureMismatchMessage(
+ functionClass, expectedSignature, applicable),
+ originalException);
+ }
+ return true;
+ }
Review Comment:
The validation no longer stops at the raw `CompletableFuture` type. It now
checks that the timeout handler uses `CompletableFuture<Collection<T>>` and
verifies that `T` is compatible with the corresponding `eval(...)` contract for
the current call site.
I also added negative tests for:
- non-void timeout return type,
- raw `CompletableFuture`,
- mismatched `Collection<T>` element type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]