snuyanzin commented on code in PR #25928:
URL: https://github.com/apache/flink/pull/25928#discussion_r1907250440
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java:
##########
@@ -0,0 +1,326 @@
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.ArgumentTrait;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.table.types.inference.StaticArgument;
+import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.EnumSet;
+import java.util.Optional;
+
+import static
org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
+import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW;
+import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the type inference and planning part of {@link ProcessTableFunction}. */
+public class ProcessTableFunctionTest extends TableTestBase {
+
+ private TableTestUtil util;
+
+ @BeforeEach
+ void setup() {
+ util = streamTestUtil(TableConfig.getDefault());
+ util.tableEnv().executeSql("CREATE VIEW t1 AS SELECT 'Bob' AS name, 12
AS score");
+ util.tableEnv().executeSql("CREATE VIEW t2 AS SELECT 'Bob' AS name, 12
AS different");
+ util.tableEnv().executeSql("CREATE VIEW t3 AS SELECT 'Bob' AS name,
TRUE AS isValid");
+ }
+
+ @Test
+ void testScalarArgsNoUid() {
+ util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
+ util.verifyRelPlan("SELECT * FROM f(i => 1, b => true)");
+ }
+
+ @Test
+ void testScalarArgsWithUid() {
+ util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
+ // argument 'uid' is also reordered
+ util.verifyRelPlan("SELECT * FROM f(uid => 'my-uid', i => 1, b =>
true)");
+ }
+
+ @Test
+ void testUnknownScalarArg() {
+ util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
+ // argument 'invalid' is ignored
+ util.verifyRelPlan("SELECT * FROM f(i => 1, b => true, invalid =>
'invalid')");
+ }
+
+ @Test
+ void testInvalidUid() {
+ util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
+ assertThatThrownBy(
+ () -> util.verifyRelPlan("SELECT * FROM f(uid => '%',
i => 1, b => true)"))
+ .hasRootCauseMessage(
+ "Invalid unique identifier for process table function.
"
+ + "The 'uid' argument must be a string literal
that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. "
+ + "But found: %");
+ }
+
+ @Test
+ void testTableAsRow() {
+ util.addTemporarySystemFunction("f", TableAsRowFunction.class);
+ assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+ }
+
+ @Test
+ void testTypedTableAsRow() {
+ util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
+ assertReachesOptimizer("SELECT * FROM f(u => TABLE t1, i => 1)");
+ }
+
+ @Test
+ void testTypedTableAsRowIgnoringColumnNames() {
+ util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
+ // function expects <STRING name, INT score>
+ // but table is <STRING name, INT different>
+ assertReachesOptimizer("SELECT * FROM f(u => TABLE t2, i => 1)");
+ }
+
+ @Test
+ void testTypedTableAsRowWithInvalidInput() {
+ util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
+ // function expects <STRING name, INT score>
+ assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(u =>
TABLE t3, i => 1)"))
+ .hasMessageContaining(
+ "No match found for function signature "
+ + "f(<RecordType(CHAR(3) name, BOOLEAN
isValid)>, <NUMERIC>, <CHARACTER>)");
+ }
+
+ @Test
+ void testTableAsSet() {
+ util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+ assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY
name, i => 1)");
+ }
+
+ @Test
+ void testTableAsSetWithInvalidPartitionBy() {
+ util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+ assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(r =>
TABLE t1, i => 1)"))
+ .hasRootCauseMessage(
+ "Table argument 'r' requires a PARTITION BY clause for
parallel processing.");
+ }
+
+ @Test
+ void testTableAsSetOptionalPartitionBy() {
+ util.addTemporarySystemFunction("f",
TableAsSetOptionalPartitionFunction.class);
+ assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+ }
+
+ @Test
+ void testTypedTableAsSet() {
+ util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class);
+ assertReachesOptimizer("SELECT * FROM f(u => TABLE t1 PARTITION BY
name, i => 1)");
+ }
+
+ @Test
+ void testTypedTableAsSetWithInvalidInput() {
+ util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class);
+ // function expects <STRING name, INT score>
+ assertThatThrownBy(
+ () ->
+ util.verifyRelPlan(
+ "SELECT * FROM f(u => TABLE t3
PARTITION BY name, i => 1)"))
+ .hasMessageContaining(
+ "No match found for function signature "
+ + "f(<RecordType(CHAR(3) name, BOOLEAN
isValid)>, <NUMERIC>, <CHARACTER>)");
+ }
+
+ @Test
+ void testEmptyArgs() {
+ util.addTemporarySystemFunction("f", EmptyArgFunction.class);
+ util.verifyRelPlan("SELECT * FROM f(uid => 'my-ptf')");
+ }
+
+ @Test
+ void testPojoArgs() {
+ util.addTemporarySystemFunction("f", PojoArgsFunction.class);
+ util.addTemporarySystemFunction("pojoCreator",
PojoCreatingFunction.class);
+ assertReachesOptimizer(
+ "SELECT * FROM f(input => TABLE t1, scalar =>
pojoCreator('Bob', 12), uid => 'my-ptf')");
+ }
+
+ @Test
+ void testInvalidTableFunction() {
+ util.addTemporarySystemFunction("f", NoProcessTableFunction.class);
+ assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(r =>
TABLE t1)"))
+ .hasRootCauseMessage(
+ "Only scalar arguments are supported at this location.
"
+ + "But argument 'r' declared the following
traits: [TABLE, TABLE_AS_ROW]");
+ }
+
+ @Test
+ void testReservedArg() {
+ util.addTemporarySystemFunction("f", ReservedArgFunction.class);
+ assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(uid =>
'my-ptf')"))
+ .hasRootCauseMessage(
+ "Function signature must not declare system arguments.
Reserved argument names are: [uid]");
+ }
Review Comment:
I'm not sure how easy it would be for the `success` cases; however, at least
for the `fail` cases, wouldn't it make sense to replace them
with a couple of parameterized tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]