This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f79e63e7505 [FLINK-38771][table] Allow scalar args for multi-table PTFs
f79e63e7505 is described below
commit f79e63e750565f12ec6d7636e355d71120fde15b
Author: Timo Walther <[email protected]>
AuthorDate: Sun Dec 7 14:11:02 2025 +0100
[FLINK-38771][table] Allow scalar args for multi-table PTFs
This closes #27320.
---
.../table/types/inference/SystemTypeInference.java | 8 ++++--
.../stream/ProcessTableFunctionSemanticTests.java | 1 +
.../stream/ProcessTableFunctionTestPrograms.java | 31 ++++++++++++++++++++++
.../exec/stream/ProcessTableFunctionTestUtils.java | 14 ++++++++++
4 files changed, 52 insertions(+), 2 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
index fae1083f5ba..27dd4b9b981 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
@@ -182,10 +182,14 @@ public class SystemTypeInference {
}
private static void checkMultipleTableArgs(List<StaticArgument> staticArgs) {
- if (staticArgs.stream().filter(arg -> arg.is(StaticArgumentTrait.TABLE)).count() <= 1) {
+ final List<StaticArgument> tableArgs =
+ staticArgs.stream()
+ .filter(arg -> arg.is(StaticArgumentTrait.TABLE))
+ .collect(Collectors.toList());
+ if (tableArgs.size() <= 1) {
return;
}
- if (staticArgs.stream().anyMatch(arg -> !arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE))) {
+ if (tableArgs.stream().anyMatch(arg -> !arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE))) {
throw new ValidationException(
"All table arguments must use set semantics if multiple table arguments are declared.");
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index 27f0b42c317..cace4ad26c5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -98,6 +98,7 @@ public class ProcessTableFunctionSemanticTests extends
SemanticTestBase {
ProcessTableFunctionTestPrograms.PROCESS_LIST_STATE,
ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE,
ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT,
+ ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_WITH_SCALAR_ARGS,
ProcessTableFunctionTestPrograms.PROCESS_STATEFUL_MULTI_INPUT_WITH_TIMEOUT,
ProcessTableFunctionTestPrograms.PROCESS_UPDATING_MULTI_INPUT);
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 830d4fd5c29..e65dd8319f1 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -37,6 +37,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ListStateFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MapStateFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputWithScalarArgsFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiStateFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NamedTimersFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NonNullMapStateFunction;
@@ -1417,6 +1418,36 @@ public class ProcessTableFunctionTestPrograms {
"INSERT INTO sink SELECT * FROM f(in1 => TABLE t PARTITION BY name, in2 => TABLE city PARTITION BY name)")
.build();
+ public static final TableTestProgram PROCESS_MULTI_INPUT_WITH_SCALAR_ARGS =
+ TableTestProgram.of(
+ "process-multi-input-scalar-args",
+ "takes multiple tables and some scalar arguments")
+ .setupTemporarySystemFunction("f", MultiInputWithScalarArgsFunction.class)
+ .setupSql(MULTI_VALUES)
+ .setupSql(CITY_VALUES)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(MULTI_BASE_SINK_SCHEMA)
+ .consumedValues(
+ "+I[Bob, Bob, {null, +I[Bob, London], {A=1, B=2}, 12, +I[true, Hello]}]",
+ "+I[Bob, Bob, {+I[Bob, 12], null, {A=1, B=2}, 12, +I[true, Hello]}]",
+ "+I[Alice, Alice, {null, +I[Alice, Berlin], {A=1, B=2}, 12, +I[true, Hello]}]",
+ "+I[Alice, Alice, {+I[Alice, 42], null, {A=1, B=2}, 12, +I[true, Hello]}]",
+ "+I[Charly, Charly, {null, +I[Charly, Paris], {A=1, B=2}, 12, +I[true, Hello]}]",
+ "+I[Bob, Bob, {+I[Bob, 99], null, {A=1, B=2}, 12, +I[true, Hello]}]",
+ "+I[Bob, Bob, {+I[Bob, 100], null, {A=1, B=2}, 12, +I[true, Hello]}]",
+ "+I[Alice, Alice, {+I[Alice, 400], null, {A=1, B=2}, 12, +I[true, Hello]}]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM f("
+ + "m => MAP['A', '1', 'B', '2'],"
+ + "in1 => TABLE t PARTITION BY name,"
+ + "i => 12,"
+ + "in2 => TABLE city PARTITION BY name,"
+ + "r => ROW(TRUE, 'Hello')"
+ + ")")
+ .build();
+
public static final TableTestProgram PROCESS_MULTI_INPUT_RESTORE =
TableTestProgram.of("process-multi-input-restore", "takes multiple
tables")
.setupTemporarySystemFunction("f",
MultiInputFunction.class)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index 9c011a7b6aa..cab52688050 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -942,6 +942,20 @@ public class ProcessTableFunctionTestUtils {
}
}
+ /** Testing function. */
+ public static class MultiInputWithScalarArgsFunction extends AppendProcessTableFunctionBase {
+ public void eval(
+ Context ctx,
+ Map<String, String> m,
+ @ArgumentHint(SET_SEMANTIC_TABLE) Row in1,
+ Integer i,
+ @ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row in2,
+ @DataTypeHint("ROW<b BOOLEAN, s STRING>") Row r)
+ throws Exception {
+ collectObjects(in1, in2, m, i, r);
+ }
+ }
+
/** Testing function. */
public static class TimedJoinFunction extends
AppendProcessTableFunctionBase {
public void eval(