Venkata krishnan Sowrirajan created FLINK-39669:
---------------------------------------------------
Summary: BatchPhysicalCorrelateRule / StreamPhysicalCorrelateRule drop right-side projection when a Calc sits between Correlate and TableFunctionScan
Key: FLINK-39669
URL: https://issues.apache.org/jira/browse/FLINK-39669
Project: Flink
Issue Type: Bug
Reporter: Venkata krishnan Sowrirajan
BatchPhysicalCorrelateRule.convertToCorrelate and
StreamPhysicalCorrelateRule.convertToCorrelate walk through a FlinkLogicalCalc
between the Correlate and the underlying FlinkLogicalTableFunctionScan to
extract the Calc's condition, but silently discard the Calc's projection list.
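To make concrete what the conversion throws away, here is a plain-Java model of a Calc sitting on a table function's output. The Calc first keeps the rows that satisfy its condition and then applies its projection list. CalcModel, Calc, applyCalc and applyConditionOnly are hypothetical names used only for illustration, not Flink classes, and rows are modeled as List<Object> rather than RowData.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class CalcModel {
    // Hypothetical stand-in for a FlinkLogicalCalc: a filter condition plus a
    // projection list given as input field indices (e.g. {1} for Project(VALUE=$1)).
    record Calc(Predicate<List<Object>> condition, int[] projects) {}

    // Full Calc semantics: filter each TFS output row, then keep only the projected fields.
    static List<List<Object>> applyCalc(Calc calc, List<List<Object>> tfsRows) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : tfsRows) {
            if (!calc.condition().test(row)) {
                continue;
            }
            List<Object> projected = new ArrayList<>();
            for (int idx : calc.projects()) {
                projected.add(row.get(idx));
            }
            out.add(projected);
        }
        return out;
    }

    // What survives the conversion described above: the condition is kept but the
    // projection list is dropped, so every TFS field is passed through unchanged.
    static List<List<Object>> applyConditionOnly(Calc calc, List<List<Object>> tfsRows) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : tfsRows) {
            if (calc.condition().test(row)) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One UNNEST(c) output row per map entry: (K, V).
        List<List<Object>> tfsRows = List.of(List.of("a", 10), List.of("b", 11));
        Calc calc = new Calc(row -> true, new int[] {1}); // Project(VALUE=$1), no filter
        System.out.println(applyCalc(calc, tfsRows));          // [[10], [11]]
        System.out.println(applyConditionOnly(calc, tfsRows)); // [[a, 10], [b, 11]]
    }
}
{code}

The condition-only path yields rows that are wider than the Calc's output type, which is the shape mismatch the physical Correlate then inherits.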
The physical BatchPhysicalCorrelate / StreamPhysicalCorrelate is nevertheless
constructed with the Correlate's output RelDataType (left fields + projected
right fields), while CorrelateCodeGenerator positionally concatenates the left
input with the full TableFunctionScan (TFS) output via
JoinedRowData.replace(input, udtfInput) in CorrelateCodeGenerator.scala. When
the projection narrows the right-side output, downstream consumers read the
first N fields of (left + full TFS output), where N is the arity of the
declared row type, and therefore get the wrong TFS column at runtime.
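The positional mismatch can be modeled in plain Java. This sketch assumes a simplified left row (a, b) (in the reported query the left side would also carry c), a table function emitting (K, V), and a right-side projection that keeps only V, so the declared Correlate row type is (a, b, v) with N = 3. generatedRow mimics the concatenation JoinedRowData performs; projectedRow shows one possible reconciliation (apply the projection before concatenating). Both are hypothetical helpers, not Flink code, and not necessarily how the issue will be fixed.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class CorrelateLayoutSketch {
    // Mimics the generated code: left fields followed by the FULL table-function row,
    // ignoring the right-side projection recorded in the declared row type.
    static List<Object> generatedRow(List<Object> left, List<Object> tfsRow) {
        List<Object> joined = new ArrayList<>(left);
        joined.addAll(tfsRow);
        return joined;
    }

    // One possible reconciliation (an assumption, not the committed patch): apply the
    // right-side projection before concatenating so the layout matches the declared type.
    static List<Object> projectedRow(List<Object> left, List<Object> tfsRow, int[] rightProjects) {
        List<Object> joined = new ArrayList<>(left);
        for (int idx : rightProjects) {
            joined.add(tfsRow.get(idx));
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Object> left = List.of(1, 11);     // (a, b)
        List<Object> tfsRow = List.of("a", 10); // (K, V) for one map entry
        int[] rightProjects = {1};              // Project(VALUE=$1)
        int declaredArity = left.size() + rightProjects.length; // (a, b, v): N = 3

        // Downstream operators read the first N fields by position.
        List<Object> buggy = generatedRow(left, tfsRow).subList(0, declaredArity);
        List<Object> fixed = projectedRow(left, tfsRow, rightProjects);
        System.out.println(buggy); // [1, 11, a]  (K where V was expected)
        System.out.println(fixed); // [1, 11, 10]
    }
}
{code}

The buggy read reproduces the shape of the first "Actual" row in the reproduction below, while the projected layout matches the "Expected" one.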
Reproduction:
For SELECT a, b, v FROM T CROSS JOIN UNNEST(c) AS f(k, v) with c of type
MAP<STRING, INT>, once a rule pushes a non-identity projection down to sit
directly above the TFS (e.g. Calcite's stock ProjectCorrelateTransposeRule
together with the dispatch fix in CALCITE-7511), the wrapper
Project(k=$0, v=$1) is pruned to Project(VALUE=$1). The generated code then
reads position 0 of the TFS output (= K) instead of position 1 (= V), so the
map keys appear where the values are expected:
{code:java}
Expected:        Actual:
+I[1, 11, 10]    +I[1, 11, a]
+I[1, 11, 11]    +I[1, 11, b]
+I[2, 22, 20]    +I[2, 22, c]
+I[3, 33, 30]    +I[3, 33, d]
+I[3, 33, 31]    +I[3, 33, e]
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)