[ 
https://issues.apache.org/jira/browse/FLINK-39088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18060281#comment-18060281
 ] 

Timo Walther commented on FLINK-39088:
--------------------------------------

Additional commit fixed in master: de569e4d9305ed5aeb569d53a7f36ae054a12e87

> Preserve upsert keys for explicit (injective) CAST calls to avoid unnecessary 
> SinkUpsertMaterializer
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39088
>                 URL: https://issues.apache.org/jira/browse/FLINK-39088
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 2.2.0
>            Reporter: Gustavo de Morais
>            Assignee: Gustavo de Morais
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.0
>
>
> Currently, when a CAST is applied to a primary key or upsert key column in a 
> streaming query, Flink's optimizer can lose track of the key, even when the 
> cast is provably injective (a one-to-one mapping). The planner then inserts 
> an unnecessary SinkUpsertMaterializer operator, which adds state overhead 
> and reduces performance. For example:
> CREATE TABLE source (
>   id INT PRIMARY KEY NOT ENFORCED,
>   name STRING
> );
>  
> CREATE TABLE sink (
>   id STRING PRIMARY KEY NOT ENFORCED,
>   name STRING  
> );
>  
> INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
>  
> In this case, CAST(INT AS STRING) is injective: every distinct integer maps 
> to a distinct string representation. The upsert key should be preserved 
> through this cast. However, the current implementation treats any cast as 
> potentially key-destroying, requiring a SinkUpsertMaterializer to 
> re-establish the key before writing to the sink.
>
> Solution:
> Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective 
> casts as key-preserving operations. An explicit cast is considered injective 
> when every distinct input value maps to a distinct output value.
>
> The following explicit casts to VARCHAR/CHAR will be recognized as injective:
>  - Integer types: TINYINT, SMALLINT, INTEGER, BIGINT (biggest win)
> The following could also be added, but are open to discussion since they are 
> less commonly used as unique keys:
>  - Floating point types: FLOAT, DOUBLE
>  - BOOLEAN
>  - DATE
>  - Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
>  
> *Also, we currently use isImplicitCasts to check whether the upsert key 
> should be preserved.* This is not correct. Several of the implicit casts 
> that we support are not necessarily injective, e.g. char(6) to char(3). 
> We'll fix this by properly implementing all the supported injective casts.
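
The injectivity argument in the description above can be illustrated with a small, standalone Python sketch. It is not Flink code and does not reflect how FlinkRelMdUniqueKeys is implemented; the helper names are hypothetical, and modelling char(6) to char(3) as truncation to the first three characters is an assumption made only for illustration.

```python
from itertools import product


def is_injective(cast, domain):
    """Return True if `cast` maps every distinct value in `domain` to a distinct output."""
    seen = {}
    for value in domain:
        out = cast(value)
        if out in seen and seen[out] != value:
            # Two different inputs collapsed to the same output: a key would be lost.
            return False
        seen[out] = value
    return True


def cast_int_to_string(value):
    # Hypothetical model of CAST(id AS STRING): the decimal text of the integer.
    return str(value)


def cast_char6_to_char3(value):
    # Hypothetical model of char(6) -> char(3): keep only the first three characters.
    return value[:3]


# Every TINYINT value, and every 6-character string over a two-letter alphabet.
tinyint_domain = range(-128, 128)
char6_domain = ["".join(chars) for chars in product("ab", repeat=6)]

int_cast_keeps_key = is_injective(cast_int_to_string, tinyint_domain)
char_cast_keeps_key = is_injective(cast_char6_to_char3, char6_domain)

print("int -> string injective:", int_cast_keeps_key)
print("char(6) -> char(3) injective:", char_cast_keeps_key)
```

Under these assumptions the integer cast keeps distinct keys distinct, while the narrowing character cast maps "aaaaaa" and "aaabbb" to the same value, which is why a cast being implicit is not by itself enough to preserve the upsert key.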



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
