This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f325b4a8d59 [FLINK-39575][table-planner] Fix argument order for pass-through table arguments
f325b4a8d59 is described below

commit f325b4a8d5914d218b394c8a7e2f7e4f0c27a358
Author: Gustavo de Morais <[email protected]>
AuthorDate: Wed Apr 29 17:39:37 2026 +0200

    [FLINK-39575][table-planner] Fix argument order for pass-through table arguments
    
    This closes #28063.
---
 .../stream/StreamPhysicalProcessTableFunction.java   |  2 +-
 .../stream/ProcessTableFunctionSemanticTests.java    |  2 ++
 .../stream/ProcessTableFunctionTestPrograms.java     | 20 ++++++++++++++++++++
 .../exec/stream/ProcessTableFunctionTestUtils.java   | 12 ++++++++++++
 4 files changed, 35 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
index 56a2bf1f6fc..5ccecf18e71 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -418,7 +418,7 @@ public class StreamPhysicalProcessTableFunction extends 
AbstractRelNode
                                         return 0;
                                     }
                                     final RexTableArgCall tableArg = 
(RexTableArgCall) operand.e;
-                                    final StaticArgument staticArg = staticArgs.get(0);
+                                    final StaticArgument staticArg = staticArgs.get(operand.i);
                                     if 
(staticArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
                                         return 
tableArg.getType().getFieldCount();
                                     } else {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index aa547037a13..e8330e0075b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -60,6 +60,8 @@ public class ProcessTableFunctionSemanticTests extends 
SemanticTestBase {
                 ProcessTableFunctionTestPrograms.PROCESS_INTERVAL_YEAR_ARGS,
                 ProcessTableFunctionTestPrograms.PROCESS_EMPTY_ARGS,
                 
ProcessTableFunctionTestPrograms.PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH,
+                ProcessTableFunctionTestPrograms
+                        .PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH_AFTER_SCALAR,
                 
ProcessTableFunctionTestPrograms.PROCESS_SET_SEMANTIC_TABLE_PASS_THROUGH,
                 
ProcessTableFunctionTestPrograms.PROCESS_UPDATING_INPUT_RETRACT,
                 ProcessTableFunctionTestPrograms.PROCESS_UPDATING_INPUT_UPSERT,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 062de5c2416..5e3c54d803a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -55,6 +55,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsTimeFunction;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarBeforeRowSemanticTablePassThroughFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFullDeletesArgFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableOptionalPartitionFunction;
@@ -430,6 +431,25 @@ public class ProcessTableFunctionTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t, i 
=> 1)")
                     .build();
 
+    // Same as PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH but the table argument is at operand
+    // position 1
+    public static final TableTestProgram 
PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH_AFTER_SCALAR =
+            TableTestProgram.of(
+                            "process-row-pass-through-after-scalar",
+                            "pass columns through with table after scalar 
argument")
+                    .setupTemporarySystemFunction(
+                            "f", 
ScalarBeforeRowSemanticTablePassThroughFunction.class)
+                    .setupSql(BASIC_VALUES)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(PASS_THROUGH_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[Bob, 12, {+I[Bob, 12], 1}]",
+                                            "+I[Alice, 42, {+I[Alice, 42], 
1}]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM f(i => 1, r => 
TABLE t)")
+                    .build();
+
     public static final TableTestProgram 
PROCESS_SET_SEMANTIC_TABLE_PASS_THROUGH =
             TableTestProgram.of("process-set-pass-through", "pass columns 
through enabled")
                     .setupTemporarySystemFunction("f", 
SetSemanticTablePassThroughFunction.class)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index 495293c9244..8d83e51770d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -383,6 +383,18 @@ public class ProcessTableFunctionTestUtils {
         }
     }
 
+    /**
+     * Same as {@link RowSemanticTablePassThroughFunction} but with the scalar argument before the
+     * pass-through table.
+     */
+    public static class ScalarBeforeRowSemanticTablePassThroughFunction
+            extends AppendProcessTableFunctionBase {
+        public void eval(
+                Integer i, @ArgumentHint({ROW_SEMANTIC_TABLE, 
PASS_COLUMNS_THROUGH}) Row r) {
+            collectObjects(r, i);
+        }
+    }
+
     /** Testing function. */
     public static class SetSemanticTablePassThroughFunction extends 
AppendProcessTableFunctionBase {
         public void eval(

Reply via email to