I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or I set the default local parallelism to something higher than 1. I would end up with fewer records in the end, and each time I ran the output record count would come out differently.
I managed to distill an example, as pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I feel like I managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn’t giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell if I’m doing something that is known not to work, or if I have run across a bug? Regards, Dylan Forciea object Job { def main(args: Array[String]): Unit = { StreamExecutionEnvironment.setDefaultLocalParallelism(1) val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings) val configuration = streamTableEnv.getConfig().getConfiguration() configuration.setInteger("table.exec.resource.default-parallelism", 16) streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); streamTableEnv.executeSql( """ CREATE TABLE table1 ( id1 STRING PRIMARY KEY NOT ENFORCED, attr STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table1', 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql( """ CREATE TABLE table2 ( attr STRING, id2 STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table2', 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql( """ CREATE TABLE table3 ( attr STRING PRIMARY KEY NOT ENFORCED, attr_mapped STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table3', 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql(""" CREATE TABLE sink ( id STRING PRIMARY KEY NOT ENFORCED, 
attr STRING, attr_mapped STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'sink', 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") val view = streamTableEnv.sqlQuery(""" SELECT COALESCE(t1.id1, t2.id2) AS id, COALESCE(t2.attr, t1.attr) AS operator, COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped FROM table1 t1 FULL JOIN ( SELECT id2, FIRST_VALUE(attr) AS attr FROM table2 GROUP BY id2 ) t2 ON (t1.id1 = t2.id2) LEFT JOIN table3 t3 ON (COALESCE(t2.attr, t1.attr) = t3.attr)""") streamTableEnv.createTemporaryView("view", view) val statementSet = streamTableEnv.createStatementSet() statementSet.addInsertSql(""" INSERT INTO sink SELECT * FROM view """) statementSet.execute().await() } }