Venkata krishnan Sowrirajan created FLINK-39669:
---------------------------------------------------

             Summary: BatchPhysicalCorrelateRule / StreamPhysicalCorrelateRule drop right-side projection when a Calc sits between Correlate and TableFunctionScan
                 Key: FLINK-39669
                 URL: https://issues.apache.org/jira/browse/FLINK-39669
             Project: Flink
          Issue Type: Bug
            Reporter: Venkata krishnan Sowrirajan


BatchPhysicalCorrelateRule.convertToCorrelate and StreamPhysicalCorrelateRule.convertToCorrelate walk through a FlinkLogicalCalc that sits between the Correlate and the underlying FlinkLogicalTableFunctionScan (TFS) to extract the Calc's condition, but silently discard the Calc's projection list.

The resulting BatchPhysicalCorrelate / StreamPhysicalCorrelate is constructed with the Correlate's output RelDataType (left fields + projected right fields), whereas CorrelateCodeGenerator concatenates the left input with the full TFS output positionally (JoinedRowData.replace(input, udtfInput) in CorrelateCodeGenerator.scala). When the projection narrows the right-side output, the two layouts disagree: downstream consumers index the first N fields of (left + full TFS) and read the wrong TFS column at runtime.
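A minimal sketch of the positional mismatch, modeling rows as plain Object[] arrays rather than Flink's RowData. The class and helper names are hypothetical and only illustrate the index arithmetic: joinFullTfs stands in for what JoinedRowData.replace(input, udtfInput) produces, and joinProjected for the row shape the Correlate's declared RelDataType describes.

{code:java}
import java.util.Arrays;

public class CorrelateMismatchSketch {

    // Stand-in for the generated code: left row followed by the FULL TFS row.
    static Object[] joinFullTfs(Object[] left, Object[] tfsRow) {
        Object[] out = Arrays.copyOf(left, left.length + tfsRow.length);
        System.arraycopy(tfsRow, 0, out, left.length, tfsRow.length);
        return out;
    }

    // Stand-in for the declared row type: left row followed only by the
    // right-side fields selected by the Calc's projection.
    static Object[] joinProjected(Object[] left, Object[] tfsRow, int[] projection) {
        Object[] out = Arrays.copyOf(left, left.length + projection.length);
        for (int i = 0; i < projection.length; i++) {
            out[left.length + i] = tfsRow[projection[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] left = {1, 11};        // left fields (a, b)
        Object[] tfsRow = {"a", 10};    // full TFS output (KEY, VALUE)
        int[] projection = {1};         // Calc projection keeps only VALUE=$1
        int declaredArity = left.length + projection.length; // 3

        Object[] physical = joinFullTfs(left, tfsRow);
        Object[] declared = joinProjected(left, tfsRow, projection);

        // Consumers trust the declared arity and read fields 0..2 of the physical row.
        System.out.println(Arrays.toString(Arrays.copyOf(physical, declaredArity))); // [1, 11, a]
        System.out.println(Arrays.toString(declared));                               // [1, 11, 10]
    }
}
{code}

The first line is what a consumer sees today; the second is what the declared type promises. The two diverge as soon as the projection is not a prefix of the TFS output.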

Reproduction:

Take SELECT a, b, v FROM T CROSS JOIN UNNEST(c) AS f(k, v), where c is of type MAP<STRING, INT>. Once a rule pushes a non-identity projection above the TFS (e.g. Calcite's stock ProjectCorrelateTransposeRule together with the dispatch fix in CALCITE-7511), the wrapper Project(k=$0, v=$1) is pruned to Project(VALUE=$1). The generated code then reads position 0 of the TFS output (K, the map key) instead of position 1 (V, the map value):


{code}
Expected:                  Actual:
+I[1, 11, 10]              +I[1, 11, a]
+I[1, 11, 11]              +I[1, 11, b]
+I[2, 22, 20]              +I[2, 22, c]
+I[3, 33, 30]              +I[3, 33, d]
+I[3, 33, 31]              +I[3, 33, e]
{code}
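Replaying the reproduction in plain Java (no Flink dependency) shows that the Actual column is exactly the map key. The contents of T below are an assumption reconstructed from the Expected/Actual output, pairing each Actual key with the Expected value on the same line; UnnestMapReplaySketch, replay and mapOf are hypothetical names.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UnnestMapReplaySketch {

    // Builds an insertion-ordered map from alternating key/value arguments.
    static Map<String, Integer> mapOf(Object... kv) {
        Map<String, Integer> m = new LinkedHashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            m.put((String) kv[i], (Integer) kv[i + 1]);
        }
        return m;
    }

    // Emulates UNNEST(c): each map entry becomes a TFS row (KEY, VALUE).
    // The third output column is taken from TFS field `tfsIndex`.
    static List<String> replay(int[][] ab, List<Map<String, Integer>> cs, int tfsIndex) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < ab.length; i++) {
            for (Map.Entry<String, Integer> e : cs.get(i).entrySet()) {
                Object[] tfsRow = {e.getKey(), e.getValue()};
                out.add("+I[" + ab[i][0] + ", " + ab[i][1] + ", " + tfsRow[tfsIndex] + "]");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] ab = {{1, 11}, {2, 22}, {3, 33}};   // assumed (a, b) values of T
        List<Map<String, Integer>> cs = List.of(     // assumed c values of T
                mapOf("a", 10, "b", 11),
                mapOf("c", 20),
                mapOf("d", 30, "e", 31));

        // Honouring Project(VALUE=$1): read TFS field 1.
        System.out.println("Expected: " + replay(ab, cs, 1));
        // Projection dropped: field 2 of (a, b, KEY, VALUE) is TFS field 0.
        System.out.println("Actual:   " + replay(ab, cs, 0));
    }
}
{code}

Passing tfsIndex = 1 mirrors the pruned Project(VALUE=$1); passing 0 mirrors what the consumer ends up reading once the projection is discarded.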



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
