[ 
https://issues.apache.org/jira/browse/FLINK-39735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ramin Gharib updated FLINK-39735:
---------------------------------
    Description: 
h2. Problem

{{ProcessTableFunction}}s receive a {{TableSemantics}} for each table-typed 
argument. Today the {{TableSemantics}} surface exposes:
 * {{dataType()}}
 * {{partitionByColumns()}} – only populated when the caller wrote 
{{PARTITION BY}} in SQL.
 * {{orderByColumns()}}
 * {{timeColumn()}}
 * {{changelogMode()}} – planner-derived, lifecycle-aware (empty during type 
inference).

It does *not* expose the input table's upsert key. The planner already derives 
it via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} (metadata handler chain in 
{{FlinkRelMdUpsertKeys}}), but the result is invisible to the PTF.

This forces PTF authors to either:
 * Require the caller to repeat the key via {{PARTITION BY}} even when the 
planner already knows it from PK constraints, or
 * Re-derive upsert keys inside the function (impossible – at constructor time, 
the function has only {{TableSemantics}}, not a {{RelNode}} or 
{{RelMetadataQuery}}).

Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to the input 
upsert key, the function cannot emit partial DELETE rows that preserve identity 
columns in row semantics. The current workaround is "add 
{{PARTITION BY <pk>}}", which is unergonomic for users whose input table 
already declares a primary key.
h2. Proposal

Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}, populated by the 
planner via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} and collapsed to one 
candidate via {{UpsertKeyUtil.smallestKey(...)}}. It returns an empty array 
when no upsert key is derivable (pure append-only sources, post-Sort streams 
that destroyed the key, etc.) or during type inference (metadata not yet 
computed).
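
To make the proposed contract concrete, here is a minimal, self-contained sketch. {{TableSemanticsSketch}} and {{UpsertKeySketch}} are hypothetical stand-ins, not the real Flink classes, and the selection rule (fewest columns first, ties broken by comparing column indices) is an assumption about what "smallest" could mean, not the confirmed behavior of {{UpsertKeyUtil}}.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for the proposed addition to TableSemantics. */
interface TableSemanticsSketch {
    /** An empty array means "no upsert key known", as the proposal describes. */
    default int[] upsertKeyColumns() {
        return new int[0];
    }
}

/** Hypothetical helper; the real planner works on its own metadata types. */
final class UpsertKeySketch {
    private UpsertKeySketch() {}

    /**
     * Collapses candidate keys to a single one. Assumed rule: fewest columns
     * wins, ties are broken by lexicographic comparison of column indices.
     */
    static Optional<int[]> smallestKey(List<int[]> candidates) {
        int[] best = null;
        for (int[] key : candidates) {
            if (best == null
                    || key.length < best.length
                    || (key.length == best.length && Arrays.compare(key, best) < 0)) {
                best = key;
            }
        }
        // Return a defensive copy so callers cannot mutate the candidate list.
        return best == null ? Optional.empty() : Optional.of(best.clone());
    }
}
```

With candidates {{[0, 2]}} and {{[1]}}, {{smallestKey(...).orElse(new int[0])}} would pick {{[1]}}; with no candidates, the {{orElse}} fallback yields the empty array that the proposal uses to signal "no key".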

Plumbing:
 # *{{TableSemantics}}* ({{flink-table-common}}): add a default 
{{upsertKeyColumns()}} method returning an empty array. The default avoids 
breaking source compatibility for the rare external implementor.
 # *{{RuntimeTableSemantics}}* ({{flink-table-runtime}}): add a serializable 
{{int[] upsertKeyColumns}} field, constructor parameter, and accessor.
 # *{{StreamExecProcessTableFunction}}* ({{flink-table-planner}}): persist 
{{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one entry per 
table input). Default to per-input empty arrays for older compiled plans 
(back-compat).
 # *{{StreamPhysicalProcessTableFunction.translateToExecNode}}*: derive 
upsert keys for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} + 
{{UpsertKeyUtil.smallestKey(...).orElse(...)}}. Pass the list to the new 
ExecNode field.
 # *{{OperatorBindingCallContext}}* / *{{OperatorBindingTableSemantics}}*: 
extend to accept {{inputUpsertKeys}} so the value is visible at specialization 
time (the function constructor sees the operator-binding context, not the 
runtime context).
 # *{{BridgingSqlFunction.toCallContext}}* / 
*{{ProcessTableRunnerGenerator.generate}}*: thread {{inputUpsertKeys}} 
through the codegen path.
 # *{{TableSemanticsMock}}* and *{{TestHarnessTableSemantics}}*: accept an 
optional setter for unit testing.
 # *{{TO_CHANGELOG}} runtime* ({{ToChangelogFunction}}): consume 
{{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on 
DELETE when {{produces_full_deletes=false}}. This solves the row-semantics case 
from FLINK-39636 without requiring {{PARTITION BY}}.
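
The last step could look roughly like the sketch below. It is illustrative only: {{PartialDeleteSketch}} is a hypothetical class, rows are modeled as plain {{Object[]}} rather than Flink row types, and two behaviors are assumptions rather than anything stated in this issue: a "partial DELETE" is modeled as a row whose non-key columns are null, and an empty upsert key falls back to emitting the full row.

```java
import java.util.Arrays;

/** Hypothetical sketch; not the actual ToChangelogFunction. */
final class PartialDeleteSketch {
    private PartialDeleteSketch() {}

    /**
     * Builds the row to emit for a DELETE.
     *
     * @param row the full input row
     * @param upsertKeyColumns column indices as returned by the proposed upsertKeyColumns()
     * @param producesFullDeletes mirrors the produces_full_deletes option
     */
    static Object[] deleteRow(Object[] row, int[] upsertKeyColumns, boolean producesFullDeletes) {
        // Assumption: without a known key there is nothing to narrow to,
        // so the full row is emitted.
        if (producesFullDeletes || upsertKeyColumns.length == 0) {
            return Arrays.copyOf(row, row.length);
        }
        // Keep only the identity columns; all other positions stay null.
        Object[] partial = new Object[row.length];
        for (int index : upsertKeyColumns) {
            partial[index] = row[index];
        }
        return partial;
    }
}
```

For a row {{(42, "alice", 3.5)}} with upsert key {{[0]}} and {{produces_full_deletes=false}}, this sketch keeps {{42}} and nulls the remaining two columns.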

  was:
h2. Problem

{{ProcessTableFunction}}s receive a {{TableSemantics}} for each table-typed 
argument. Semantics surface today exposes:

* {{dataType()}}
* {{partitionByColumns()}} -- only populated when caller wrote {{PARTITION 
BY}} in SQL.
* {{orderByColumns()}} / {{orderByDirections()}}
* {{timeColumn()}}
* {{changelogMode()}} -- planner-derived, lifecycle-aware (empty during type 
inference).

Does *not* expose input table's upsert key. Planner already derives it via 
{{FlinkRelMetadataQuery.getUpsertKeys(input)}} (metadata handler chain in 
{{FlinkRelMdUpsertKeys}}), but result is invisible to PTF.

Forces PTF authors to either:

* Require caller to repeat key via {{PARTITION BY}} even when planner already 
knows it from PK constraints, or
* Re-derive upsert keys inside function (impossible -- at constructor time, 
function has only {{TableSemantics}}, not a {{RelNode}} or 
{{RelMetadataQuery}}).

Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to input 
upsert key, function cannot emit partial DELETE rows that preserve identity 
columns in row semantics. Current workaround is "add {{PARTITION BY <pk>}}", 
unergonomic for users whose input table already declares a primary key.

h2. Proposal

Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}, populated by planner 
via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} collapsed to one candidate 
via {{UpsertKeyUtil.smallestKey(...)}}. Returns empty array when no upsert key 
is derivable (pure append-only sources, post-Sort streams that destroyed the 
key, etc.) or during type inference (metadata not yet computed).

Plumbing:

# *{{TableSemantics}}* ({{flink-table-common}}): add a default 
{{upsertKeyColumns()}} method returning an empty array. Default avoids 
breaking source-compatibility for rare external implementor.
# *{{RuntimeTableSemantics}}* ({{flink-table-runtime}}): add serializable 
{{int[] upsertKeyColumns}} field, constructor parameter, accessor.
# *{{StreamExecProcessTableFunction}}* ({{flink-table-planner}}): persist 
{{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one entry 
per table input). Default to per-input empty arrays for older compiled plans 
(back-compat).
# *{{StreamPhysicalProcessTableFunction.translateToExecNode}}*: derive upsert 
keys for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} + 
{{UpsertKeyUtil.smallestKey(...).orElse(...)}}. Pass list to new ExecNode 
field.
# *{{OperatorBindingCallContext}}* / *{{OperatorBindingTableSemantics}}*: 
extend to accept {{inputUpsertKeys}} so value is visible at specialization 
time (function constructor sees operator-binding context, not runtime context).
# *{{BridgingSqlFunction.toCallContext}}* / 
*{{ProcessTableRunnerGenerator.generate}}*: thread {{inputUpsertKeys}} 
through codegen path.
# *{{TableSemanticsMock}}* and *{{TestHarnessTableSemantics}}*: accept 
optional setter for unit testing.
# *{{TO_CHANGELOG}} runtime* ({{ToChangelogFunction}}): consume 
{{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on 
DELETE when {{produces_full_deletes=false}}. Solves row-semantics case from 
FLINK-39636 without requiring {{PARTITION BY}}.


> Expose input upsert key on TableSemantics for ProcessTableFunctions
> -------------------------------------------------------------------
>
>                 Key: FLINK-39735
>                 URL: https://issues.apache.org/jira/browse/FLINK-39735
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Ramin Gharib
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)