This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f325b4a8d59 [FLINK-39575][table-planner] Fix argument order for
pass-through table arguments
f325b4a8d59 is described below
commit f325b4a8d5914d218b394c8a7e2f7e4f0c27a358
Author: Gustavo de Morais <[email protected]>
AuthorDate: Wed Apr 29 17:39:37 2026 +0200
[FLINK-39575][table-planner] Fix argument order for pass-through table
arguments
This closes #28063.
---
.../stream/StreamPhysicalProcessTableFunction.java | 2 +-
.../stream/ProcessTableFunctionSemanticTests.java | 2 ++
.../stream/ProcessTableFunctionTestPrograms.java | 20 ++++++++++++++++++++
.../exec/stream/ProcessTableFunctionTestUtils.java | 12 ++++++++++++
4 files changed, 35 insertions(+), 1 deletion(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
index 56a2bf1f6fc..5ccecf18e71 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -418,7 +418,7 @@ public class StreamPhysicalProcessTableFunction extends
AbstractRelNode
return 0;
}
final RexTableArgCall tableArg =
(RexTableArgCall) operand.e;
- final StaticArgument staticArg =
staticArgs.get(0);
+ final StaticArgument staticArg =
staticArgs.get(operand.i);
if
(staticArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
return
tableArg.getType().getFieldCount();
} else {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index aa547037a13..e8330e0075b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -60,6 +60,8 @@ public class ProcessTableFunctionSemanticTests extends
SemanticTestBase {
ProcessTableFunctionTestPrograms.PROCESS_INTERVAL_YEAR_ARGS,
ProcessTableFunctionTestPrograms.PROCESS_EMPTY_ARGS,
ProcessTableFunctionTestPrograms.PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH,
+ ProcessTableFunctionTestPrograms
+ .PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH_AFTER_SCALAR,
ProcessTableFunctionTestPrograms.PROCESS_SET_SEMANTIC_TABLE_PASS_THROUGH,
ProcessTableFunctionTestPrograms.PROCESS_UPDATING_INPUT_RETRACT,
ProcessTableFunctionTestPrograms.PROCESS_UPDATING_INPUT_UPSERT,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 062de5c2416..5e3c54d803a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -55,6 +55,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsTimeFunction;
+import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarBeforeRowSemanticTablePassThroughFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFullDeletesArgFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableOptionalPartitionFunction;
@@ -430,6 +431,25 @@ public class ProcessTableFunctionTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t, i
=> 1)")
.build();
+ // Same as PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH but the table argument
is at operand
+ // position 1
+ public static final TableTestProgram
PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH_AFTER_SCALAR =
+ TableTestProgram.of(
+ "process-row-pass-through-after-scalar",
+ "pass columns through with table after scalar
argument")
+ .setupTemporarySystemFunction(
+ "f",
ScalarBeforeRowSemanticTablePassThroughFunction.class)
+ .setupSql(BASIC_VALUES)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(PASS_THROUGH_BASE_SINK_SCHEMA)
+ .consumedValues(
+ "+I[Bob, 12, {+I[Bob, 12], 1}]",
+ "+I[Alice, 42, {+I[Alice, 42],
1}]")
+ .build())
+ .runSql("INSERT INTO sink SELECT * FROM f(i => 1, r =>
TABLE t)")
+ .build();
+
public static final TableTestProgram
PROCESS_SET_SEMANTIC_TABLE_PASS_THROUGH =
TableTestProgram.of("process-set-pass-through", "pass columns
through enabled")
.setupTemporarySystemFunction("f",
SetSemanticTablePassThroughFunction.class)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index 495293c9244..8d83e51770d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -383,6 +383,18 @@ public class ProcessTableFunctionTestUtils {
}
}
+ /**
+ * Same as {@link RowSemanticTablePassThroughFunction} but with the scalar
argument before the
+ * pass-through table.
+ */
+ public static class ScalarBeforeRowSemanticTablePassThroughFunction
+ extends AppendProcessTableFunctionBase {
+ public void eval(
+ Integer i, @ArgumentHint({ROW_SEMANTIC_TABLE,
PASS_COLUMNS_THROUGH}) Row r) {
+ collectObjects(r, i);
+ }
+ }
+
/** Testing function. */
public static class SetSemanticTablePassThroughFunction extends
AppendProcessTableFunctionBase {
public void eval(