snuyanzin commented on code in PR #25928:
URL: https://github.com/apache/flink/pull/25928#discussion_r1907250440


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java:
##########
@@ -0,0 +1,326 @@
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.ArgumentTrait;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.table.types.inference.StaticArgument;
+import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.EnumSet;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
+import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW;
+import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the type inference and planning part of {@link 
ProcessTableFunction}. */
+public class ProcessTableFunctionTest extends TableTestBase {
+
+    private TableTestUtil util;
+
+    @BeforeEach
+    void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+        util.tableEnv().executeSql("CREATE VIEW t1 AS SELECT 'Bob' AS name, 12 
AS score");
+        util.tableEnv().executeSql("CREATE VIEW t2 AS SELECT 'Bob' AS name, 12 
AS different");
+        util.tableEnv().executeSql("CREATE VIEW t3 AS SELECT 'Bob' AS name, 
TRUE AS isValid");
+    }
+
+    @Test
+    void testScalarArgsNoUid() {
+        util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
+        util.verifyRelPlan("SELECT * FROM f(i => 1, b => true)");
+    }
+
+    @Test
+    void testScalarArgsWithUid() {
+        util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
+        // argument 'uid' is also reordered
+        util.verifyRelPlan("SELECT * FROM f(uid => 'my-uid', i => 1, b => 
true)");
+    }
+
+    @Test
+    void testUnknownScalarArg() {
+        util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
+        // argument 'invalid' is ignored
+        util.verifyRelPlan("SELECT * FROM f(i => 1, b => true, invalid => 
'invalid')");
+    }
+
+    @Test
+    void testInvalidUid() {
+        util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
+        assertThatThrownBy(
+                        () -> util.verifyRelPlan("SELECT * FROM f(uid => '%', 
i => 1, b => true)"))
+                .hasRootCauseMessage(
+                        "Invalid unique identifier for process table function. 
"
+                                + "The 'uid' argument must be a string literal 
that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. "
+                                + "But found: %");
+    }
+
+    @Test
+    void testTableAsRow() {
+        util.addTemporarySystemFunction("f", TableAsRowFunction.class);
+        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+    }
+
+    @Test
+    void testTypedTableAsRow() {
+        util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
+        assertReachesOptimizer("SELECT * FROM f(u => TABLE t1, i => 1)");
+    }
+
+    @Test
+    void testTypedTableAsRowIgnoringColumnNames() {
+        util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
+        // function expects <STRING name, INT score>
+        // but table is <STRING name, INT different>
+        assertReachesOptimizer("SELECT * FROM f(u => TABLE t2, i => 1)");
+    }
+
+    @Test
+    void testTypedTableAsRowWithInvalidInput() {
+        util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
+        // function expects <STRING name, INT score>
+        assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(u => 
TABLE t3, i => 1)"))
+                .hasMessageContaining(
+                        "No match found for function signature "
+                                + "f(<RecordType(CHAR(3) name, BOOLEAN 
isValid)>, <NUMERIC>, <CHARACTER>)");
+    }
+
+    @Test
+    void testTableAsSet() {
+        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY 
name, i => 1)");
+    }
+
+    @Test
+    void testTableAsSetWithInvalidPartitionBy() {
+        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+        assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(r => 
TABLE t1, i => 1)"))
+                .hasRootCauseMessage(
+                        "Table argument 'r' requires a PARTITION BY clause for 
parallel processing.");
+    }
+
+    @Test
+    void testTableAsSetOptionalPartitionBy() {
+        util.addTemporarySystemFunction("f", 
TableAsSetOptionalPartitionFunction.class);
+        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+    }
+
+    @Test
+    void testTypedTableAsSet() {
+        util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class);
+        assertReachesOptimizer("SELECT * FROM f(u => TABLE t1 PARTITION BY 
name, i => 1)");
+    }
+
+    @Test
+    void testTypedTableAsSetWithInvalidInput() {
+        util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class);
+        // function expects <STRING name, INT score>
+        assertThatThrownBy(
+                        () ->
+                                util.verifyRelPlan(
+                                        "SELECT * FROM f(u => TABLE t3 
PARTITION BY name, i => 1)"))
+                .hasMessageContaining(
+                        "No match found for function signature "
+                                + "f(<RecordType(CHAR(3) name, BOOLEAN 
isValid)>, <NUMERIC>, <CHARACTER>)");
+    }
+
+    @Test
+    void testEmptyArgs() {
+        util.addTemporarySystemFunction("f", EmptyArgFunction.class);
+        util.verifyRelPlan("SELECT * FROM f(uid => 'my-ptf')");
+    }
+
+    @Test
+    void testPojoArgs() {
+        util.addTemporarySystemFunction("f", PojoArgsFunction.class);
+        util.addTemporarySystemFunction("pojoCreator", 
PojoCreatingFunction.class);
+        assertReachesOptimizer(
+                "SELECT * FROM f(input => TABLE t1, scalar => 
pojoCreator('Bob', 12), uid => 'my-ptf')");
+    }
+
+    @Test
+    void testInvalidTableFunction() {
+        util.addTemporarySystemFunction("f", NoProcessTableFunction.class);
+        assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(r => 
TABLE t1)"))
+                .hasRootCauseMessage(
+                        "Only scalar arguments are supported at this location. 
"
+                                + "But argument 'r' declared the following 
traits: [TABLE, TABLE_AS_ROW]");
+    }
+
+    @Test
+    void testReservedArg() {
+        util.addTemporarySystemFunction("f", ReservedArgFunction.class);
+        assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(uid => 
'my-ptf')"))
+                .hasRootCauseMessage(
+                        "Function signature must not declare system arguments. 
Reserved argument names are: [uid]");
+    }

Review Comment:
   Not sure how easy it is for `success` cases
   however at least for `fail` cases wouldn't it make sense to replace them 
with a couple of parameterized tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to