[
https://issues.apache.org/jira/browse/FLINK-39411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Natea Eshetu Beshada closed FLINK-39411.
----------------------------------------
Resolution: Invalid
> PTF code generation uses wrong type for table arguments — METADATA VIRTUAL
> columns inaccessible
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-39411
> URL: https://issues.apache.org/jira/browse/FLINK-39411
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0.0, 2.1.0, 2.2.0
> Reporter: Natea Eshetu Beshada
> Priority: Major
>
> When a table with METADATA VIRTUAL columns (e.g., Kafka headers MAP<STRING,
> STRING> METADATA VIRTUAL) is passed as a PTF table argument, accessing the
> metadata column by name fails at runtime:
> Unknown field name 'headers' for mapping to a position.
>
> Root Cause
> ProcessTableRunnerGenerator.generateEvalOperands() (line 350) uses
> call.getType() (the PTF's return type) instead of tableArgCall.getType() (the
> table argument's type) when generating code for RexTableArgCall operands.
> {{ // Bug:}}
> {{ val tableType = FlinkTypeFactory.toLogicalType(call.getType).copy(true)}}
> {{ // Fix:}}
> {{ val tableType = FlinkTypeFactory.toLogicalType(tableArgCall.getType).copy(true)}}
> The tableType is used as the GeneratedExpression result type, which then
> drives the field mapping in RowRowConverter. Using the PTF output type instead
> of the table input type causes field name/count mismatches at runtime.
>
> The upstream pipeline is correct:
> - CatalogSchemaTable.getRowType() correctly includes metadata columns
> (required by Calcite contract)
> - DynamicSourceUtils.convertSourceToRel() correctly adds
> pushMetadataProjection() for the table scan input
> - FlinkConvertletTable.convertTableArgs() correctly stores the full table
> type (including metadata) in RexTableArgCall
> - PTF extraction stores a ROW<> placeholder for TABLE arguments; the actual
> schema is resolved at planning time
>
> Steps to Reproduce
> {{ -- Table with METADATA VIRTUAL column (e.g., Kafka-backed table with headers)}}
> {{ DESCRIBE EXTENDED my_table;}}
> {{ -- shows: headers | MAP<STRING, STRING> | NULL | METADATA VIRTUAL}}
> {{ -- PTF that accesses 'headers' from input row}}
> {{ SELECT * FROM my_ptf(}}
> {{ input => TABLE my_table PARTITION BY key}}
> {{ );}}
> {{ -- Error: Unknown field name 'headers' for mapping to a position.}}
> {{ -- Compare with regular query (works fine):}}
> {{ SELECT headers FROM my_table;}}
> {{ -- Succeeds: DynamicSourceUtils adds the metadata projection}}
>
> Workaround
> Materialize the metadata into a physical column via a subquery:
> {{ SELECT * FROM my_ptf(}}
> {{ input => TABLE (SELECT *, headers FROM my_table) PARTITION BY key}}
> {{ )}}
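The following is a minimal sketch of the mechanism described under Root Cause. It assumes the converter resolves input fields by name against whatever row type code generation hands it.

- SimpleRowType, positionOf, lookup and TableArgTypeDemo are hypothetical stand-ins. They are not Flink's RowType or RowRowConverter.
- The field lists are illustrative only.
- The error text mirrors the message reported in the issue.

```scala
// Hypothetical model of the reported type mismatch; none of these types are Flink classes.
final case class SimpleRowType(fieldNames: List[String]) {
  // Resolve a field name to its position, failing like the reported runtime error.
  def positionOf(name: String): Int = {
    val idx = fieldNames.indexOf(name)
    if (idx < 0)
      throw new IllegalArgumentException(
        s"Unknown field name '$name' for mapping to a position.")
    idx
  }
}

object TableArgTypeDemo {
  // Assumed table argument type: the full input row, including the metadata column.
  val tableArgType: SimpleRowType = SimpleRowType(List("key", "payload", "headers"))
  // Assumed PTF return type: whatever the PTF emits, here without 'headers'.
  val ptfReturnType: SimpleRowType = SimpleRowType(List("key", "result"))

  // Wrap the lookup so both outcomes can be inspected without crashing.
  def lookup(rowType: SimpleRowType, name: String): Either[String, Int] =
    try Right(rowType.positionOf(name))
    catch { case e: IllegalArgumentException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // Buggy path: resolve against the PTF return type.
    println(lookup(ptfReturnType, "headers")) // prints Left(Unknown field name 'headers' for mapping to a position.)
    // Fixed path: resolve against the table argument type.
    println(lookup(tableArgType, "headers")) // prints Right(2)
  }
}
```

Resolving 'headers' against the PTF return type reproduces the reported error. Resolving it against the table argument type, which still carries the metadata column, yields a valid position, in line with the proposed one-line fix.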
--
This message was sent by Atlassian Jira
(v8.20.10#820010)