[ https://issues.apache.org/jira/browse/FLINK-27953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554894#comment-17554894 ]
zoucao edited comment on FLINK-27953 at 6/16/22 3:57 AM:
---------------------------------------------------------

Hi [~godfreyhe], do you have time to take a look?

was (Author: zoucao):
Hi [~godfrey], do you have time to take a look?

> using the original order to add the primary key in PushProjectIntoTableSourceScanRule
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-27953
>                 URL: https://issues.apache.org/jira/browse/FLINK-27953
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.4
>            Reporter: zoucao
>            Priority: Major
>
> In PushProjectIntoTableSourceScanRule, if the source produces a changelog
> stream, the primary key fields are appended to the end of the projected
> fields. See the following SQL:
> {code:java}
> StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
> TableEnvironment tEnv = util.getTableEnv();
> String srcTableDdl =
>         "CREATE TABLE fs (\n"
>                 + " a bigint,\n"
>                 + " b int,\n"
>                 + " c varchar,\n"
>                 + " d int,\n"
>                 + " e int,\n"
>                 + " primary key (a,b) not enforced\n"
>                 + ") with (\n"
>                 + " 'connector' = 'values',\n"
>                 + " 'disable-lookup'='true',\n"
>                 + " 'changelog-mode' = 'I,UB,UA,D')";
> tEnv.executeSql(srcTableDdl);
> // declare that the CDC source may emit duplicate events, so the planner
> // deduplicates them on the primary key (ChangelogNormalize)
> tEnv.getConfig().set("table.exec.source.cdc-events-duplicate", "true");
> {code}
> {code:java}
> System.out.println(tEnv.explainSql("select a, c from fs where c > 0 and b = 0"));
>
> projected list:
> [[0],[1],[2]]
>
> == Optimized Execution Plan ==
> Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
> +- ChangelogNormalize(key=[a, b])
>    +- Exchange(distribution=[hash[a, b]])
>       +- Calc(select=[a, b, c], where=[(b = 0)])
>          +- DropUpdateBefore
>             +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])
> {code}
> {code:java}
> System.out.println(tEnv.explainSql("select a, c from fs where c > 0"));
>
> projected list:
> [[0],[2],[1]]
>
> == Optimized Execution Plan ==
> Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
> +- ChangelogNormalize(key=[a, b])
>    +- Exchange(distribution=[hash[a, b]])
>       +- DropUpdateBefore
>          +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, c, b], metadata=[]]], fields=[a, c, b])
> {code}
> Field b is not referenced in
> {code:sql}
> select a, c from fs where c > 0{code}
> but, because it is part of the primary key and
> 'table.exec.source.cdc-events-duplicate' is enabled, it is appended to the end
> of the projected list. As a result, adding or removing the condition on field b
> changes the field order produced below ChangelogNormalize ([a, b, c] vs.
> [a, c, b]). The deduplication node therefore receives a different input type,
> and its state serializer changes too, which leads to state incompatibility.
> I think we can instead add the primary key fields to the projected list
> according to their original order in the source table.
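A minimal sketch contrasting the two orderings, under stated assumptions: projections are treated as flat top-level field indices (the projected list printed in the issue is an `int[][]` of nested field paths), and "original order" is read as ascending position in the source schema. `PrimaryKeyProjectionSketch`, `appendPrimaryKey`, and `mergeInSourceOrder` are hypothetical names for illustration only; they are not Flink API and not the rule's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical sketch (not Flink code): two ways of adding primary-key fields
 * to a projection expressed as top-level field indices of the source table.
 */
class PrimaryKeyProjectionSketch {

    /** Behavior described in the issue: key fields not yet projected go to the end. */
    static int[] appendPrimaryKey(int[] projected, int[] primaryKey) {
        List<Integer> result = new ArrayList<>();
        for (int field : projected) {
            result.add(field);
        }
        for (int key : primaryKey) {
            if (!result.contains(key)) {
                result.add(key);
            }
        }
        return result.stream().mapToInt(Integer::intValue).toArray();
    }

    /** Proposed behavior (one reading): all required fields in source-table order. */
    static int[] mergeInSourceOrder(int[] projected, int[] primaryKey) {
        // TreeSet removes duplicates and keeps indices in ascending (schema) order.
        TreeSet<Integer> fields = new TreeSet<>();
        for (int field : projected) {
            fields.add(field);
        }
        for (int key : primaryKey) {
            fields.add(key);
        }
        return fields.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        // Schema of fs: a=0, b=1, c=2, d=3, e=4; primary key (a, b).
        int[] primaryKey = {0, 1};
        int[] withFilterOnB = {0, 1, 2}; // select a, c from fs where c > 0 and b = 0
        int[] withoutFilterOnB = {0, 2}; // select a, c from fs where c > 0

        System.out.println(Arrays.toString(appendPrimaryKey(withFilterOnB, primaryKey)));
        System.out.println(Arrays.toString(appendPrimaryKey(withoutFilterOnB, primaryKey)));
        System.out.println(Arrays.toString(mergeInSourceOrder(withFilterOnB, primaryKey)));
        System.out.println(Arrays.toString(mergeInSourceOrder(withoutFilterOnB, primaryKey)));
    }
}
```

Running `main` prints `[0, 1, 2]`, `[0, 2, 1]`, `[0, 1, 2]`, `[0, 1, 2]`: with appending, the field order depends on whether the query filters on b, while with source order both queries project the same order in this example. Sorting also reorders the fields the query itself selected; that is harmless here because the Calc above ChangelogNormalize restores the select order, but a variant that only inserts the key fields without moving the selected ones would be an equally valid reading of the proposal.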