[
https://issues.apache.org/jira/browse/FLINK-39735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083026#comment-18083026
]
Jubin Soni commented on FLINK-39735:
------------------------------------
For the approach, I’ll expose upsert keys end-to-end for PTFs by adding
{{TableSemantics#upsertKeyColumns()}} with a default empty array for
compatibility. I’ll wire it through {{RuntimeTableSemantics}}, derive it in
the planner via {{FlinkRelMetadataQuery.getUpsertKeys()}}, and collapse per
input using {{UpsertKeyUtil.smallestKey(...)}}.
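
A minimal sketch of the per-input collapse, under stated assumptions: {{SmallestKeySketch}} and its methods are hypothetical stand-ins written with plain Java collections, not the planner's {{UpsertKeyUtil}}. It assumes "smallest" means the candidate with the fewest columns and that the first candidate wins ties; the real tie-breaking may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SmallestKeySketch {

    // Hypothetical stand-in for the collapse step: the candidate with the fewest
    // columns wins; on ties the first candidate seen is kept (assumption).
    static Optional<int[]> smallestKey(List<int[]> candidates) {
        int[] best = null;
        for (int[] candidate : candidates) {
            if (best == null || candidate.length < best.length) {
                best = candidate;
            }
        }
        return Optional.ofNullable(best);
    }

    // One entry per table input; inputs with no derivable key get an empty array.
    static List<int[]> collapsePerInput(List<List<int[]>> candidatesPerInput) {
        List<int[]> result = new ArrayList<>();
        for (List<int[]> candidates : candidatesPerInput) {
            result.add(smallestKey(candidates).orElse(new int[0]));
        }
        return result;
    }

    public static void main(String[] args) {
        // First input has two candidate keys, second input is append-only.
        List<int[]> keyed = Arrays.asList(new int[] {0, 3}, new int[] {1});
        List<int[]> appendOnly = new ArrayList<>();
        for (int[] key : collapsePerInput(Arrays.asList(keyed, appendOnly))) {
            System.out.println(Arrays.toString(key));
        }
    }
}
```

Returning an empty array instead of {{null}} for the keyless input mirrors the "default empty array" contract described above, so callers only need a length check.
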
I’ll pass it through {{StreamPhysicalProcessTableFunction}} and
{{StreamExecProcessTableFunction}} (with JSON back-compat), and extend the
specialization/type-inference path so it’s available at construction time
(before metadata is available, it returns {{new int[0]}}).
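
The JSON back-compat part could look roughly like the sketch below. It assumes that a plan compiled before the new field existed surfaces the field as {{null}} after deserialization; {{orEmptyPerInput}} and {{tableInputCount}} are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class InputUpsertKeysDefaultSketch {

    // Hypothetical helper: a null value (field absent in an older compiled plan)
    // is replaced by one empty array per table input; otherwise the persisted
    // list is used unchanged.
    static List<int[]> orEmptyPerInput(List<int[]> persisted, int tableInputCount) {
        if (persisted != null) {
            return persisted;
        }
        List<int[]> defaults = new ArrayList<>();
        for (int i = 0; i < tableInputCount; i++) {
            defaults.add(new int[0]);
        }
        return defaults;
    }

    public static void main(String[] args) {
        // An older plan with two table inputs and no persisted upsert keys.
        System.out.println(orEmptyPerInput(null, 2).size());
    }
}
```
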
Finally, I’ll update codegen and {{TO_CHANGELOG}} to consume it, with fallback
to existing {{PARTITION BY}} behavior when the upsert key is absent.
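
As a rough illustration of the default method and the fallback, assuming a trimmed-down interface: {{SemanticsSketch}} and {{identityColumns}} are hypothetical and only mirror the two accessors discussed here, not the real {{TableSemantics}}.

```java
import java.util.Arrays;

public class UpsertKeyFallbackSketch {

    // Hypothetical, trimmed-down stand-in for TableSemantics; not the real interface.
    interface SemanticsSketch {
        int[] partitionByColumns();

        // Default body keeps existing implementors compiling unchanged.
        default int[] upsertKeyColumns() {
            return new int[0];
        }
    }

    // Prefer the upsert key; fall back to the PARTITION BY columns when it is absent.
    static int[] identityColumns(SemanticsSketch semantics) {
        int[] upsertKey = semantics.upsertKeyColumns();
        return upsertKey.length > 0 ? upsertKey : semantics.partitionByColumns();
    }

    public static void main(String[] args) {
        // Implementor that predates the new method: only PARTITION BY is known.
        SemanticsSketch legacy = () -> new int[] {2};
        // Implementor that reports an upsert key on column 0 and no PARTITION BY.
        SemanticsSketch keyed = new SemanticsSketch() {
            @Override
            public int[] partitionByColumns() {
                return new int[0];
            }

            @Override
            public int[] upsertKeyColumns() {
                return new int[] {0};
            }
        };
        System.out.println(Arrays.toString(identityColumns(legacy)));
        System.out.println(Arrays.toString(identityColumns(keyed)));
    }
}
```
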
Can someone assign this ticket to me?
> Expose input upsert key on TableSemantics for ProcessTableFunctions
> -------------------------------------------------------------------
>
> Key: FLINK-39735
> URL: https://issues.apache.org/jira/browse/FLINK-39735
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Ramin Gharib
> Priority: Major
> Labels: pull-request-available
>
> h2. Problem
> {{ProcessTableFunction}}s receive a {{TableSemantics}} for each table-typed
> argument. The semantics surface today exposes:
> * {{dataType()}}
> * {{partitionByColumns()}} – only populated when the caller wrote
> {{PARTITION BY}} in SQL.
> * {{orderByColumns()}}
> * {{timeColumn()}}
> * {{changelogMode()}} – planner-derived, lifecycle-aware (empty during type
> inference).
> It does *not* expose the input table's upsert key. The planner already derives
> it via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} (metadata handler chain in
> {{FlinkRelMdUpsertKeys}}), but the result is invisible to the PTF.
> This forces PTF authors to either:
> * Require the caller to repeat the key via {{PARTITION BY}} even when the
> planner already knows it from PK constraints, or
> * Re-derive upsert keys inside the function (impossible – at constructor time,
> the function has only {{TableSemantics}}, not a {{RelNode}} or
> {{RelMetadataQuery}}).
> Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to the input
> upsert key, the function cannot emit partial DELETE rows that preserve identity
> columns in row semantics. The current workaround is "add {{PARTITION BY
> <pk>}}", which is unergonomic for users whose input table already declares a
> primary key.
> h2. Proposal
> Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}, populated by the
> planner via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} and collapsed to one
> candidate via {{UpsertKeyUtil.smallestKey(...)}}. It returns an empty array
> when no upsert key is derivable (pure append-only sources, post-Sort streams
> that destroyed the key, etc.) or during type inference (metadata not yet
> computed).
> Plumbing:
> # *{{TableSemantics}}* ({{flink-table-common}}): add a default
> {{upsertKeyColumns()}} method returning an empty array. The default avoids
> breaking source compatibility for the rare external implementor.
> # *{{RuntimeTableSemantics}}* ({{flink-table-runtime}}): add a
> serializable {{int[] upsertKeyColumns}} field, constructor parameter, and
> accessor.
> # *{{StreamExecProcessTableFunction}}* ({{flink-table-planner}}):
> persist {{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one
> entry per table input). Default to per-input empty arrays for older compiled
> plans (back-compat).
> # *{{StreamPhysicalProcessTableFunction.translateToExecNode}}*: derive
> upsert keys for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}}
> + {{UpsertKeyUtil.smallestKey(...).orElse(...)}}. Pass the list to the new
> ExecNode field.
> # *{{OperatorBindingCallContext}}* /
> *{{OperatorBindingTableSemantics}}*: extend to accept {{inputUpsertKeys}}
> so the value is visible at specialization time (the function constructor sees
> the operator-binding context, not the runtime context).
> # *{{BridgingSqlFunction.toCallContext}}* /
> *{{ProcessTableRunnerGenerator.generate}}*: thread {{inputUpsertKeys}}
> through the codegen path.
> # *{{TableSemanticsMock}}* and *{{TestHarnessTableSemantics}}*: accept an
> optional setter for unit testing.
> # *{{TO_CHANGELOG}} runtime* ({{ToChangelogFunction}}): consume
> {{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on
> DELETE when {{produces_full_deletes=false}}. This solves the row-semantics case
> from FLINK-39636 without requiring {{PARTITION BY}}.