This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f79e63e7505 [FLINK-38771][table] Allow scalar args for multi-table PTFs
f79e63e7505 is described below

commit f79e63e750565f12ec6d7636e355d71120fde15b
Author: Timo Walther <[email protected]>
AuthorDate: Sun Dec 7 14:11:02 2025 +0100

    [FLINK-38771][table] Allow scalar args for multi-table PTFs
    
    This closes #27320.
---
 .../table/types/inference/SystemTypeInference.java |  8 ++++--
 .../stream/ProcessTableFunctionSemanticTests.java  |  1 +
 .../stream/ProcessTableFunctionTestPrograms.java   | 31 ++++++++++++++++++++++
 .../exec/stream/ProcessTableFunctionTestUtils.java | 14 ++++++++++
 4 files changed, 52 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
index fae1083f5ba..27dd4b9b981 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
@@ -182,10 +182,14 @@ public class SystemTypeInference {
     }
 
     private static void checkMultipleTableArgs(List<StaticArgument> 
staticArgs) {
-        if (staticArgs.stream().filter(arg -> 
arg.is(StaticArgumentTrait.TABLE)).count() <= 1) {
+        final List<StaticArgument> tableArgs =
+                staticArgs.stream()
+                        .filter(arg -> arg.is(StaticArgumentTrait.TABLE))
+                        .collect(Collectors.toList());
+        if (tableArgs.size() <= 1) {
             return;
         }
-        if (staticArgs.stream().anyMatch(arg -> 
!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE))) {
+        if (tableArgs.stream().anyMatch(arg -> 
!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE))) {
             throw new ValidationException(
                     "All table arguments must use set semantics if multiple 
table arguments are declared.");
         }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index 27f0b42c317..cace4ad26c5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -98,6 +98,7 @@ public class ProcessTableFunctionSemanticTests extends 
SemanticTestBase {
                 ProcessTableFunctionTestPrograms.PROCESS_LIST_STATE,
                 ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE,
                 ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT,
+                
ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_WITH_SCALAR_ARGS,
                 
ProcessTableFunctionTestPrograms.PROCESS_STATEFUL_MULTI_INPUT_WITH_TIMEOUT,
                 ProcessTableFunctionTestPrograms.PROCESS_UPDATING_MULTI_INPUT);
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 830d4fd5c29..e65dd8319f1 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ListStateFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MapStateFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputFunction;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputWithScalarArgsFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiStateFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NamedTimersFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NonNullMapStateFunction;
@@ -1417,6 +1418,36 @@ public class ProcessTableFunctionTestPrograms {
                             "INSERT INTO sink SELECT * FROM f(in1 => TABLE t 
PARTITION BY name, in2 => TABLE city PARTITION BY name)")
                     .build();
 
+    public static final TableTestProgram PROCESS_MULTI_INPUT_WITH_SCALAR_ARGS =
+            TableTestProgram.of(
+                            "process-multi-input-scalar-args",
+                            "takes multiple tables and some scalar arguments")
+                    .setupTemporarySystemFunction("f", 
MultiInputWithScalarArgsFunction.class)
+                    .setupSql(MULTI_VALUES)
+                    .setupSql(CITY_VALUES)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(MULTI_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[Bob, Bob, {null, +I[Bob, 
London], {A=1, B=2}, 12, +I[true, Hello]}]",
+                                            "+I[Bob, Bob, {+I[Bob, 12], null, 
{A=1, B=2}, 12, +I[true, Hello]}]",
+                                            "+I[Alice, Alice, {null, +I[Alice, 
Berlin], {A=1, B=2}, 12, +I[true, Hello]}]",
+                                            "+I[Alice, Alice, {+I[Alice, 42], 
null, {A=1, B=2}, 12, +I[true, Hello]}]",
+                                            "+I[Charly, Charly, {null, 
+I[Charly, Paris], {A=1, B=2}, 12, +I[true, Hello]}]",
+                                            "+I[Bob, Bob, {+I[Bob, 99], null, 
{A=1, B=2}, 12, +I[true, Hello]}]",
+                                            "+I[Bob, Bob, {+I[Bob, 100], null, 
{A=1, B=2}, 12, +I[true, Hello]}]",
+                                            "+I[Alice, Alice, {+I[Alice, 400], 
null, {A=1, B=2}, 12, +I[true, Hello]}]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM f("
+                                    + "m => MAP['A', '1', 'B', '2'],"
+                                    + "in1 => TABLE t PARTITION BY name,"
+                                    + "i => 12,"
+                                    + "in2 => TABLE city PARTITION BY name,"
+                                    + "r => ROW(TRUE, 'Hello')"
+                                    + ")")
+                    .build();
+
     public static final TableTestProgram PROCESS_MULTI_INPUT_RESTORE =
             TableTestProgram.of("process-multi-input-restore", "takes multiple 
tables")
                     .setupTemporarySystemFunction("f", 
MultiInputFunction.class)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index 9c011a7b6aa..cab52688050 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -942,6 +942,20 @@ public class ProcessTableFunctionTestUtils {
         }
     }
 
+    /** Testing function. */
+    public static class MultiInputWithScalarArgsFunction extends 
AppendProcessTableFunctionBase {
+        public void eval(
+                Context ctx,
+                Map<String, String> m,
+                @ArgumentHint(SET_SEMANTIC_TABLE) Row in1,
+                Integer i,
+                @ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row 
in2,
+                @DataTypeHint("ROW<b BOOLEAN, s STRING>") Row r)
+                throws Exception {
+            collectObjects(in1, in2, m, i, r);
+        }
+    }
+
     /** Testing function. */
     public static class TimedJoinFunction extends 
AppendProcessTableFunctionBase {
         public void eval(

Reply via email to