[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839474#comment-17839474
 ] 

Roman Boyko commented on FLINK-34380:
-------------------------------------

Hi [~xuyangzhong], [~xu_shuai_]!

1) The RowKind can't be fixed in the current architecture, because +I and +U 
end up in different batches in this example. Fixing that would be a bit 
tricky.

2) The record order, however, is indeed incorrect in this example, and that 
can be easily fixed: https://github.com/rovboyko/flink/tree/fix/FLINK-34380

> Strange RowKind and records about intermediate output when using minibatch 
> join
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-34380
>                 URL: https://issues.apache.org/jira/browse/FLINK-34380
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0
>            Reporter: xuyang
>            Priority: Major
>             Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
>     env.setParallelism(1)
>     val rows = Seq(
>       changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>       changelogRow("-D", java.lang.Integer.valueOf(1), "99")
>     )
>     val dataId = TestValuesTableFactory.registerData(rows)
>     val ddl =
>       s"""
>          |CREATE TABLE t1 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl)
>     val ddl2 =
>       s"""
>          |CREATE TABLE t2 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl2)
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
>     println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>     tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)