I am running Flink 1.12.2, and I was trying to increase the parallelism of my
Flink SQL job to see what happened. However, once I did that, my results
became nondeterministic. This happens whether I set the
table.exec.resource.default-parallelism config option or set the default
local parallelism to something higher than 1. I would end up with fewer
records in the end, and each time I ran the job the output record count came
out differently.
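
Concretely, either of these triggers the behavior for me (a minimal sketch;
16 is just the value I happened to use):

    // raising the default parallelism for Table API operators:
    streamTableEnv.getConfig.getConfiguration
      .setInteger("table.exec.resource.default-parallelism", 16)

    // or raising the default local parallelism for the whole environment:
    StreamExecutionEnvironment.setDefaultLocalParallelism(16)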

I managed to distill an example, pasted below (with attribute names changed
to protect company proprietary info), that causes the issue. I believe I
originally saw it happen with a LEFT JOIN rather than a FULL JOIN, but the
distilled version wasn't producing wrong results that way. Maybe it has to do
with joining to a table that was formed using a GROUP BY? Can somebody tell
me if I'm doing something that is known not to work, or if I have run across
a bug?
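
For reference, the LEFT JOIN variant I think I saw it with looked roughly
like the following (a sketch from memory rather than the exact query, using
the same tables as the example below):

    SELECT
      t1.id1 AS id,
      COALESCE(t2.attr, t1.attr) AS attr,
      COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
    FROM table1 t1
    LEFT JOIN (
      SELECT id2, FIRST_VALUE(attr) AS attr
      FROM table2
      GROUP BY id2
    ) t2
      ON (t1.id1 = t2.id2)
    LEFT JOIN table3 t3
      ON (COALESCE(t2.attr, t1.attr) = t3.attr)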

Regards,
Dylan Forciea


import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job {
  def main(args: Array[String]): Unit = {
    StreamExecutionEnvironment.setDefaultLocalParallelism(1)

    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    // raise the default parallelism used by Table API operators
    val configuration = streamTableEnv.getConfig().getConfiguration()
    configuration.setInteger("table.exec.resource.default-parallelism", 16)

    // run in batch execution mode, since the JDBC scan sources are bounded
    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

    streamTableEnv.executeSql(
      """
      CREATE TABLE table1 (
        id1 STRING PRIMARY KEY NOT ENFORCED,
        attr STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table1',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr STRING,
        id2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table2',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE table3 (
        attr STRING PRIMARY KEY NOT ENFORCED,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table3',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE sink (
        id STRING PRIMARY KEY NOT ENFORCED,
        attr STRING,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'sink',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    val view =
      streamTableEnv.sqlQuery("""
      SELECT
        COALESCE(t1.id1, t2.id2) AS id,
        COALESCE(t2.attr, t1.attr) AS attr,
        COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
      FROM table1 t1
      FULL JOIN (
        SELECT
          id2,
          FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2
      ) t2
        ON (t1.id1 = t2.id2)
      LEFT JOIN table3 t3
        ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
    streamTableEnv.createTemporaryView("view", view)

    val statementSet = streamTableEnv.createStatementSet()
    statementSet.addInsertSql("""
      INSERT INTO sink SELECT * FROM view
    """)

    statementSet.execute().await()
  }
}

