[ https://issues.apache.org/jira/browse/FLINK-27849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-27849: -------------------------------- Description: There commonly exists updates(which means not only RowKind.INSERT messages) in a streaming pipeline, then wrong results or error may occurs when use some non-deterministic functions or operations. It is a long lived issue since the first day that flink sql was available in streaming, but it still not totally be eliminated though some efforts have been taken. We should detect all the non-deterministic operations in the changelog pipelines, raise an error to tell users the risk and also add an mechanism that can process such a issue if a user is willing to pay some cost(probably introduce the state). All non-deterministic operations include builtin temporal functions(now, current_timestamp...), UUID, RAND... or user defined non-deterministic functions (override isDeterministic return false) or a lookup join on a lookup source which data may change over time or a cdc-source with meta data field (described in FLINK-28242) ====== Solution ====== Will introduce a physical plan checker to validate if there's any non-deterministic updates which may cause wrong result, and also a physical plan rewriter to eliminate the non determinism generated by lookup join node (which we think is commonly used in sql, and hard to solve by users themselves). For implementation steps, the main changes may include 4 parts: # [preparing work] Adds an internal postOptimize method for physical dag processing # Introduces a `StreamPhysicalPlanChecker` to validate if there's any non-deterministic updates which may cause wrong result # Adds materialization support to eliminate the non determinism generated by lookup join node # [No.3 followup] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism was: There commonly exists updates(which means not only RowKind.INSERT messages) in a streaming pipeline, then wrong results or error may occurs when use some non-deterministic functions or operations. It is a long lived issue since the first day that flink sql was available in streaming, but it still not totally be eliminated though some efforts have been taken. We should detect all the non-deterministic operations in the changelog pipelines, raise an error to tell users the risk and also add an mechanism that can process such a issue if a user is willing to pay some cost(probably introduce the state). All non-deterministic operations include builtin temporal functions(now, current_timestamp...), UUID, RAND... or user defined non-deterministic functions (override isDeterministic return false) or a lookup join on a lookup source which data may change over time or a cdc-source with meta data field (described in FLINK-28242) ====== Solution ====== Will introduce a physical plan checker to validate if there's any non-deterministic updates which may cause wrong result, and also a physical plan rewriter to eliminate the non determinism generated by lookup join node (which we think is commonly used in sql, and hard to solve by users themselves). For implementation steps, the main changes may include 4 parts: # [preparing work] Adds an internal postOptimize method for physical dag processing # Introduces a `StreamPhysicalPlanChecker` to validate if there's any non-deterministic updates which may cause wrong result # Adds materialization support to eliminate the non determinism generated by lookup join node # [No.3 followup] Implements a new lookup join operator with state to eliminate the non determinism > Harden correctness for non-deterministic updates present in the changelog > pipeline > ---------------------------------------------------------------------------------- > > Key: FLINK-27849 > URL: https://issues.apache.org/jira/browse/FLINK-27849 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Reporter: lincoln lee > Assignee: lincoln lee > Priority: Major > Fix For: 1.16.0 > > > There commonly exists updates(which means not only RowKind.INSERT messages) > in a streaming pipeline, then wrong results or error may occurs when use some > non-deterministic functions or operations. > It is a long lived issue since the first day that flink sql was available in > streaming, but it still not totally be eliminated though some efforts have > been taken. > We should detect all the non-deterministic operations in the changelog > pipelines, raise an error to tell users the risk and also add an mechanism > that can process such a issue if a user is willing to pay some cost(probably > introduce the state). > All non-deterministic operations include builtin temporal functions(now, > current_timestamp...), UUID, RAND... > or user defined non-deterministic functions (override isDeterministic return > false) > or a lookup join on a lookup source which data may change over time > or a cdc-source with meta data field (described in FLINK-28242) > > ====== Solution ====== > Will introduce a physical plan checker to validate if there's any > non-deterministic updates which may cause wrong result, and also a physical > plan rewriter to eliminate the non determinism generated by lookup join node > (which we think is commonly used in sql, and hard to solve by users > themselves). > For implementation steps, the main changes may include 4 parts: > # [preparing work] Adds an internal postOptimize method for physical dag > processing > # Introduces a `StreamPhysicalPlanChecker` to validate if there's any > non-deterministic updates which may cause wrong result > # Adds materialization support to eliminate the non determinism generated by > lookup join node > # [No.3 followup] Implements a new lookup join operator (sync mode only) > with state to eliminate the non determinism > > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)