[
https://issues.apache.org/jira/browse/FLINK-39735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ramin Gharib updated FLINK-39735:
---------------------------------
Description:
h2. Problem
{{ProcessTableFunction}}s receive a {{TableSemantics}} for each table-typed
argument. The {{TableSemantics}} surface currently exposes:
* {{dataType()}}
* {{partitionByColumns()}}: only populated when the caller wrote {{PARTITION BY}}
in SQL.
* {{orderByColumns()}}
* {{timeColumn()}}
* {{changelogMode()}}: planner-derived and lifecycle-aware (empty during type
inference).
It does *not* expose the input table's upsert key. The planner already derives it via
{{FlinkRelMetadataQuery.getUpsertKeys(input)}} (metadata handler chain in
{{FlinkRelMdUpsertKeys}}), but the result is invisible to the PTF.
This forces PTF authors to either:
* Require the caller to repeat the key via {{PARTITION BY}} even when the planner
already knows it from PK constraints, or
* Re-derive upsert keys inside the function, which is impossible: at constructor
time the function has only {{TableSemantics}}, not a {{RelNode}} or
{{RelMetadataQuery}}.
Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to the input
upsert key, the function cannot emit partial DELETE rows that preserve identity
columns in row semantics. The current workaround is "add {{PARTITION BY
<pk>}}", which is unergonomic for users whose input table already declares a
primary key.
h2. Proposal
Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}, populated by the
planner via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} and collapsed to one
candidate via {{UpsertKeyUtil.smallestKey(...)}}. It returns an empty array when
no upsert key is derivable (pure append-only sources, post-Sort streams that
destroyed the key, etc.) or during type inference (metadata not yet computed).
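As a rough illustration of the proposed contract, the following self-contained sketch mirrors the two ideas above: a default accessor that returns an empty array, and the collapse of several candidate keys into one. The names {{UpsertKeySketch}} and {{TableSemanticsSketch}} are hypothetical stand-ins, and choosing the candidate with the fewest columns is an assumption about what {{UpsertKeyUtil.smallestKey(...)}} does, not a copy of the planner code.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class UpsertKeySketch {

    /** Hypothetical stand-in for the proposed addition to TableSemantics. */
    public interface TableSemanticsSketch {
        /** The default keeps existing implementors source-compatible. */
        default int[] upsertKeyColumns() {
            return new int[0];
        }
    }

    /**
     * Collapses candidate upsert keys (column indices) into one key.
     * Assumption: "smallest" means the candidate with the fewest columns.
     * Returns an empty array when there is no candidate.
     */
    public static int[] smallestKey(List<int[]> candidates) {
        return candidates.stream()
                .min(Comparator.comparingInt((int[] key) -> key.length))
                .map(key -> key.clone())
                .orElse(new int[0]);
    }

    public static void main(String[] args) {
        // Two candidates: a composite key (columns 0 and 2) and a single-column key (column 1).
        List<int[]> candidates = List.of(new int[] {0, 2}, new int[] {1});
        System.out.println(Arrays.toString(smallestKey(candidates))); // prints [1]

        // No derivable key, e.g. a pure append-only source.
        System.out.println(Arrays.toString(smallestKey(List.of()))); // prints []

        // An implementor that does not override the accessor sees an empty key.
        TableSemanticsSketch legacy = new TableSemanticsSketch() {};
        System.out.println(legacy.upsertKeyColumns().length); // prints 0
    }
}
```

Returning an empty array rather than {{null}} or an {{Optional}} keeps the accessor consistent with the "no key derivable" and "type inference" cases described above, so callers only need a length check.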
Plumbing:
# *{{TableSemantics}}* ({{flink-table-common}}): add a default
{{upsertKeyColumns()}} method returning an empty array. The default avoids
breaking source compatibility for the rare external implementor.
# *{{RuntimeTableSemantics}}* ({{flink-table-runtime}}): add a serializable
{{int[] upsertKeyColumns}} field, constructor parameter, and accessor.
# *{{StreamExecProcessTableFunction}}* ({{flink-table-planner}}): persist
{{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one entry per
table input). Default to per-input empty arrays for older compiled plans
(backward compatibility).
# *{{StreamPhysicalProcessTableFunction.translateToExecNode}}*: derive
upsert keys for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} and
{{UpsertKeyUtil.smallestKey(...).orElse(...)}}. Pass the list to the new ExecNode
field.
# *{{OperatorBindingCallContext}}* / *{{OperatorBindingTableSemantics}}*:
extend to accept {{inputUpsertKeys}} so the value is visible at specialization
time (the function constructor sees the operator-binding context, not the runtime
context).
# *{{BridgingSqlFunction.toCallContext}}* /
*{{ProcessTableRunnerGenerator.generate}}*: thread {{inputUpsertKeys}}
through the codegen path.
# *{{TableSemanticsMock}}* and *{{TestHarnessTableSemantics}}*: accept
an optional setter for unit testing.
# *{{TO_CHANGELOG}} runtime* ({{ToChangelogFunction}}): consume
{{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on
DELETE when {{produces_full_deletes=false}}. This solves the row-semantics case
from FLINK-39636 without requiring {{PARTITION BY}}.
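To make the last step concrete, here is a minimal sketch of how a function could use the upsert key to build a partial DELETE row. It uses plain {{Object[]}} rows instead of Flink row types so that it runs on its own; {{PartialDeleteSketch}} and {{toPartialDelete}} are hypothetical names, and setting non-key columns to {{null}} is an assumption about what "partial" means here, not the actual {{ToChangelogFunction}} behavior.

```java
import java.util.Arrays;

public class PartialDeleteSketch {

    /**
     * Builds a partial DELETE image: upsert key columns keep their values,
     * all other columns are left as null (assumed representation).
     */
    public static Object[] toPartialDelete(Object[] row, int[] upsertKeyColumns) {
        Object[] partial = new Object[row.length];
        for (int index : upsertKeyColumns) {
            partial[index] = row[index];
        }
        return partial;
    }

    public static void main(String[] args) {
        Object[] row = {42, "alice", 3.5};
        int[] upsertKeyColumns = {0}; // as it might be returned by upsertKeyColumns()

        System.out.println(Arrays.toString(toPartialDelete(row, upsertKeyColumns)));
        // prints [42, null, null]
    }
}
```

With an empty key array this sketch would produce a row of only nulls, so a real implementation would presumably need a separate path for inputs without a derivable upsert key.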
was:
h2. Problem
{\{ProcessTableFunction}}s receive a \{{TableSemantics}} for each table-typed
argument. Semantics surface today exposes:
* \{{dataType()}}
* \{{partitionByColumns()}} -- only populated when caller wrote \{{PARTITION
BY}} in SQL.
* \{{orderByColumns()}} / \{{orderByDirections()}}
* \{{timeColumn()}}
* \{{changelogMode()}} -- planner-derived, lifecycle-aware (empty during type
inference).
Does *not* expose input table's upsert key. Planner already derives it via
\{{FlinkRelMetadataQuery.getUpsertKeys(input)}} (metadata handler chain in
\{{FlinkRelMdUpsertKeys}}), but result is invisible to PTF.
Forces PTF authors to either:
* Require caller to repeat key via \{{PARTITION BY}} even when planner already
knows it from PK constraints, or
* Re-derive upsert keys inside function (impossible -- at constructor time,
function has only \{{TableSemantics}}, not a \{{RelNode}} or
\{{RelMetadataQuery}}).
Concrete impact for \{{TO_CHANGELOG}} (FLINK-39636): without access to input
upsert key, function cannot emit partial DELETE rows that preserve identity
columns in row semantics. Current workaround is "add \{{PARTITION BY <pk>}}",
unergonomic for users whose input table already declares a primary key.
h2. Proposal
Add \{{int[] upsertKeyColumns()}} to \{{TableSemantics}}, populated by planner
via \{{FlinkRelMetadataQuery.getUpsertKeys(input)}} collapsed to one candidate
via \{{UpsertKeyUtil.smallestKey(...)}}. Returns empty array when no upsert key
is derivable (pure append-only sources, post-Sort streams that destroyed the
key, etc.) or during type inference (metadata not yet computed).
Plumbing:
# *\{{TableSemantics}}* (\{{flink-table-common}}): add a default
\{{upsertKeyColumns()}} method returning an empty array. Default avoids
breaking source-compatibility for rare external implementor.
# *\{{RuntimeTableSemantics}}* (\{{flink-table-runtime}}): add serializable
\{{int[] upsertKeyColumns}} field, constructor parameter, accessor.
# *\{{StreamExecProcessTableFunction}}* (\{{flink-table-planner}}): persist
\{{List<int[]> inputUpsertKeys}} as a new \{{@JsonProperty}} field (one entry
per table input). Default to per-input empty arrays for older compiled plans
(back-compat).
# *\{{StreamPhysicalProcessTableFunction.translateToExecNode}}*: derive upsert
keys for each input via \{{FlinkRelMetadataQuery.getUpsertKeys(input)}} +
\{{UpsertKeyUtil.smallestKey(...).orElse(...)}}. Pass list to new ExecNode
field.
# *\{{OperatorBindingCallContext}}* / *\{{OperatorBindingTableSemantics}}*:
extend to accept \{{inputUpsertKeys}} so value is visible at specialization
time (function constructor sees operator-binding context, not runtime context).
# *\{{BridgingSqlFunction.toCallContext}}* /
*\{{ProcessTableRunnerGenerator.generate}}*: thread \{{inputUpsertKeys}}
through codegen path.
# *\{{TableSemanticsMock}}* and *\{{TestHarnessTableSemantics}}*: accept
optional setter for unit testing.
# *\{{TO_CHANGELOG}} runtime* (\{{ToChangelogFunction}}): consume
\{{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on
DELETE when \{{produces_full_deletes=false}}. Solves row-semantics case from
FLINK-39636 without requiring \{{PARTITION BY}}.
> Expose input upsert key on TableSemantics for ProcessTableFunctions
> -------------------------------------------------------------------
>
> Key: FLINK-39735
> URL: https://issues.apache.org/jira/browse/FLINK-39735
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Ramin Gharib
> Priority: Major