[ 
https://issues.apache.org/jira/browse/FLINK-39669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39669:
-----------------------------------
    Labels: pull-request-available  (was: )

> BatchPhysicalCorrelateRule / StreamPhysicalCorrelateRule drop right-side 
> projection when a Calc sits between Correlate and TableFunctionScan
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39669
>                 URL: https://issues.apache.org/jira/browse/FLINK-39669
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>              Labels: pull-request-available
>
> BatchPhysicalCorrelateRule.convertToCorrelate and 
> StreamPhysicalCorrelateRule.convertToCorrelate walk through a 
> FlinkLogicalCalc between the Correlate and the underlying 
> FlinkLogicalTableFunctionScan to extract the Calc's condition, but silently 
> discard the Calc's projection list. The physical
> BatchPhysicalCorrelate / StreamPhysicalCorrelate is constructed with the 
> Correlate's output RelDataType (left fields + projected right fields), while
> CorrelateCodeGenerator concatenates the left input with the full 
> TableFunctionScan (TFS) output positionally 
> (JoinedRowData.replace(input, udtfInput) in CorrelateCodeGenerator.scala). 
> When the projection narrows the right-side output, downstream consumers read 
> the first N fields of (left + full TFS), where N is the arity of the declared 
> output type, and so return the wrong TFS column at runtime.
> Reproduction:
> For SELECT a, b, v FROM T CROSS JOIN UNNEST(c) AS f(k, v) with c of type 
> MAP<STRING, INT>, once a rule pushes a non-identity projection above the TFS
> (e.g. Calcite's stock ProjectCorrelateTransposeRule together with the 
> dispatch fix in CALCITE-7511), the wrapper Project(k=$0, v=$1) is pruned to
> Project(VALUE=$1). The codegen then reads position 0 of the TFS (= K) instead 
> of position 1 (= V):
> {code:java}
> Expected:                  Actual:
> +I[1, 11, 10]              +I[1, 11, a]
> +I[1, 11, 11]              +I[1, 11, b]
> +I[2, 22, 20]              +I[2, 22, c]
> +I[3, 33, 30]              +I[3, 33, d]
> +I[3, 33, 31]              +I[3, 33, e]       
> {code}
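
The mismatch described above can be modelled without any Flink classes. The sketch below is only an illustration, not the planner or codegen code: the class and method names (CorrelateProjectionSketch, joinPositional, project, readDeclared) are hypothetical, rows are plain Object[] arrays, and the sample values are taken from the first row of the table above. It compares reading the declared (a, b, v) row type from (left + full TFS) against reading it from (left + projected TFS).

```java
import java.util.Arrays;

// Hypothetical stand-alone model; none of these names exist in Flink.
final class CorrelateProjectionSketch {

    // Models positional concatenation: all left fields, then all right fields.
    static Object[] joinPositional(Object[] left, Object[] right) {
        Object[] joined = Arrays.copyOf(left, left.length + right.length);
        System.arraycopy(right, 0, joined, left.length, right.length);
        return joined;
    }

    // Models a Calc projection: keep only the given field indices, in order.
    static Object[] project(Object[] row, int[] indices) {
        Object[] out = new Object[indices.length];
        for (int i = 0; i < indices.length; i++) {
            out[i] = row[indices[i]];
        }
        return out;
    }

    // Models a downstream consumer that trusts the declared row type and
    // reads only the first `arity` fields of the joined row.
    static Object[] readDeclared(Object[] joined, int arity) {
        return Arrays.copyOf(joined, arity);
    }

    public static void main(String[] args) {
        Object[] left = {1, 11};      // (a, b) from T
        Object[] tfs = {"a", 10};     // (k, v) from UNNEST(c)
        int[] calcProjection = {1};   // Project(VALUE=$1)
        int declaredArity = left.length + calcProjection.length; // (a, b, v)

        // Projection dropped: the consumer sees k where it expects v.
        Object[] dropped = readDeclared(joinPositional(left, tfs), declaredArity);

        // Projection applied to the TFS row before concatenation.
        Object[] applied = readDeclared(
                joinPositional(left, project(tfs, calcProjection)), declaredArity);

        System.out.println("projection dropped: " + Arrays.toString(dropped)); // [1, 11, a]
        System.out.println("projection applied: " + Arrays.toString(applied)); // [1, 11, 10]
    }
}
```

In this model the two results differ only in whether the projection is applied before the positional concatenation. Whether the actual fix belongs in the rules or in the code generator is not something this sketch addresses.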



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
