[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-23 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849151#comment-17849151
 ] 

Roman Boyko edited comment on FLINK-34380 at 5/24/24 4:18 AM:
--

Hi [~xu_shuai_] , [~xuyangzhong] !

I still think that the message order *for one single key* is the main problem 
that should be addressed in this issue.

Could you please review my comment above?


was (Author: rovboyko):
Hi [~xu_shuai_] , [~xuyangzhong] !

I still think that the message order for one single key is the main problem 
that should be addressed in this issue.

Could you please review my comment above?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-23 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849151#comment-17849151
 ] 

Roman Boyko edited comment on FLINK-34380 at 5/24/24 4:17 AM:
--

Hi [~xu_shuai_] , [~xuyangzhong] !

I still think that the message order for one single key is the main problem 
that should be addressed in this issue.

Could you please review my comment above?


was (Author: rovboyko):
Hi [~xu_shuai_] , [~xuyangzhong] !

I still think that the message order is the main problem that should be 
addressed in this issue.

Could you please review my comment above?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-23 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849151#comment-17849151
 ] 

Roman Boyko commented on FLINK-34380:
-

Hi [~xu_shuai_] , [~xuyangzhong] !

I still think that the message order is the main problem that should be 
addressed in this issue.

Could you please review my comment above?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-05-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848532#comment-17848532
 ] 

Roman Boyko commented on FLINK-34694:
-

Hi [~xu_shuai_] !

Could you please take a look at the benchmark results above?

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png, 
> image-2024-04-26-16-55-19-800.png, image-2024-04-26-16-55-56-994.png
>
>
> Currently in StreamingJoinOperator (non-window), in case of OUTER JOIN, the 
> OuterJoinRecordStateView is used to store an additional field - the number of 
> associations for every record. This leads to storing additional Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!
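
A hedged sketch of the counter-based logic the quoted issue describes, 
mirroring only the two bullet points above; collect and padNull are 
hypothetical placeholders, not the actual StreamingJoinOperator API:
{code:java}
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public class NullPaddingSketch {
    void onOtherSideChange(RowData outerRow, boolean isAccumulate, int numAssociations) {
        if (isAccumulate && numAssociations == 0) {
            // First matching record arrives: retract the null-padded output.
            collect(RowKind.DELETE, padNull(outerRow)); // -D[nullPaddingRecord]
        } else if (!isAccumulate && numAssociations == 1) {
            // Last matching record is revoked: re-emit the null-padded output.
            collect(RowKind.INSERT, padNull(outerRow)); // +I[nullPaddingRecord]
        }
    }

    private RowData padNull(RowData row) { return row; }           // placeholder
    private void collect(RowKind kind, RowData row) { /* emit */ } // placeholder
}
{code}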



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-16 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846895#comment-17846895
 ] 

Roman Boyko commented on FLINK-34380:
-

Hi [~xu_shuai_] !

I can't agree with you because the order of messages for one key can't be 
violated even in a distributed system. Otherwise it will lead to inconsistent 
results. You can check this by running XuYang's example without the last -D 
record on both join sides:
{code:java}
val rows = Seq(
  changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
  changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
  changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
//  changelogRow("-D", java.lang.Integer.valueOf(1), "99")
) {code}
And the result will be:
{code:java}
+----+-------------+--------------------------------+-------------+--------------------------------+
| op |           a |                              b |          a0 |                             b0 |
+----+-------------+--------------------------------+-------------+--------------------------------+
| +U |           1 |                              1 |           1 |                             99 |
| +U |           1 |                             99 |           1 |                             99 |
| -U |           1 |                              1 |           1 |                             99 |
+----+-------------+--------------------------------+-------------+--------------------------------+
 {code}
This means that the sink or a downstream operator will receive the -U (retract 
record) as the last record. And I guess no downstream or sink operator can 
correctly process elements in such an order.
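
A minimal sketch (not from the original discussion) of how an upsert-style 
consumer keyed by column a would mis-apply such a stream when the retraction 
arrives last:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class OrderDemo {
    public static void main(String[] args) {
        // Materialize the changelog above into a view keyed by column a,
        // applying +U as an upsert and -U as a retraction of the key.
        Map<Integer, String> view = new HashMap<>();
        view.put(1, "1,99");   // +U | 1 |  1 | 1 | 99
        view.put(1, "99,99");  // +U | 1 | 99 | 1 | 99
        view.remove(1);        // -U | 1 |  1 | 1 | 99  (retraction arrives last)
        // The row (1, 99, 1, 99) should survive, but the late retraction
        // wiped the key: the materialized result is empty.
        System.out.println(view); // {}
    }
}
{code}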

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846520#comment-17846520
 ] 

Roman Boyko edited comment on FLINK-34380 at 5/15/24 7:14 AM:
--

Hi [~xuyangzhong] ! Thank you for your reply.

Yes, you're right - the RowKind is still not fixed in this example. But I think 
we should consider fixing the RowKind in a separate issue because:

1) The incorrect RowKind in your example is a general problem of the MiniBatch 
functionality. It happens every time the +I and -U records are assigned to the 
first batch and the +U record is assigned to the second batch. It can't be 
fixed easily and only for the Join operator - we should try to reproduce the 
same for the Aggregate operator and fix it as well.

2) While an incorrect RowKind is not such a serious problem, the incorrect 
order of output records might be really critical because it leads to incorrect 
results.

So I suggest fixing only the incorrect order in this issue and creating a 
separate one for the incorrect RowKind.
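
A tiny illustration of point 1. The folding rule is an assumption for 
demonstration only (a retraction cancels the preceding accumulate within one 
batch); it is not the actual MiniBatch bundle code:
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class FoldDemo {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

    // Fold one batch: a -U cancels the +I/+U emitted just before it.
    static List<Kind> fold(List<Kind> batch) {
        Deque<Kind> out = new ArrayDeque<>();
        for (Kind k : batch) {
            if (k == Kind.UPDATE_BEFORE && !out.isEmpty()) {
                out.removeLast();
            } else {
                out.addLast(k);
            }
        }
        return List.copyOf(out);
    }

    public static void main(String[] args) {
        // Batch boundary falls between -U and +U:
        System.out.println(fold(List.of(Kind.INSERT, Kind.UPDATE_BEFORE))); // []
        System.out.println(fold(List.of(Kind.UPDATE_AFTER))); // [UPDATE_AFTER]
        // The first change downstream ever sees for this key is a +U,
        // although it never received the corresponding +I.
    }
}
{code}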


was (Author: rovboyko):
Hi [~xuyangzhong] ! Thank you for your reply.

Yes, you're right - the RowKind is still not fixed in this example. But I think 
we should consider fixing the RowKind in a separate issue because:

1) The incorrect RowKind in your example is a general problem of the MiniBatch 
functionality. It happens every time the +I and -U records are assigned to the 
first batch and the +U record is assigned to the second batch. It can't be 
fixed easily and only for the Join operator - we should try to reproduce the 
same for the Aggregate operator as well.

2) While an incorrect RowKind is not such a serious problem, the incorrect 
order of output records might be really critical because it leads to incorrect 
results.

So I suggest fixing only the incorrect order in this issue and creating a 
separate one for the incorrect RowKind.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846520#comment-17846520
 ] 

Roman Boyko commented on FLINK-34380:
-

Hi [~xuyangzhong] ! Thank you for your reply.

Yes, you're right - the RowKind is still not fixed in this example. But I think 
we should consider fixing the RowKind in a separate issue because:

1) The incorrect RowKind in your example is a general problem of the MiniBatch 
functionality. It happens every time the +I and -U records are assigned to the 
first batch and the +U record is assigned to the second batch. It can't be 
fixed easily and only for the Join operator - we should try to reproduce the 
same for the Aggregate operator as well.

2) While an incorrect RowKind is not such a serious problem, the incorrect 
order of output records might be really critical because it leads to incorrect 
results.

So I suggest fixing only the incorrect order in this issue and creating a 
separate one for the incorrect RowKind.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Roman Boyko (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34380 ]


Roman Boyko deleted comment on FLINK-34380:
-

was (Author: rovboyko):
Hi [~xuyangzhong] ! Thank you for your reply.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-15 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846516#comment-17846516
 ] 

Roman Boyko commented on FLINK-34380:
-

Hi [~xuyangzhong] ! Thank you for your reply.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-13 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846141#comment-17846141
 ] 

Roman Boyko commented on FLINK-34380:
-

[~xu_shuai_] , [~xuyangzhong] , could you please take a look?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-05-13 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846140#comment-17846140
 ] 

Roman Boyko commented on FLINK-34694:
-

[~xu_shuai_] , please take a look.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png, 
> image-2024-04-26-16-55-19-800.png, image-2024-04-26-16-55-56-994.png
>
>
> Currently in StreamingJoinOperator (non-window), in case of OUTER JOIN, the 
> OuterJoinRecordStateView is used to store an additional field - the number of 
> associations for every record. This leads to storing additional Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-04-29 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842243#comment-17842243
 ] 

Roman Boyko commented on FLINK-34380:
-

[~xuyangzhong] , [~xu_shuai_] , what do you think? May I create a PR to fix 
it?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-29 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842232#comment-17842232
 ] 

Roman Boyko commented on FLINK-35184:
-

[~lsy] , done - https://github.com/apache/flink/pull/24749

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-27 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841427#comment-17841427
 ] 

Roman Boyko commented on FLINK-34694:
-

Hi [~xu_shuai_] !

Could you please take a look at the benchmark results above?

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png, 
> image-2024-04-26-16-55-19-800.png, image-2024-04-26-16-55-56-994.png
>
>
> Currently in StreamingJoinOperator (non-window), in case of OUTER JOIN, the 
> OuterJoinRecordStateView is used to store an additional field - the number of 
> associations for every record. This leads to storing additional Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-26 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837205#comment-17837205
 ] 

Roman Boyko edited comment on FLINK-34694 at 4/26/24 10:04 AM:
---

Hi [~xu_shuai_] !

I prepared and executed all nexmark queries that use streaming join (q4, q9 and 
q20). Because all of them use INNER JOIN (while this optimization works only for 
outer joins), I created a copy with FULL OUTER JOIN for each of them.

BEFORE optimization:

!image-2024-04-26-16-55-19-800.png!

AFTER optimization:

!image-2024-04-26-16-55-56-994.png!

As you can see here - for all INNER JOIN queries the result remains almost the 
same (the small difference is most probably caused by measurement error). But 
for all FULL OUTER JOIN benchmarks the performance is increased, especially for 
q20_outer, where it was more than 3 times better. The reason for such a huge 
difference can be seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because of the prevalence of the state.update operation in the 
before-optimization case, the RocksDB CompactionJob is invoked more often, 
consuming most of the CPU time.

In total, the performance boost is 6.75 / 5.15 = 1.31 (30%).


was (Author: rovboyko):
Hi [~xu_shuai_] !

I prepared and executed all nexmark queries that use streaming join (q4, q9 and 
q20). Because all of them use INNER JOIN (while this optimization works only for 
outer joins), I created a copy with FULL OUTER JOIN for each of them.

BEFORE optimization:

!image-2024-04-26-16-55-19-800.png!

AFTER optimization:

!image-2024-04-26-16-55-56-994.png!

As you can see here - for all INNER JOIN queries the result remains almost the 
same (the small difference is most probably caused by measurement error). But 
for all FULL OUTER JOIN benchmarks the performance is increased, especially for 
q20_outer, where it was more than 3 times better. The reason for such a huge 
difference can be seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because of the prevalence of the state.update operation in the 
before-optimization case, the RocksDB CompactionJob is invoked more often, 
consuming most of the CPU time.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png, 
> image-2024-04-26-16-55-19-800.png, image-2024-04-26-16-55-56-994.png
>
>
> Currently in StreamingJoinOperator (non-window), in case of OUTER JOIN, the 
> OuterJoinRecordStateView is used to store an additional field - the number of 
> associations for every record. This leads to storing additional Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-26 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837205#comment-17837205
 ] 

Roman Boyko edited comment on FLINK-34694 at 4/26/24 10:05 AM:
---

Hi [~xu_shuai_] !

I prepared and executed all nexmark queries that use streaming join (q4, q9 and 
q20). Because all of them use INNER JOIN (while this optimization works only for 
outer joins), I created a copy with FULL OUTER JOIN for each of them.

BEFORE optimization:

!image-2024-04-26-16-55-19-800.png!

AFTER optimization:

!image-2024-04-26-16-55-56-994.png!

As you can see here - for all INNER JOIN queries the result remains almost the 
same (the small difference is most probably caused by measurement error). But 
for all FULL OUTER JOIN benchmarks the performance is increased, especially for 
q20_outer, where it was more than 3 times better. The reason for such a huge 
difference can be seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because of the prevalence of the state.update operation in the 
before-optimization case, the RocksDB CompactionJob is invoked more often, 
consuming most of the CPU time.

In total, the performance boost is 6.75 / 5.15 = 1.31 (31%).


was (Author: rovboyko):
Hi [~xu_shuai_] !

I prepared and executed all nexmark queries that use streaming join (q4, q9 and 
q20). Because all of them use INNER JOIN (while this optimization works only for 
outer joins), I created a copy with FULL OUTER JOIN for each of them.

BEFORE optimization:

!image-2024-04-26-16-55-19-800.png!

AFTER optimization:

!image-2024-04-26-16-55-56-994.png!

As you can see here - for all INNER JOIN queries the result remains almost the 
same (the small difference is most probably caused by measurement error). But 
for all FULL OUTER JOIN benchmarks the performance is increased, especially for 
q20_outer, where it was more than 3 times better. The reason for such a huge 
difference can be seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because of the prevalence of the state.update operation in the 
before-optimization case, the RocksDB CompactionJob is invoked more often, 
consuming most of the CPU time.

In total, the performance boost is 6.75 / 5.15 = 1.31 (30%).

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png, 
> image-2024-04-26-16-55-19-800.png, image-2024-04-26-16-55-56-994.png
>
>
> Currently in StreamingJoinOperator (non-window), in case of OUTER JOIN, the 
> OuterJoinRecordStateView is used to store an additional field - the number of 
> associations for every record. This leads to storing additional Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-26 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837205#comment-17837205
 ] 

Roman Boyko edited comment on FLINK-34694 at 4/26/24 10:01 AM:
---

Hi [~xu_shuai_] !

I prepared and executed all nexmark queries that use streaming join (q4, q9 and 
q20). Because all of them use INNER JOIN (while this optimization works only for 
outer joins), I created a copy with FULL OUTER JOIN for each of them.

BEFORE optimization:

!image-2024-04-26-16-55-19-800.png!

AFTER optimization:

!image-2024-04-26-16-55-56-994.png!

As you can see here - for all INNER JOIN queries the result remains almost the 
same (the small difference is most probably caused by measurement error). But 
for all FULL OUTER JOIN benchmarks the performance is increased, especially for 
q20_outer, where it was more than 3 times better. The reason for such a huge 
difference can be seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because of the prevalence of the state.update operation in the 
before-optimization case, the RocksDB CompactionJob is invoked more often, 
consuming most of the CPU time.


was (Author: rovboyko):
Hi [~xu_shuai_] !

I prepared and executed all nexmark queries that use streaming join (q4, q7, 
q9 and q20). Because all of them use INNER JOIN (while this optimization works 
only for outer joins), I created a copy with FULL OUTER JOIN for each of them.

BEFORE optimization:

!image-2024-04-15-15-45-51-027.png!

AFTER optimization:

!image-2024-04-15-15-46-17-671.png!

As you can see here - for all queries except q20_outer the result remains 
almost the same (the small difference is most probably caused by measurement 
error). But for q20_outer the performance is more than 2 times better (I 
repeated the test several times). The reason for such a huge difference can be 
seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because of the prevalence of the state.update operation in the 
before-optimization case, the RocksDB CompactionJob is invoked more often, 
consuming most of the CPU time.

There is no such performance boost for q4, q7 and q9 because:
 * q7 translates to Interval join
 * q4 and q9 are transformed to InnerJoin by FlinkFilterJoinRule (maybe this is 
a bug, I will check later)

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png, 
> image-2024-04-26-16-55-19-800.png, image-2024-04-26-16-55-56-994.png
>
>
> Currently in StreamingJoinOperator (non-window), in case of OUTER JOIN, the 
> OuterJoinRecordStateView is used to store an additional field - the number of 
> associations for every record. This leads to storing additional Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-23 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839978#comment-17839978
 ] 

Roman Boyko commented on FLINK-35184:
-

I've found an even simpler solution without changing the storage schema - 
https://github.com/apache/flink/pull/24703

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839928#comment-17839928
 ] 

Roman Boyko commented on FLINK-35184:
-

[~xu_shuai_] , OK, I agree with you. So may I start the implementation?

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839578#comment-17839578
 ] 

Roman Boyko commented on FLINK-35184:
-

"bug which is caused by the hashcode() in GenericRowData"

And by the way - why do you think that BinaryRowData can't produce the same 
collision?
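
For reference, the collision that the reproducer's values (1L and 4294967296L) 
rely on can be shown directly; this standalone snippet is an editorial 
illustration, not part of the original comment:
{code:java}
public class HashCollisionDemo {
    public static void main(String[] args) {
        // Long.hashCode(v) folds the upper 32 bits into the lower ones:
        // (int) (v ^ (v >>> 32)). Both values below therefore hash to 1.
        System.out.println(Long.hashCode(1L));          // 1
        System.out.println(Long.hashCode(4294967296L)); // 2^32 -> (int)(2^32 ^ 1) = 1
    }
}
{code}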

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839540#comment-17839540
 ] 

Roman Boyko commented on FLINK-35184:
-

Most probably the easiest way to fix it would be to replace the map of records 
inside InputSideHasNoUniqueKeyBundle with a map keyed by the record itself 
(e.g. a Map<RowData, List<RowKind>>). With this approach we would store every 
record as a map key with a constant rowKind=+I, while the rowKinds would be 
stored as values. And before handing a record over for processing we would set 
the proper rowKind back on it.
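
A rough sketch of that idea (hypothetical class and method names, not the 
actual Flink implementation; only RowData and RowKind are real Flink types):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

/** Sketch of a bundle keyed by the record itself. */
public class NoUniqueKeyBundleSketch {

    // Records are stored with a normalized RowKind.INSERT so that e.g. a +I
    // and a -D of the same row land on the same key; the real kinds are values.
    private final Map<RowData, List<RowKind>> bundle = new HashMap<>();

    public void addRecord(RowData record) {
        RowKind kind = record.getRowKind();
        record.setRowKind(RowKind.INSERT); // normalize before using as a key
        bundle.computeIfAbsent(record, r -> new ArrayList<>()).add(kind);
    }

    public void processAll(Consumer<RowData> processor) {
        for (Map.Entry<RowData, List<RowKind>> entry : bundle.entrySet()) {
            for (RowKind kind : entry.getValue()) {
                entry.getKey().setRowKind(kind); // set the proper kind back
                processor.accept(entry.getKey());
            }
            entry.getKey().setRowKind(RowKind.INSERT); // keep the key normalized
        }
    }
}
{code}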

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-04-21 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839474#comment-17839474
 ] 

Roman Boyko commented on FLINK-34380:
-

Hi [~xuyangzhong] , [~xu_shuai_] !

1) The RowKind can't be fixed in the current architecture, because +I and +U 
are separated into different batches in this example. And it would be a bit 
tricky to fix.

2) But the record order really is incorrect in this example, and it can be 
easily fixed - https://github.com/rovboyko/flink/tree/fix/FLINK-34380

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839382#comment-17839382
 ] 

Roman Boyko commented on FLINK-35184:
-

Please assign this bug to me - I'm working on it.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Roman Boyko (Jira)
Roman Boyko created FLINK-35184:
---

 Summary: Hash collision inside MiniBatchStreamingJoin operator
 Key: FLINK-35184
 URL: https://issues.apache.org/jira/browse/FLINK-35184
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.19.0
Reporter: Roman Boyko


A hash collision is possible for InputSideHasNoUniqueKeyBundle. To reproduce 
it, just launch the following test within StreamingMiniBatchJoinOperatorTest:

 
{code:java}
@Tag("miniBatchSize=6")
@Test
public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) throws 
Exception {

leftTypeInfo =
InternalTypeInfo.of(
RowType.of(
new LogicalType[] {new IntType(), new BigIntType()},
new String[] {"id1", "val1"}));

rightTypeInfo =
InternalTypeInfo.of(
RowType.of(
new LogicalType[] {new IntType(), new BigIntType()},
new String[] {"id2", "val2"}));

leftKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {0},
leftTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
rightKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {0},
rightTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));

joinKeyTypeInfo = InternalTypeInfo.of(new IntType());

super.beforeEach(testInfo);

testHarness.setStateTtlProcessingTime(1);
testHarness.processElement2(insertRecord(1, 1L));
testHarness.processElement1(insertRecord(1, 4294967296L));
testHarness.processElement2(insertRecord(1, 4294967296L));
testHarness.processElement2(deleteRecord(1, 1L));

testHarness.close();

assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 4294967296L, 
1, 4294967296L));
} {code}
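
For reference, the bigint values in this test are not arbitrary: 
java.lang.Long.hashCode(value) is defined as (int) (value ^ (value >>> 32)), 
which maps both 1L and 4294967296L (i.e. 2^32) to 1, so the rows (1, 1L) and 
(1, 4294967296L) end up with colliding hash codes. A minimal standalone check:

{code:java}
public class LongHashCollisionCheck {
    public static void main(String[] args) {
        // Long.hashCode(v) == (int) (v ^ (v >>> 32))
        System.out.println(Long.hashCode(1L));          // 1
        System.out.println(Long.hashCode(4294967296L)); // also 1: (int) (0x1_0000_0000L ^ 1L)
        // Different values, equal hash codes -> the two rows collide
        // inside any hash-based bundle.
        System.out.println(Long.hashCode(1L) == Long.hashCode(4294967296L)); // true
    }
}
{code}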
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837205#comment-17837205
 ] 

Roman Boyko edited comment on FLINK-34694 at 4/15/24 12:15 PM:
---

Hi [~xu_shuai_] !

I prepared and executed all the Nexmark queries which use streaming join (q4, 
q7, q9 and q20). Because all of them use INNER JOIN (while this optimization 
applies only to outer joins), I created a copy with FULL OUTER JOIN for each one.

BEFORE optimization:

!image-2024-04-15-15-45-51-027.png!

AFTER optimization:

!image-2024-04-15-15-46-17-671.png!

As you can see here - for all queries except q20_outer the result remains 
almost the same (the small differences are most probably due to measurement 
error). But for q20_outer the performance is more than 2 times better (I 
repeated the test several times). The reason for such a huge difference can be 
seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because of the prevalence of state.update operations in the before-optimization 
case, the RocksDB CompactionJob is invoked more often, consuming most of the 
CPU time.

There is no such performance boost for q4, q7 and q9 because:
 * q7 translates to an Interval join
 * q4 and q9 are transformed to an inner join by FlinkFilterJoinRule (maybe 
this is a bug, I will check later)


was (Author: rovboyko):
Hi [~xu_shuai_] !

I prepared and executed all the Nexmark queries which use streaming join (q4, 
q7, q9 and q20). Because all of them use INNER JOIN (while this optimization 
applies only to outer joins), I created a copy with FULL OUTER JOIN for each one.

BEFORE optimization:

!image-2024-04-15-15-45-51-027.png!

AFTER optimization:

!image-2024-04-15-15-46-17-671.png!

As you can see here - for all queries except q20_outer the result remains 
almost the same (the small differences are most probably due to measurement 
error). But for q20_outer the performance is more than 2 times better (I 
repeated the test several times). The reason for such a huge difference can be 
seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-15-53-44-308.png!

 

AFTER optimization:

!image-2024-04-15-15-55-27-313.png!

 

Because of the prevalence of state.update operations in the before-optimization 
case, the RocksDB CompactionJob is invoked more often, consuming most of the 
CPU time.

There is no such performance boost for q4, q7 and q9 because:
 * q7 translates to an Interval join
 * q4 and q9 are transformed to an inner join by FlinkFilterJoinRule (maybe 
this is a bug, I will check later)

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Boyko updated FLINK-34694:

Attachment: image-2024-04-15-19-14-14-735.png

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837211#comment-17837211
 ] 

Roman Boyko commented on FLINK-34694:
-

The images of the profiler results for the previous comment.

BEFORE optimization:

!image-2024-04-15-19-05-50-379.png!

 

AFTER optimization:

!image-2024-04-15-19-06-12-298.png!

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34694 ]


Roman Boyko deleted comment on FLINK-34694:
-

was (Author: rovboyko):
The images of the profiler results for the previous comment.

BEFORE optimization:

!image-2024-04-15-19-05-50-379.png!

 

AFTER optimization:

!image-2024-04-15-19-06-12-298.png!

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837205#comment-17837205
 ] 

Roman Boyko commented on FLINK-34694:
-

Hi [~xu_shuai_] !

I prepared and executed all the Nexmark queries which use streaming join (q4, 
q7, q9 and q20). Because all of them use INNER JOIN (while this optimization 
applies only to outer joins), I created a copy with FULL OUTER JOIN for each one.

BEFORE optimization:

!image-2024-04-15-15-45-51-027.png!

AFTER optimization:

!image-2024-04-15-15-46-17-671.png!

As you can see here - for all queries except q20_outer the result remains 
almost the same (the small differences are most probably due to measurement 
error). But for q20_outer the performance is more than 2 times better (I 
repeated the test several times). The reason for such a huge difference can be 
seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-15-53-44-308.png!

 

AFTER optimization:

!image-2024-04-15-15-55-27-313.png!

 

Because of the prevalence of state.update operations in the before-optimization 
case, the RocksDB CompactionJob is invoked more often, consuming most of the 
CPU time.

There is no such performance boost for q4, q7 and q9 because:
 * q7 translates to an Interval join
 * q4 and q9 are transformed to an inner join by FlinkFilterJoinRule (maybe 
this is a bug, I will check later)

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Boyko updated FLINK-34694:

Attachment: image-2024-04-15-15-46-17-671.png

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Boyko updated FLINK-34694:

Attachment: image-2024-04-15-15-45-51-027.png

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-10 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835650#comment-17835650
 ] 

Roman Boyko commented on FLINK-34694:
-

Hi [~xu_shuai_] ! Thank you for your reply!

Actually this optimization reduces not only the state size but also the state 
access: the method `updateNumOfAssociations()` is not invoked anymore. And the 
main performance improvement comes mostly from reducing state access 
operations.

Regarding benchmarks - ok, I will measure all the Nexmark queries which 
contain the JOIN keyword and prepare a comparison table. Would that be ok?

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-09 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835599#comment-17835599
 ] 

Roman Boyko commented on FLINK-34694:
-

[~martijnvisser] , could you please help with assigning this task to me or 
provide some feedback? From my point of view a 20% performance boost is a good 
result, especially when it has no drawbacks. Or did I miss something?

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState

2024-04-01 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832804#comment-17832804
 ] 

Roman Boyko commented on FLINK-30088:
-

I've updated the PR ([https://github.com/apache/flink/pull/21406]) to optimize 
only TtlMapState and added benchmark results there. 

The total performance improvement is above 17%.

> Excessive state updates for TtlMapState and TtlListState
> 
>
> Key: FLINK-30088
> URL: https://issues.apache.org/jira/browse/FLINK-30088
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2022-11-18-20-25-14-466.png, 
> image-2022-11-18-20-27-24-054.png
>
>
> After merging the FLINK-21413 every ttl check for cleanup for TtlMapState and 
> TtlListState (even without expired elements) leads to whole state update.
> This is because:
> - comparison by reference inside `TtlIncrementalCleanup`:
> !image-2022-11-18-20-25-14-466.png|width=450,height=288!
> - and creating new map or list inside TtlMapState or TtlListState:
> !image-2022-11-18-20-27-24-054.png|width=477,height=365!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState

2024-04-01 Thread Roman Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Boyko updated FLINK-30088:

Labels: pull-request-available  (was: pull-request-available stale-assigned)

> Excessive state updates for TtlMapState and TtlListState
> 
>
> Key: FLINK-30088
> URL: https://issues.apache.org/jira/browse/FLINK-30088
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2022-11-18-20-25-14-466.png, 
> image-2022-11-18-20-27-24-054.png
>
>
> After merging the FLINK-21413 every ttl check for cleanup for TtlMapState and 
> TtlListState (even without expired elements) leads to whole state update.
> This is because:
> - comparison by reference inside `TtlIncrementalCleanup`:
> !image-2022-11-18-20-25-14-466.png|width=450,height=288!
> - and creating new map or list inside TtlMapState or TtlListState:
> !image-2022-11-18-20-27-24-054.png|width=477,height=365!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34694) Delete num of associations for streaming outer join

2024-03-25 Thread Roman Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Boyko updated FLINK-34694:

Affects Version/s: (was: 1.16.3)
   (was: 1.17.2)
   (was: 1.19.0)
   (was: 1.18.1)

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-03-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829770#comment-17829770
 ] 

Roman Boyko commented on FLINK-34694:
-

[~libenchao] What do you think about the feature and the performance results? 
Is it worth doing?

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30535) Introduce TTL state based benchmarks

2024-03-19 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828309#comment-17828309
 ] 

Roman Boyko commented on FLINK-30535:
-

[~Zakelly] , thanks for this functionality!

One question - why is it still in OPEN status?

> Introduce TTL state based benchmarks
> 
>
> Key: FLINK-30535
> URL: https://issues.apache.org/jira/browse/FLINK-30535
> Project: Flink
>  Issue Type: New Feature
>  Components: Benchmarks
>Reporter: Yun Tang
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> This ticket is inspired by https://issues.apache.org/jira/browse/FLINK-30088 
> which wants to optimize the TTL state performance. I think it would be useful 
> to introduce state benchmarks based on TTL as Flink has some overhead to 
> support TTL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-03-16 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827664#comment-17827664
 ] 

Roman Boyko commented on FLINK-34694:
-

After the failed CI run I realized that the state schema changes for the 
outer-side states, so state can't be restored from previously taken 
checkpoints. Thanks to the RestoreTest functionality for catching it =)

So now I have two options here: either keep the old state schema for the outer 
side (it would still give the same performance) or increase the version of the 
ExecNodeMetadata for the StreamExecJoin node.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34694) Delete num of associations for streaming outer join

2024-03-15 Thread Roman Boyko (Jira)
Roman Boyko created FLINK-34694:
---

 Summary: Delete num of associations for streaming outer join
 Key: FLINK-34694
 URL: https://issues.apache.org/jira/browse/FLINK-34694
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.16.3
Reporter: Roman Boyko
 Attachments: image-2024-03-15-19-51-29-282.png, 
image-2024-03-15-19-52-24-391.png

Currently in the StreamingJoinOperator (non-window), in the case of an OUTER 
JOIN the OuterJoinRecordStateView is used to store an additional field - the 
number of associations for every record. This leads to storing additional 
Tuple2 and Integer data for every record in the outer state.

This functionality is used only for sending:
 * -D[nullPaddingRecord] in case of the first Accumulate record
 * +I[nullPaddingRecord] in case of the last Revoke record

The overhead of storing the additional data and updating the association 
counter can be avoided by checking the input state for these events.
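
A minimal sketch of the idea (hypothetical names, plain in-memory maps instead 
of Flink's keyed state, equi-join only): whether an arriving record is the 
first Accumulate or the last Revoke for its key can be derived from the 
input-side state itself, so no per-record association counter is needed.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of a left outer join without a stored number of associations. */
public class OuterJoinNoCounterSketch {

    private final Map<Integer, List<String>> leftState = new HashMap<>();
    private final Map<Integer, List<String>> rightState = new HashMap<>();

    public void onLeftInsert(int key, String leftRow) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(leftRow);
        List<String> rights = rightState.getOrDefault(key, Collections.emptyList());
        if (rights.isEmpty()) {
            emit("+I", leftRow, null); // no match yet -> null padding
        } else {
            for (String rightRow : rights) {
                emit("+I", leftRow, rightRow);
            }
        }
    }

    public void onRightInsert(int key, String rightRow) {
        List<String> rights = rightState.computeIfAbsent(key, k -> new ArrayList<>());
        boolean firstAccumulate = rights.isEmpty(); // derived from input-side state
        rights.add(rightRow);
        for (String leftRow : leftState.getOrDefault(key, Collections.emptyList())) {
            if (firstAccumulate) {
                emit("-D", leftRow, null); // retract the earlier null-padded result
            }
            emit("+I", leftRow, rightRow);
        }
    }

    public void onRightDelete(int key, String rightRow) {
        List<String> rights = rightState.get(key);
        if (rights == null || !rights.remove(rightRow)) {
            return; // unknown record, nothing to retract
        }
        boolean lastRevoke = rights.isEmpty(); // derived from input-side state
        for (String leftRow : leftState.getOrDefault(key, Collections.emptyList())) {
            emit("-D", leftRow, rightRow);
            if (lastRevoke) {
                emit("+I", leftRow, null); // back to the null-padded result
            }
        }
    }

    private void emit(String kind, String left, String right) {
        System.out.println(kind + "[" + left + ", " + right + "]");
    }
}
{code}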

 

The proposed solution can be found here - 
[https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]

 

According to the nexmark q20 test (changed to OUTER JOIN) it could increase the 
performance up to 20%:
 * Before:

!image-2024-03-15-19-52-24-391.png!
 * After:

!image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState

2022-11-18 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635875#comment-17635875
 ] 

Roman Boyko commented on FLINK-30088:
-

The fix is ready - 
[https://github.com/rovboyko/flink/commit/cf8b7c1de5e3dfbe83106a4de89423e17e36e50e]

Waiting for the task to be approved and assigned to me. Thanks.

> Excessive state updates for TtlMapState and TtlListState
> 
>
> Key: FLINK-30088
> URL: https://issues.apache.org/jira/browse/FLINK-30088
> Project: Flink
>  Issue Type: Bug
>Reporter: Roman Boyko
>Priority: Minor
> Attachments: image-2022-11-18-20-25-14-466.png, 
> image-2022-11-18-20-27-24-054.png
>
>
> After merging the FLINK-21413 every ttl check for cleanup for TtlMapState and 
> TtlListState (even without expired elements) leads to whole state update.
> This is because:
> - comparison by reference inside `TtlIncrementalCleanup`:
> !image-2022-11-18-20-25-14-466.png|width=450,height=288!
> - and creating new map or list inside TtlMapState or TtlListState:
> !image-2022-11-18-20-27-24-054.png|width=477,height=365!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState

2022-11-18 Thread Roman Boyko (Jira)
Roman Boyko created FLINK-30088:
---

 Summary: Excessive state updates for TtlMapState and TtlListState
 Key: FLINK-30088
 URL: https://issues.apache.org/jira/browse/FLINK-30088
 Project: Flink
  Issue Type: Bug
Reporter: Roman Boyko
 Attachments: image-2022-11-18-20-25-14-466.png, 
image-2022-11-18-20-27-24-054.png

After merging FLINK-21413, every TTL cleanup check for TtlMapState and 
TtlListState (even one that finds no expired elements) leads to a whole-state 
update.

This is because of:

- the comparison by reference inside `TtlIncrementalCleanup`:

!image-2022-11-18-20-25-14-466.png|width=450,height=288!

- and the creation of a new map or list inside TtlMapState or TtlListState:

!image-2022-11-18-20-27-24-054.png|width=477,height=365!
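
A distilled illustration of the two points above (plain Java, not the actual 
Flink classes): when cleanup unconditionally rebuilds the collection and the 
"did anything expire?" decision is a reference comparison, the state is 
rewritten on every check.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class TtlCleanupPitfall {
    public static void main(String[] args) {
        Map<String, Long> original = new HashMap<>();
        original.put("k1", 100L); // expiration timestamp in the future

        long now = 50L;

        // TtlMapState-style cleanup: always builds a fresh map, even when
        // every entry survives the expiration check.
        Map<String, Long> filtered = new HashMap<>();
        for (Map.Entry<String, Long> e : original.entrySet()) {
            if (e.getValue() > now) { // keep unexpired entries
                filtered.put(e.getKey(), e.getValue());
            }
        }

        // TtlIncrementalCleanup-style check: compares by reference, so a fresh
        // but identical map still looks "changed" and triggers a state update.
        boolean needsStateUpdate = filtered != original;
        System.out.println(needsStateUpdate); // true, although nothing expired
    }
}
{code}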



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27438) SQL validation failed when constructing a map array

2022-07-08 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17564339#comment-17564339
 ] 

Roman Boyko commented on FLINK-27438:
-

Hi [~twalthr] !

Could you please take a look at the updated fix - 
[https://github.com/apache/flink/pull/19648] ?

> SQL validation failed when constructing a map array
> ---
>
> Key: FLINK-27438
> URL: https://issues.apache.org/jira/browse/FLINK-27438
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.4, 1.15.0
>Reporter: Wei Zhong
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> Exception: 
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. Unsupported type when convertTypeToSpec: MAP
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:185)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.(LegacyTableFactoryTest.java:35)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.main(LegacyTableFactoryTest.java:49)
> Caused by: java.lang.UnsupportedOperationException: Unsupported type when 
> convertTypeToSpec: MAP
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.castTo(SqlValidatorUtils.java:82)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForMultisetConstructor(SqlValidatorUtils.java:74)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForArrayConstructor(SqlValidatorUtils.java:39)
>     at 
> org.apache.flink.table.planner.functions.sql.SqlArrayConstructor.inferReturnType(SqlArrayConstructor.java:44)
>     at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:449)
>     at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5716)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5703)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:180)
>     ... 6 more {code}
> How to reproduce:
> {code:java}
> tableEnv.executeSql("select array[map['A', 'AA'], map['B', 'BB'], map['C', 
> CAST(NULL AS STRING)]] from (VALUES ('a'))").print(); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27418) Flink SQL TopN result is wrong

2022-05-20 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540342#comment-17540342
 ] 

Roman Boyko commented on FLINK-27418:
-

The PR is ready - [https://github.com/apache/flink/pull/19778]

[~zhangbinzaifendou] , [~jark] , could you please have a look?

> Flink SQL TopN result is wrong
> --
>
> Key: FLINK-27418
> URL: https://issues.apache.org/jira/browse/FLINK-27418
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.14.3
> Environment: Flink 1.12.2 and Flink 1.14.3 test results are sometimes 
> wrong
>Reporter: zhangbin
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL TopN is executed multiple times with different results, sometimes 
> with correct results and sometimes with incorrect results.
> Example:
> {code:java}
> @Test
> public void flinkSqlJoinRetract() {
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> streamEnv.setParallelism(1);
> StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(streamEnv, settings);
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(1));
> RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
> RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
> SourceFunction waybillSourceFunction = 
> buildWaybillStreamSource(waybillTableTypeInfo);
> SourceFunction itemSourceFunction = 
> buildItemStreamSource(itemTableTypeInfo);
> String waybillTable = "waybill";
> String itemTable = "item";
> DataStreamSource waybillStream = streamEnv.addSource(
> waybillSourceFunction,
> waybillTable,
> waybillTableTypeInfo);
> DataStreamSource itemStream = streamEnv.addSource(
> itemSourceFunction,
> itemTable,
> itemTableTypeInfo);
> Expression[] waybillFields = ExpressionParser
> .parseExpressionList(String.join(",", 
> waybillTableTypeInfo.getFieldNames())
> + ",proctime.proctime").toArray(new Expression[0]);
> Expression[] itemFields = ExpressionParser
> .parseExpressionList(
> String.join(",", itemTableTypeInfo.getFieldNames()) + 
> ",proctime.proctime")
> .toArray(new Expression[0]);
> tableEnv.createTemporaryView(waybillTable, waybillStream, 
> waybillFields);
> tableEnv.createTemporaryView(itemTable, itemStream, itemFields);
> String sql =
> "select \n"
> + "city_id, \n"
> + "count(*) as cnt\n"
> + "from (\n"
> + "select id,city_id\n"
> + "from (\n"
> + "select \n"
> + "id,\n"
> + "city_id,\n"
> + "row_number() over(partition by id order by 
> utime desc ) as rno \n"
> + "from (\n"
> + "select \n"
> + "waybill.id as id,\n"
> + "coalesce(item.city_id, waybill.city_id) as 
> city_id,\n"
> + "waybill.utime as utime \n"
> + "from waybill left join item \n"
> + "on waybill.id = item.id \n"
> + ") \n"
> + ")\n"
> + "where rno =1\n"
> + ")\n"
> + "group by city_id";
> StatementSet statementSet = tableEnv.createStatementSet();
> Table table = tableEnv.sqlQuery(sql);
> DataStream<Tuple2<Boolean, Row>> rowDataStream = 
> tableEnv.toRetractStream(table, Row.class);
> rowDataStream.printToErr();
> try {
> streamEnv.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> private static RowTypeInfo buildWaybillTableTypeInfo() {
> TypeInformation[] types = new TypeInformation[]{Types.INT(), 
> Types.STRING(), Types.LONG(), Types.LONG()};
> String[] fields = new String[]{"id", "city_id", "rider_id", "utime"};
> return new RowTypeInfo(types, fields);
> }
> private static RowTypeInfo buildItemTableTypeInfo() {
> TypeInformation[] types = new TypeInformation[]{Types.INT(), 
> Types.STRING(), Types.LONG()};
> String[] fields = new 

[jira] [Comment Edited] (FLINK-27418) Flink SQL TopN result is wrong

2022-05-20 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539968#comment-17539968
 ] 

Roman Boyko edited comment on FLINK-27418 at 5/20/22 7:40 AM:
--

The bug indeed exists. The simplest way to reproduce it is to add this to 
RetractableTopNFunctionTest:
{code:java}
@Test
public void testRetractRowNumber() throws Exception {
AbstractTopNFunction func =
new RetractableTopNFunction(
ttlConfig,
InternalTypeInfo.ofFields(
VarCharType.STRING_TYPE,
new BigIntType(),
new IntType(),
new IntType()),
comparableRecordComparator,
sortKeySelector,
RankType.ROW_NUMBER,
new ConstantRankRange(1, 1),
generatedEqualiser,
true,
false);

OneInputStreamOperatorTestHarness testHarness = 
createTestHarness(func);
testHarness.open();
testHarness.processElement(insertRecord("a", 1L, 10, 0));
testHarness.processElement(insertRecord("a", 1L, 9, 0));
testHarness.processElement(deleteRecord("a", 1L, 10, 0));
testHarness.processElement(deleteRecord("a", 1L, 9, 0));
testHarness.processElement(insertRecord("a", 1L, 10, 1));
testHarness.processElement(insertRecord("a", 1L, 9, 1));
testHarness.close();

List expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord("a", 1L, 10, 0));
expectedOutput.add(deleteRecord("a", 1L, 10, 0));
expectedOutput.add(insertRecord("a", 1L, 9, 0));
expectedOutput.add(deleteRecord("a", 1L, 9, 0));
expectedOutput.add(insertRecord("a", 1L, 10, 1));
expectedOutput.add(deleteRecord("a", 1L, 10, 1));
expectedOutput.add(insertRecord("a", 1L, 9, 1));

assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
} {code}
I'll take it.

[~zhangbinzaifendou] , [~martijnvisser] could you please assign it to me?


was (Author: rovboyko):
The bug indeed exists. The simplest way to reproduce it is to add this to 
RetractableTopNFunctionTest:
{code:java}
@Test
public void testRetractRowNumber() throws Exception {
AbstractTopNFunction func =
new RetractableTopNFunction(
ttlConfig,
InternalTypeInfo.ofFields(
VarCharType.STRING_TYPE,
new BigIntType(),
new IntType(),
new IntType()),
comparableRecordComparator,
sortKeySelector,
RankType.ROW_NUMBER,
new ConstantRankRange(1, 1),
generatedEqualiser,
true,
false);

OneInputStreamOperatorTestHarness testHarness = 
createTestHarness(func);
testHarness.open();
testHarness.processElement(insertRecord("a", 1L, 10, 0));
testHarness.processElement(insertRecord("a", 1L, 9, 0));
testHarness.processElement(deleteRecord("a", 1L, 10, 0));
testHarness.processElement(deleteRecord("a", 1L, 9, 0));
testHarness.processElement(insertRecord("a", 1L, 10, 1));
testHarness.processElement(insertRecord("a", 1L, 9, 1));
testHarness.close();

List expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord("a", 1L, 10, 0));
expectedOutput.add(deleteRecord("a", 1L, 10, 0));
expectedOutput.add(insertRecord("a", 1L, 9, 0));
expectedOutput.add(deleteRecord("a", 1L, 9, 0));
expectedOutput.add(insertRecord("a", 1L, 10, 1));
expectedOutput.add(deleteRecord("a", 1L, 10, 1));
expectedOutput.add(insertRecord("a", 1L, 9, 1));

assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
} {code}
I'll take it.

> Flink SQL TopN result is wrong
> --
>
> Key: FLINK-27418
> URL: https://issues.apache.org/jira/browse/FLINK-27418
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.14.3
> Environment: Flink 1.12.2 and Flink 1.14.3 test results are sometimes 
> wrong
>Reporter: zhangbin
>Priority: Major
>
> Flink SQL TopN is executed multiple times with different results, sometimes 
> with correct results and sometimes with incorrect results.
> Example:
> {code:java}
> @Test
> public void flinkSqlJoinRetract() {
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamExecutionEnvironment streamEnv = 
> 

[jira] [Commented] (FLINK-27418) Flink SQL TopN result is wrong

2022-05-20 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539968#comment-17539968
 ] 

Roman Boyko commented on FLINK-27418:
-

The bug indeed exists. The simplest way to reproduce it is to add this to 
RetractableTopNFunctionTest:
{code:java}
@Test
public void testRetractRowNumber() throws Exception {
AbstractTopNFunction func =
new RetractableTopNFunction(
ttlConfig,
InternalTypeInfo.ofFields(
VarCharType.STRING_TYPE,
new BigIntType(),
new IntType(),
new IntType()),
comparableRecordComparator,
sortKeySelector,
RankType.ROW_NUMBER,
new ConstantRankRange(1, 1),
generatedEqualiser,
true,
false);

OneInputStreamOperatorTestHarness testHarness = 
createTestHarness(func);
testHarness.open();
testHarness.processElement(insertRecord("a", 1L, 10, 0));
testHarness.processElement(insertRecord("a", 1L, 9, 0));
testHarness.processElement(deleteRecord("a", 1L, 10, 0));
testHarness.processElement(deleteRecord("a", 1L, 9, 0));
testHarness.processElement(insertRecord("a", 1L, 10, 1));
testHarness.processElement(insertRecord("a", 1L, 9, 1));
testHarness.close();

List expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord("a", 1L, 10, 0));
expectedOutput.add(deleteRecord("a", 1L, 10, 0));
expectedOutput.add(insertRecord("a", 1L, 9, 0));
expectedOutput.add(deleteRecord("a", 1L, 9, 0));
expectedOutput.add(insertRecord("a", 1L, 10, 1));
expectedOutput.add(deleteRecord("a", 1L, 10, 1));
expectedOutput.add(insertRecord("a", 1L, 9, 1));

assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
} {code}
I'll take it.

> Flink SQL TopN result is wrong
> --
>
> Key: FLINK-27418
> URL: https://issues.apache.org/jira/browse/FLINK-27418
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.14.3
> Environment: Flink 1.12.2 and Flink 1.14.3 test results are sometimes 
> wrong
>Reporter: zhangbin
>Priority: Major
>
> Executing the same Flink SQL TopN query multiple times yields different
> results: sometimes correct, sometimes incorrect.
> Example:
> {code:java}
> @Test
> public void flinkSqlJoinRetract() {
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> streamEnv.setParallelism(1);
> StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(streamEnv, settings);
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(1));
> RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
> RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
> SourceFunction<Row> waybillSourceFunction = 
> buildWaybillStreamSource(waybillTableTypeInfo);
> SourceFunction<Row> itemSourceFunction = 
> buildItemStreamSource(itemTableTypeInfo);
> String waybillTable = "waybill";
> String itemTable = "item";
> DataStreamSource<Row> waybillStream = streamEnv.addSource(
> waybillSourceFunction,
> waybillTable,
> waybillTableTypeInfo);
> DataStreamSource<Row> itemStream = streamEnv.addSource(
> itemSourceFunction,
> itemTable,
> itemTableTypeInfo);
> Expression[] waybillFields = ExpressionParser
> .parseExpressionList(String.join(",", 
> waybillTableTypeInfo.getFieldNames())
> + ",proctime.proctime").toArray(new Expression[0]);
> Expression[] itemFields = ExpressionParser
> .parseExpressionList(
> String.join(",", itemTableTypeInfo.getFieldNames()) + 
> ",proctime.proctime")
> .toArray(new Expression[0]);
> tableEnv.createTemporaryView(waybillTable, waybillStream, 
> waybillFields);
> tableEnv.createTemporaryView(itemTable, itemStream, itemFields);
> String sql =
> "select \n"
> + "city_id, \n"
> + "count(*) as cnt\n"
> + "from (\n"
> + "select id,city_id\n"
> + "from (\n"
> + "select \n"
> + "id,\n"
> + "  

[jira] [Commented] (FLINK-27438) SQL validation failed when constructing a map array

2022-05-12 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536414#comment-17536414
 ] 

Roman Boyko commented on FLINK-27438:
-

Hi everyone!

I've changed the solution according to Timo's request and completely dropped 
the code introduced by https://issues.apache.org/jira/browse/FLINK-19796

[~twalthr] , [~martijnvisser] , could you please have a look? - 
[https://github.com/apache/flink/pull/19648]

> SQL validation failed when constructing a map array
> ---
>
> Key: FLINK-27438
> URL: https://issues.apache.org/jira/browse/FLINK-27438
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0, 1.14.4
>Reporter: Wei Zhong
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> Exception: 
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. Unsupported type when convertTypeToSpec: MAP
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:185)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.<init>(LegacyTableFactoryTest.java:35)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.main(LegacyTableFactoryTest.java:49)
> Caused by: java.lang.UnsupportedOperationException: Unsupported type when 
> convertTypeToSpec: MAP
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.castTo(SqlValidatorUtils.java:82)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForMultisetConstructor(SqlValidatorUtils.java:74)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForArrayConstructor(SqlValidatorUtils.java:39)
>     at 
> org.apache.flink.table.planner.functions.sql.SqlArrayConstructor.inferReturnType(SqlArrayConstructor.java:44)
>     at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:449)
>     at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5716)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5703)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:180)
>     ... 6 more {code}
> How to reproduce:
> {code:java}
> tableEnv.executeSql("select array[map['A', 'AA'], map['B', 'BB'], map['C', 
> CAST(NULL AS STRING)]] from (VALUES ('a'))").print(); {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27438) SQL validation failed when constructing a map array

2022-05-07 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17533252#comment-17533252
 ] 

Roman Boyko commented on FLINK-27438:
-

master - 6b1cdf3b7c3bf2d8beb399f95d0a27733d6a6c39 
[https://github.com/apache/flink/pull/19648] 

1.15 - e07757ef4069178f68379b8cb9e06ec25337ed70 
[https://github.com/apache/flink/pull/19669]

1.14 - 3e35a5c4d4a6cd1508e1b9336605783be5e0ee9a 
[https://github.com/apache/flink/pull/19670] 

[~martijnvisser] , [~twalthr] , [~matriv] , could you please help with merging 
these PRs?

> SQL validation failed when constructing a map array
> ---
>
> Key: FLINK-27438
> URL: https://issues.apache.org/jira/browse/FLINK-27438
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0, 1.14.4
>Reporter: Wei Zhong
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> Exception: 
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. Unsupported type when convertTypeToSpec: MAP
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:185)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.<init>(LegacyTableFactoryTest.java:35)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.main(LegacyTableFactoryTest.java:49)
> Caused by: java.lang.UnsupportedOperationException: Unsupported type when 
> convertTypeToSpec: MAP
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.castTo(SqlValidatorUtils.java:82)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForMultisetConstructor(SqlValidatorUtils.java:74)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForArrayConstructor(SqlValidatorUtils.java:39)
>     at 
> org.apache.flink.table.planner.functions.sql.SqlArrayConstructor.inferReturnType(SqlArrayConstructor.java:44)
>     at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:449)
>     at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5716)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5703)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:180)
>     ... 6 more {code}
> How to reproduce:
> {code:java}
> tableEnv.executeSql("select array[map['A', 'AA'], map['B', 'BB'], map['C', 
> CAST(NULL AS STRING)]] from (VALUES ('a'))").print(); {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27438) SQL validation failed when constructing a map array

2022-05-06 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532758#comment-17532758
 ] 

Roman Boyko commented on FLINK-27438:
-

[~martijnvisser] , could you please review the PR - 
[https://github.com/apache/flink/pull/19648]

> SQL validation failed when constructing a map array
> ---
>
> Key: FLINK-27438
> URL: https://issues.apache.org/jira/browse/FLINK-27438
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0, 1.14.4
>Reporter: Wei Zhong
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> Exception: 
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. Unsupported type when convertTypeToSpec: MAP
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:185)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.<init>(LegacyTableFactoryTest.java:35)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.main(LegacyTableFactoryTest.java:49)
> Caused by: java.lang.UnsupportedOperationException: Unsupported type when 
> convertTypeToSpec: MAP
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.castTo(SqlValidatorUtils.java:82)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForMultisetConstructor(SqlValidatorUtils.java:74)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForArrayConstructor(SqlValidatorUtils.java:39)
>     at 
> org.apache.flink.table.planner.functions.sql.SqlArrayConstructor.inferReturnType(SqlArrayConstructor.java:44)
>     at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:449)
>     at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5716)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5703)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:180)
>     ... 6 more {code}
> How to reproduce:
> {code:java}
> tableEnv.executeSql("select array[map['A', 'AA'], map['B', 'BB'], map['C', 
> CAST(NULL AS STRING)]] from (VALUES ('a'))").print(); {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27438) SQL validation failed when constructing a map array

2022-05-04 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532054#comment-17532054
 ] 

Roman Boyko commented on FLINK-27438:
-

I've created a PR against master. [~martijnvisser] [~zhongwei], could you 
please take a look?

After code review I'll create the backports to 1.14 and 1.15.

> SQL validation failed when constructing a map array
> ---
>
> Key: FLINK-27438
> URL: https://issues.apache.org/jira/browse/FLINK-27438
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0, 1.14.4
>Reporter: Wei Zhong
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> Exception: 
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. Unsupported type when convertTypeToSpec: MAP
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:185)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.<init>(LegacyTableFactoryTest.java:35)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.main(LegacyTableFactoryTest.java:49)
> Caused by: java.lang.UnsupportedOperationException: Unsupported type when 
> convertTypeToSpec: MAP
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.castTo(SqlValidatorUtils.java:82)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForMultisetConstructor(SqlValidatorUtils.java:74)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForArrayConstructor(SqlValidatorUtils.java:39)
>     at 
> org.apache.flink.table.planner.functions.sql.SqlArrayConstructor.inferReturnType(SqlArrayConstructor.java:44)
>     at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:449)
>     at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5716)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5703)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:180)
>     ... 6 more {code}
> How to reproduce:
> {code:java}
> tableEnv.executeSql("select array[map['A', 'AA'], map['B', 'BB'], map['C', 
> CAST(NULL AS STRING)]] from (VALUES ('a'))").print(); {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27438) SQL validation failed when constructing a map array

2022-05-04 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531615#comment-17531615
 ] 

Roman Boyko commented on FLINK-27438:
-

This bug appears at SQL array construction time. It is caused by 
https://issues.apache.org/jira/browse/FLINK-19796 
(0f51995bc156435eb9e97d2ac15e0225117877e2). All branches starting from 1.14.0 
are affected.

[~zhongwei] Could you please assign it to me? I'll create a PR.
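
For anyone triaging similar reports, a couple of hedged probes (my own, not 
from the ticket) suggest the failure is specific to the cast that the ARRAY 
constructor inserts while aligning element types, rather than to MAP support 
in general:
{code:java}
// Validates fine: casting the NULL element to a basic type goes through a
// supported branch of SqlTypeUtil.convertTypeToSpec.
tableEnv.executeSql(
        "select array[1, 2, CAST(NULL AS INT)] from (VALUES ('a'))").print();

// Also fine: a MAP on its own never reaches the array-constructor cast.
tableEnv.executeSql("select map['A', 'AA'] from (VALUES ('a'))").print();

// Fails as in the stack trace above: aligning the element types forces a
// cast to a MAP type, and convertTypeToSpec has no branch for MAP.
tableEnv.executeSql(
        "select array[map['A', 'AA'], map['B', 'BB'], map['C', CAST(NULL AS STRING)]]"
                + " from (VALUES ('a'))")
    .print(); {code}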

> SQL validation failed when constructing a map array
> ---
>
> Key: FLINK-27438
> URL: https://issues.apache.org/jira/browse/FLINK-27438
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0, 1.14.4
>Reporter: Wei Zhong
>Priority: Major
>
> Exception: 
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. Unsupported type when convertTypeToSpec: MAP
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:185)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.<init>(LegacyTableFactoryTest.java:35)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.LegacyTableFactoryTest.main(LegacyTableFactoryTest.java:49)
> Caused by: java.lang.UnsupportedOperationException: Unsupported type when 
> convertTypeToSpec: MAP
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
>     at 
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.castTo(SqlValidatorUtils.java:82)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForMultisetConstructor(SqlValidatorUtils.java:74)
>     at 
> org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.adjustTypeForArrayConstructor(SqlValidatorUtils.java:39)
>     at 
> org.apache.flink.table.planner.functions.sql.SqlArrayConstructor.inferReturnType(SqlArrayConstructor.java:44)
>     at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:449)
>     at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5716)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5703)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:180)
>     ... 6 more {code}
> How to reproduce:
> {code:java}
> tableEnv.executeSql("select array[map['A', 'AA'], map['B', 'BB'], map['C', 
> CAST(NULL AS STRING)]] from (VALUES ('a'))").print(); {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)