[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846895#comment-17846895
 ] 

Roman Boyko commented on FLINK-34380:
-------------------------------------

Hi [~xu_shuai_]!

I can't agree with you, because the order of messages for a single key must not 
be violated, even in a distributed system. Otherwise the result becomes 
inconsistent. You can check this by running XuYang's example without the last 
-D record on both join sides:
{code:java}
val rows = Seq(
      changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
      changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
      changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
//      changelogRow("-D", java.lang.Integer.valueOf(1), "99")
    ) {code}
And the result will be:
{code:java}
+----+-------------+--------------------------------+-------------+--------------------------------+
| op |           a |                              b |          a0 |                             b0 |
+----+-------------+--------------------------------+-------------+--------------------------------+
| +U |           1 |                              1 |           1 |                             99 |
| +U |           1 |                             99 |           1 |                             99 |
| -U |           1 |                              1 |           1 |                             99 |
+----+-------------+--------------------------------+-------------+--------------------------------+
 {code}
This means that the sink or downstream operator will receive the -U (retract 
record) as the last record. I doubt that any downstream or sink operator can 
correctly process elements in such an order.
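
To illustrate the concern, here is a minimal sketch in plain Scala, not Flink 
code. It assumes a hypothetical key-based consumer that overwrites the stored 
row on +I/+U and removes the stored row for the key on -U/-D. The names Change 
and materialize are invented for this example, and real sinks may treat -U 
differently (some ignore it), so take it only as one possible way the order 
above can go wrong:
{code:scala}
object ChangelogOrderSketch {
  // Hypothetical, simplified changelog record; op is one of "+I", "-U", "+U", "-D".
  final case class Change(op: String, key: Int, row: String)

  // Hypothetical key-based consumer (an assumption, not a Flink operator):
  // additions overwrite the row stored for a key, retractions remove it.
  def materialize(changes: Seq[Change]): Map[Int, String] =
    changes.foldLeft(Map.empty[Int, String]) { (state, c) =>
      c.op match {
        case "+I" | "+U" => state.updated(c.key, c.row)
        case "-U" | "-D" => state - c.key
        case other       => throw new IllegalArgumentException(s"unknown op: $other")
      }
    }

  // Order from the result table above: the retraction arrives last.
  val observed: Seq[Change] = Seq(
    Change("+U", 1, "(1, 1, 1, 99)"),
    Change("+U", 1, "(1, 99, 1, 99)"),
    Change("-U", 1, "(1, 1, 1, 99)")
  )

  // Same records, with the retraction placed before the update that replaces it.
  val reordered: Seq[Change] = Seq(observed(0), observed(2), observed(1))

  def main(args: Array[String]): Unit = {
    println(materialize(observed))  // Map()
    println(materialize(reordered)) // Map(1 -> (1, 99, 1, 99))
  }
}
{code}
Under that assumption the observed order leaves the consumer with no row for 
key 1, while the reordered sequence keeps (1, 99, 1, 99).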

> Strange RowKind and records about intermediate output when using minibatch 
> join
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-34380
>                 URL: https://issues.apache.org/jira/browse/FLINK-34380
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0
>            Reporter: xuyang
>            Priority: Major
>             Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
>     env.setParallelism(1)
>     val rows = Seq(
>       changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>       changelogRow("-D", java.lang.Integer.valueOf(1), "99")
>     )
>     val dataId = TestValuesTableFactory.registerData(rows)
>     val ddl =
>       s"""
>          |CREATE TABLE t1 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl)
>     val ddl2 =
>       s"""
>          |CREATE TABLE t2 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl2)
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
>     println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>     tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
