twalthr commented on code in PR #26924:
URL: https://github.com/apache/flink/pull/26924#discussion_r2341065892


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java:
##########
@@ -274,34 +290,34 @@ public void testIllegalConfig() {
                                         "SELECT *\n"
                                                 + "FROM TABLE(ML_PREDICT(TABLE 
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', true]))"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "SQL validation failed. Config param can only be a MAP 
of string literals but node's type is (CHAR(5), BOOLEAN) MAP at position line 
2, column 71.");
+                .hasRootCauseMessage(
+                        "Invalid argument type at position 3. Data type 
MAP<STRING, STRING> expected but MAP<CHAR(5) NOT NULL, BOOLEAN NOT NULL> NOT 
NULL passed.");
 
         assertThatThrownBy(
                         () ->
                                 util.verifyRelPlan(
                                         "SELECT *\n"
                                                 + "FROM TABLE(ML_PREDICT(TABLE 
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'yes']))"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("SQL validation failed. Failed to parse 
the config.");
+                .hasCauseInstanceOf(ValidationException.class)
+                .hasStackTraceContaining("Failed to parse the config.");
 
         assertThatThrownBy(
                         () ->
                                 util.verifyRelPlan(
                                         "SELECT *\n"
                                                 + "FROM TABLE(ML_PREDICT(TABLE 
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true', 
'max-concurrent-operations', '-1']))"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "SQL validation failed. Invalid runtime config option 
'max-concurrent-operations'. Its value should be positive integer but was -1.");
+                .hasCauseInstanceOf(ValidationException.class)
+                .hasStackTraceContaining(
+                        "Invalid runtime config option 
'max-concurrent-operations'. Its value should be positive integer but was -1.");
 
         assertThatThrownBy(
                         () ->
                                 util.verifyRelPlan(
                                         "SELECT *\n"
                                                 + "FROM TABLE(ML_PREDICT(TABLE 
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true', 'capacity', 
CAST(-1 AS STRING)]))"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "SQL validation failed. Unsupported expression -1 is 
in runtime config at position line 2, column 109. Currently, runtime config 
should be be a MAP of string literals.");
+                .hasCauseInstanceOf(ValidationException.class)
+                .hasStackTraceContaining(
+                        "Config param of ML_PREDICT function should be a MAP 
of String literals.");

Review Comment:
   ```suggestion
                           "Config paramameter of ML_PREDICT function should be 
a MAP data type consisting string literals.");
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java:
##########
@@ -192,6 +194,20 @@ void testMissingUid() {
                                         + "For example: myFunction(..., uid => 
'my-id')"));
     }
 
+    @Test
+    void testNoSystemArgsAllowedForScalarPtf() {
+        util.addTemporarySystemFunction("f", NoSystemArgsScalarFunction.class);
+        assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(i => 
1);"))
+                .satisfies(anyCauseMatches("Disabling system arguments is not 
supported for PTF."));

Review Comment:
   ```suggestion
                   .satisfies(anyCauseMatches("Disabling system arguments is 
not supported for user-defined PTF."));
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java:
##########
@@ -117,6 +133,35 @@ public static InputTypeStrategy windowTimeIndicator() {
                                     
and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
                                     JSON_ARGUMENT));
 
+    /** Input strategy for {@link BuiltInFunctionDefinitions#ML_PREDICT}. */
+    public static final InputTypeStrategy ML_PREDICT_INPUT_TYPE_STRATEGY =
+            new InputTypeStrategy() {

Review Comment:
   usually we put this strategy into a separate class in same package but with 
default scope. take ArrayComparableElementArgumentTypeStrategy as an example



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java:
##########
@@ -194,6 +202,44 @@ public final class SpecificTypeStrategies {
     /** Type strategy specific for {@link 
BuiltInFunctionDefinitions#OBJECT_UPDATE}. */
     public static final TypeStrategy OBJECT_UPDATE = new 
ObjectUpdateTypeStrategy();
 
+    /** Type strategy specific for {@link 
BuiltInFunctionDefinitions#ML_PREDICT}. */
+    public static final TypeStrategy ML_PREDICT_OUTPUT_TYPE_STRATEGY =
+            callContext -> {

Review Comment:
   usually we put this strategy into a separate class in same package but with 
default scope. take ObjectUpdateTypeStrategy as an example.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java:
##########
@@ -69,13 +71,17 @@ public boolean matches(RelOptRuleCall call) {
         }
         final RexCall rexCall = (RexCall) scan.getCall();
         final FunctionDefinition definition = 
ShortcutUtils.unwrapFunctionDefinition(rexCall);
-        return definition != null && definition.getKind() == 
FunctionKind.PROCESS_TABLE;
+        return definition != null
+                && 
!StreamPhysicalMLPredictTableFunctionRule.isMLPredictFunction(definition)
+                && definition.getKind() == FunctionKind.PROCESS_TABLE;
     }
 
     @Override
     public @Nullable RelNode convert(RelNode rel) {
         final FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) rel;
         final RexCall rexCall = (RexCall) scan.getCall();
+        validateAllowSystemArgs(rexCall);

Review Comment:
   Validating one time should be enough. So we can drop the call in 
StreamPhysicalProcessTableFunction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to