twalthr commented on code in PR #26924:
URL: https://github.com/apache/flink/pull/26924#discussion_r2341065892
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java:
##########
@@ -274,34 +290,34 @@ public void testIllegalConfig() {
"SELECT *\n"
+ "FROM TABLE(ML_PREDICT(TABLE
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', true]))"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "SQL validation failed. Config param can only be a MAP
of string literals but node's type is (CHAR(5), BOOLEAN) MAP at position line
2, column 71.");
+ .hasRootCauseMessage(
+ "Invalid argument type at position 3. Data type
MAP<STRING, STRING> expected but MAP<CHAR(5) NOT NULL, BOOLEAN NOT NULL> NOT
NULL passed.");
assertThatThrownBy(
() ->
util.verifyRelPlan(
"SELECT *\n"
+ "FROM TABLE(ML_PREDICT(TABLE
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'yes']))"))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining("SQL validation failed. Failed to parse
the config.");
+ .hasCauseInstanceOf(ValidationException.class)
+ .hasStackTraceContaining("Failed to parse the config.");
assertThatThrownBy(
() ->
util.verifyRelPlan(
"SELECT *\n"
+ "FROM TABLE(ML_PREDICT(TABLE
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true',
'max-concurrent-operations', '-1']))"))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "SQL validation failed. Invalid runtime config option
'max-concurrent-operations'. Its value should be positive integer but was -1.");
+ .hasCauseInstanceOf(ValidationException.class)
+ .hasStackTraceContaining(
+ "Invalid runtime config option
'max-concurrent-operations'. Its value should be positive integer but was -1.");
assertThatThrownBy(
() ->
util.verifyRelPlan(
"SELECT *\n"
+ "FROM TABLE(ML_PREDICT(TABLE
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true', 'capacity',
CAST(-1 AS STRING)]))"))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "SQL validation failed. Unsupported expression -1 is
in runtime config at position line 2, column 109. Currently, runtime config
should be be a MAP of string literals.");
+ .hasCauseInstanceOf(ValidationException.class)
+ .hasStackTraceContaining(
+ "Config param of ML_PREDICT function should be a MAP
of String literals.");
Review Comment:
```suggestion
"Config paramameter of ML_PREDICT function should be
a MAP data type consisting string literals.");
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java:
##########
@@ -192,6 +194,20 @@ void testMissingUid() {
+ "For example: myFunction(..., uid =>
'my-id')"));
}
+ @Test
+ void testNoSystemArgsAllowedForScalarPtf() {
+ util.addTemporarySystemFunction("f", NoSystemArgsScalarFunction.class);
+ assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(i =>
1);"))
+ .satisfies(anyCauseMatches("Disabling system arguments is not
supported for PTF."));
Review Comment:
```suggestion
.satisfies(anyCauseMatches("Disabling system arguments is
not supported for user-defined PTF."));
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java:
##########
@@ -117,6 +133,35 @@ public static InputTypeStrategy windowTimeIndicator() {
and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
JSON_ARGUMENT));
+ /** Input strategy for {@link BuiltInFunctionDefinitions#ML_PREDICT}. */
+ public static final InputTypeStrategy ML_PREDICT_INPUT_TYPE_STRATEGY =
+ new InputTypeStrategy() {
Review Comment:
usually we put this strategy into a separate class in same package but with
default scope. take ArrayComparableElementArgumentTypeStrategy as an example
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java:
##########
@@ -194,6 +202,44 @@ public final class SpecificTypeStrategies {
/** Type strategy specific for {@link
BuiltInFunctionDefinitions#OBJECT_UPDATE}. */
public static final TypeStrategy OBJECT_UPDATE = new
ObjectUpdateTypeStrategy();
+ /** Type strategy specific for {@link
BuiltInFunctionDefinitions#ML_PREDICT}. */
+ public static final TypeStrategy ML_PREDICT_OUTPUT_TYPE_STRATEGY =
+ callContext -> {
Review Comment:
usually we put this strategy into a separate class in same package but with
default scope. take ObjectUpdateTypeStrategy as an example.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java:
##########
@@ -69,13 +71,17 @@ public boolean matches(RelOptRuleCall call) {
}
final RexCall rexCall = (RexCall) scan.getCall();
final FunctionDefinition definition =
ShortcutUtils.unwrapFunctionDefinition(rexCall);
- return definition != null && definition.getKind() ==
FunctionKind.PROCESS_TABLE;
+ return definition != null
+ &&
!StreamPhysicalMLPredictTableFunctionRule.isMLPredictFunction(definition)
+ && definition.getKind() == FunctionKind.PROCESS_TABLE;
}
@Override
public @Nullable RelNode convert(RelNode rel) {
final FlinkLogicalTableFunctionScan scan =
(FlinkLogicalTableFunctionScan) rel;
final RexCall rexCall = (RexCall) scan.getCall();
+ validateAllowSystemArgs(rexCall);
Review Comment:
Validating one time should be enough. So we can drop the call in
StreamPhysicalProcessTableFunction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]