[ https://issues.apache.org/jira/browse/FLINK-39411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Natea Eshetu Beshada closed FLINK-39411.
----------------------------------------
    Resolution: Invalid

> PTF code generation uses wrong type for table arguments — METADATA VIRTUAL 
> columns inaccessible
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39411
>                 URL: https://issues.apache.org/jira/browse/FLINK-39411
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0.0, 2.1.0, 2.2.0
>            Reporter: Natea Eshetu Beshada
>            Priority: Major
>
> When a table with METADATA VIRTUAL columns (e.g., a Kafka headers column 
> declared as MAP<STRING, STRING> METADATA VIRTUAL) is passed as a PTF table 
> argument, accessing the metadata column by name fails at runtime with:
> {{Unknown field name 'headers' for mapping to a position.}}
>  
> Root Cause
> ProcessTableRunnerGenerator.generateEvalOperands() (line 350) uses 
> call.getType() (the PTF's return type) instead of tableArgCall.getType() 
> (the table argument's type) when generating code for RexTableArgCall 
> operands.
> {{  // Bug:}}
> {{  val tableType = FlinkTypeFactory.toLogicalType(call.getType).copy(true)}}
> {{  // Fix:}}
> {{  val tableType = FlinkTypeFactory.toLogicalType(tableArgCall.getType).copy(true)}}
> tableType becomes the result type of the GeneratedExpression, which in turn 
> drives the field mapping in RowRowConverter. Because the PTF output type 
> generally has different field names and a different field count than the 
> table input type, columns such as headers cannot be mapped to a position at 
> runtime.
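As a hedged illustration of this mismatch, the following toy Scala model (not Flink code) reduces a row type to its ordered field names and resolves a field name to a position, roughly the kind of lookup the report attributes to RowRowConverter. RowType, positionOf, lookup, and the column names key, payload, and result are hypothetical; only the headers column and the error text come from the report. Resolving headers against a PTF output type that lacks it fails, while resolving it against the table argument type succeeds.

```scala
// Toy model only: none of these types are Flink classes. It mirrors the
// name-to-position lookup, not ProcessTableRunnerGenerator or RowRowConverter.
object TableArgTypeSketch {

  // A row type reduced to its ordered field names (field types omitted).
  final case class RowType(fieldNames: Vector[String]) {
    // Resolve a field name to its position, failing like the reported error.
    def positionOf(name: String): Int = {
      val idx = fieldNames.indexOf(name)
      if (idx < 0)
        throw new IllegalArgumentException(
          s"Unknown field name '$name' for mapping to a position.")
      idx
    }
  }

  // Stand-in for tableArgCall.getType: hypothetical physical columns plus
  // the METADATA VIRTUAL column `headers`.
  val tableArgType: RowType = RowType(Vector("key", "payload", "headers"))

  // Stand-in for call.getType: the PTF's own (hypothetical) output columns,
  // which do not contain `headers`.
  val ptfOutputType: RowType = RowType(Vector("key", "result"))

  // Right(position) on success, Left(error message) on failure.
  def lookup(rowType: RowType, name: String): Either[String, Int] =
    try Right(rowType.positionOf(name))
    catch { case e: IllegalArgumentException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // Buggy variant: resolve `headers` against the PTF output type.
    // prints: Left(Unknown field name 'headers' for mapping to a position.)
    println(lookup(ptfOutputType, "headers"))
    // Fixed variant: resolve `headers` against the table argument type.
    // prints: Right(2)
    println(lookup(tableArgType, "headers"))
  }
}
```

The real planner operates on Flink LogicalType/RowType values and generated code, so this sketch only shows why the choice of type matters for a by-name lookup; it says nothing about which type the planner should use in every case.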
>  
> The upstream pipeline is correct:
>   - CatalogSchemaTable.getRowType() correctly includes metadata columns 
> (as required by the Calcite contract)
>   - DynamicSourceUtils.convertSourceToRel() correctly adds 
> pushMetadataProjection() for the table scan input
>   - FlinkConvertletTable.convertTableArgs() correctly stores the full table 
> type (including metadata) in RexTableArgCall
>   - PTF type extraction stores a ROW<> placeholder for TABLE arguments; the 
> actual schema is resolved at planning time
>  
> Steps to Reproduce
> {{  -- Table with METADATA VIRTUAL column (e.g., Kafka-backed table with headers)}}
> {{  DESCRIBE EXTENDED my_table;}}
> {{  -- shows: headers | MAP<STRING, STRING> | NULL | METADATA VIRTUAL}}
> {{  -- PTF that accesses 'headers' from input row}}
> {{  SELECT * FROM my_ptf(}}
> {{    input => TABLE my_table PARTITION BY key}}
> {{  );}}
> {{  -- Error: Unknown field name 'headers' for mapping to a position.}}
> {{  -- Compare with regular query (works fine):}}
> {{  SELECT headers FROM my_table;}}
> {{  -- Succeeds: DynamicSourceUtils adds the metadata projection}}
>  
> Workaround
> Materialize the metadata column into a regular column via a subquery:
> {{  SELECT * FROM my_ptf(}}
> {{    input => TABLE (SELECT *, headers FROM my_table) PARTITION BY key}}
> {{  );}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
