xuyang created FLINK-34380:
------------------------------

             Summary: Strange RowKind and records about intermediate output when using minibatch join
                 Key: FLINK-34380
                 URL: https://issues.apache.org/jira/browse/FLINK-34380
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.19.0
            Reporter: xuyang
             Fix For: 1.19.0

{code:scala}
// Add it in CalcItCase

@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
    changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
    changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
    changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
    changelogRow("-D", java.lang.Integer.valueOf(1), "99")
  )
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
    s"""
       |CREATE TABLE t1 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
    s"""
       |CREATE TABLE t2 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}
{code}

Output:

{code:java}
+----+-------------+-----------------+-------------+---------+
| op |           a |               b |          a0 |      b0 |
+----+-------------+-----------------+-------------+---------+
| +U |           1 |               1 |           1 |      99 |
| +U |           1 |              99 |           1 |      99 |
| -U |           1 |               1 |           1 |      99 |
| -D |           1 |              99 |           1 |      99 |
+----+-------------+-----------------+-------------+---------+
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)