[ 
https://issues.apache.org/jira/browse/FLINK-27849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-27849:
--------------------------------
    Description: 
Updates (i.e., messages other than RowKind.INSERT) are common in a streaming 
pipeline, and they may lead to wrong results or errors when they pass through 
non-deterministic functions or operations.
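To make the failure mode concrete, here is a minimal sketch (Java 17). The `Kind`, `Change` and `NonDeterministicRetractionDemo` names are hypothetical simplifications, not Flink's `RowKind`/`RowData` classes. It applies a per-message function to a changelog, as a projection such as `SELECT key, RAND()` would, and feeds the result into a sink that must find an exact earlier row for every retraction. With a non-deterministic function, the UPDATE_BEFORE message carries a freshly computed value, so it no longer matches the row it is supposed to retract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToDoubleFunction;

final class NonDeterministicRetractionDemo {
    // Hypothetical, simplified stand-ins for changelog messages; not Flink classes.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Change(Kind kind, String key, double value) {}

    /**
     * Applies fn to every message, like a projection evaluated per record.
     * Retraction messages (UPDATE_BEFORE/DELETE) are re-evaluated as well.
     */
    static List<Change> project(List<Kind> kinds, String key, ToDoubleFunction<String> fn) {
        List<Change> out = new ArrayList<>();
        for (Kind k : kinds) {
            out.add(new Change(k, key, fn.applyAsDouble(key)));
        }
        return out;
    }

    /**
     * Simulates a sink that stores full rows and must find an exact match for
     * every retraction; returns how many retractions matched no stored row.
     */
    static int unmatchedRetractions(List<Change> changes) {
        List<Change> stored = new ArrayList<>();
        int unmatched = 0;
        for (Change c : changes) {
            // Normalize the kind so rows are compared by content only.
            Change row = new Change(Kind.INSERT, c.key(), c.value());
            switch (c.kind()) {
                case INSERT, UPDATE_AFTER -> stored.add(row);
                case UPDATE_BEFORE, DELETE -> {
                    if (!stored.remove(row)) {
                        unmatched++;
                    }
                }
            }
        }
        return unmatched;
    }
}
```

With a deterministic function such as `k -> k.length()` the same sequence produces no unmatched retractions.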

This is a long-standing issue that has existed since the first day Flink SQL 
became available in streaming mode, and although some efforts have been made, 
it has not yet been fully eliminated.

We should detect all non-deterministic operations in changelog pipelines, 
raise an error to inform users of the risk, and also add a mechanism that can 
handle the issue for users who are willing to pay some cost (probably by 
introducing state).

Non-deterministic operations include:
 # built-in temporal functions (now, current_timestamp...), UUID, RAND...
 # user-defined non-deterministic functions (those that override 
isDeterministic to return false)
 # lookup joins on a lookup source whose data may change over time
 # CDC sources with metadata fields (described in FLINK-28242)
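For the user-defined case, Flink UDFs declare non-determinism by overriding `isDeterministic()`, which is declared on `FunctionDefinition` and returns `true` by default. The sketch below mirrors that convention with a hypothetical `SketchScalarFunction` interface instead of Flink's real classes, so it runs without Flink on the classpath; `Upper`, `TagWithUuid` and `DeterminismScan` are likewise illustrative names.

```java
import java.util.List;
import java.util.Locale;
import java.util.UUID;

/**
 * Hypothetical stand-in for a scalar UDF (NOT Flink's ScalarFunction).
 * Like FunctionDefinition#isDeterministic(), it defaults to true.
 */
interface SketchScalarFunction {
    String eval(String input);

    default boolean isDeterministic() {
        return true;
    }
}

/** Deterministic: the same input always yields the same output. */
final class Upper implements SketchScalarFunction {
    @Override
    public String eval(String input) {
        return input.toUpperCase(Locale.ROOT);
    }
}

/** Non-deterministic: re-evaluating the same input yields a different UUID. */
final class TagWithUuid implements SketchScalarFunction {
    @Override
    public String eval(String input) {
        return input + "-" + UUID.randomUUID();
    }

    @Override
    public boolean isDeterministic() {
        return false; // declares the non-determinism, as a Flink UDF would
    }
}

final class DeterminismScan {
    /** True if any function in the projection declares itself non-deterministic. */
    static boolean anyNonDeterministic(List<SketchScalarFunction> projection) {
        return projection.stream().anyMatch(f -> !f.isDeterministic());
    }
}
```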

 

====== Solution ======

We will introduce a physical plan checker that validates whether there are any 
non-deterministic updates that may cause wrong results, and also a physical 
plan rewriter that eliminates the non-determinism generated by lookup join 
nodes (which we consider commonly used in SQL and hard for users to resolve 
themselves).
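A much-simplified sketch of what such a checker has to decide, assuming a hypothetical `PlanNode` record instead of Flink's physical nodes: walk the plan from the sink, track whether each node's input may contain update messages, and report every non-deterministic node that sees updates. A real checker would likely also need to consider which fields are actually consumed downstream (e.g. upsert keys); that is deliberately left out here, and `NonDeterministicUpdateCheckerSketch` is an illustrative name only.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, heavily simplified physical node; not a Flink class. */
record PlanNode(String name, boolean generatesUpdates, boolean nonDeterministic, List<PlanNode> inputs) {
    static PlanNode of(String name, boolean generatesUpdates, boolean nonDeterministic, PlanNode... inputs) {
        return new PlanNode(name, generatesUpdates, nonDeterministic, List.of(inputs));
    }
}

final class NonDeterministicUpdateCheckerSketch {
    /** Returns the names of non-deterministic nodes whose input may contain updates. */
    static List<String> check(PlanNode sink) {
        List<String> violations = new ArrayList<>();
        visit(sink, new IdentityHashMap<>(), violations);
        return violations;
    }

    /** Returns true if the output of node may contain non-INSERT messages. */
    private static boolean visit(PlanNode node, Map<PlanNode, Boolean> memo, List<String> violations) {
        Boolean cached = memo.get(node);
        if (cached != null) {
            return cached; // shared sub-plan in a DAG: check it only once
        }
        boolean inputHasUpdates = false;
        for (PlanNode input : node.inputs()) {
            // Non-short-circuit OR so every input branch is visited and checked.
            inputHasUpdates |= visit(input, memo, violations);
        }
        if (node.nonDeterministic() && inputHasUpdates) {
            violations.add(node.name());
        }
        boolean outputHasUpdates = inputHasUpdates || node.generatesUpdates();
        memo.put(node, outputHasUpdates);
        return outputHasUpdates;
    }
}
```

A lookup join fed by a CDC source is reported, while RAND() on an insert-only source is not, even if a group aggregate downstream emits updates.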

The implementation will likely consist of four main changes:
 # [Preparatory work] Add an internal postOptimize method for physical DAG 
processing
 # Introduce a `StreamPhysicalPlanChecker` to validate whether there are any 
non-deterministic updates that may cause wrong results
 # Add materialization support to eliminate the non-determinism generated by 
lookup join nodes
 # [Follow-up to No. 3] Implement a new lookup join operator (sync mode only) 
with state to eliminate the non-determinism
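Steps 3 and 4 rest on one idea: remember what the lookup returned when a row was inserted, and reuse it when that row is retracted, so the retraction no longer depends on the current contents of the dimension table. Below is a minimal in-memory sketch under the assumption that each input row is identified by a unique key; `MaterializedLookupJoinSketch` and its `HashMap` state are hypothetical stand-ins for keyed state, not the operator proposed in this issue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch of a lookup join that materializes lookup results.
 * Assumes each input row has a unique key; a HashMap stands in for keyed state.
 */
final class MaterializedLookupJoinSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Input(Kind kind, String rowKey, String joinKey) {}

    record Output(Kind kind, String rowKey, String dimValue) {}

    private final Function<String, String> lookup; // queries the current dimension table
    private final Map<String, String> state = new HashMap<>(); // rowKey -> value emitted earlier

    MaterializedLookupJoinSketch(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    Output process(Input in) {
        if (in.kind() == Kind.INSERT || in.kind() == Kind.UPDATE_AFTER) {
            String value = lookup.apply(in.joinKey());
            // Remember what was emitted so a later retraction can replay it.
            // HashMap accepts null values, which model a lookup miss (left join).
            state.put(in.rowKey(), value);
            return new Output(in.kind(), in.rowKey(), value);
        }
        // UPDATE_BEFORE / DELETE: reuse the stored value instead of re-querying,
        // since the dimension table may have changed in the meantime.
        String value = state.remove(in.rowKey());
        return new Output(in.kind(), in.rowKey(), value);
    }
}
```

The cost mentioned earlier shows up as the `state` map, which grows with the number of live input keys.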


  was:
Updates (i.e., messages other than RowKind.INSERT) are common in a streaming 
pipeline, and they may lead to wrong results or errors when they pass through 
non-deterministic functions or operations.

This is a long-standing issue that has existed since the first day Flink SQL 
became available in streaming mode, and although some efforts have been made, 
it has not yet been fully eliminated.

We should detect all non-deterministic operations in changelog pipelines, 
raise an error to inform users of the risk, and also add a mechanism that can 
handle the issue for users who are willing to pay some cost (probably by 
introducing state).

Non-deterministic operations include:
 # built-in temporal functions (now, current_timestamp...), UUID, RAND...
 # user-defined non-deterministic functions (those that override 
isDeterministic to return false)
 # lookup joins on a lookup source whose data may change over time
 # CDC sources with metadata fields (described in FLINK-28242)

 

====== Solution ======

We will introduce a physical plan checker that validates whether there are any 
non-deterministic updates that may cause wrong results, and also a physical 
plan rewriter that eliminates the non-determinism generated by lookup join 
nodes (which we consider commonly used in SQL and hard for users to resolve 
themselves).

The implementation will likely consist of four main changes:
 # [Preparatory work] Add an internal postOptimize method for physical DAG 
processing
 # Introduce a `StreamPhysicalPlanChecker` to validate whether there are any 
non-deterministic updates that may cause wrong results
 # Add materialization support to eliminate the non-determinism generated by 
lookup join nodes
 # [Follow-up to No. 3] Implement a new lookup join operator with state to 
eliminate the non-determinism



> Harden correctness for non-deterministic updates present in the changelog 
> pipeline
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-27849
>                 URL: https://issues.apache.org/jira/browse/FLINK-27849
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>             Fix For: 1.16.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
