[ https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837205#comment-17837205 ]

Roman Boyko edited comment on FLINK-34694 at 4/26/24 10:04 AM:
---------------------------------------------------------------

Hi [~xu_shuai_] !

I prepared and executed all Nexmark queries that use a streaming join (q4, q9 and q20). 
Because all of them use INNER JOIN (while this optimization applies only to outer 
joins), I created a copy with FULL OUTER JOIN for each of them.

BEFORE optimization:

!image-2024-04-26-16-55-19-800.png!

AFTER optimization:

!image-2024-04-26-16-55-56-994.png!

As you can see, for all INNER JOIN queries the result remains almost the same 
(the small difference is most probably measurement error). But for all 
FULL OUTER JOIN benchmarks the performance increased, especially for 
q20_outer, where it was more than 3 times better. The reason for such a huge 
difference can be seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because state.update operations dominate in the before-optimization case, the 
RocksDB CompactionJob is invoked more often and consumes most of the CPU time.

Overall, the performance boost is 6.75 / 5.15 ≈ 1.31 (about 31%).
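To make the saving concrete, here is a minimal, self-contained sketch of the idea (class and method names are hypothetical, not Flink's actual StreamingJoinOperator API): with a per-record association counter, every matched row forces a rewrite of the counter in the outer-side state, whereas without the counter a match only needs a read of the other side's state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: counts state writes, the operation that
// feeds RocksDB compaction in the benchmarks above.
public class OuterJoinStateSketch {

    // "Before": the outer state keeps (record -> numAssociations), so every
    // matching row on the other side rewrites the counter (a state.update).
    public static int writesWithCounter(List<String> leftKeys, List<String> rightKeys) {
        Map<String, Integer> leftState = new HashMap<>();
        int writes = 0;
        for (String l : leftKeys) {          // store left record with counter = 0
            leftState.put(l, 0);
            writes++;
        }
        List<String> rightState = new ArrayList<>();
        for (String r : rightKeys) {
            rightState.add(r);               // store right record
            writes++;
            if (leftState.containsKey(r)) {  // matched: counter must be rewritten
                leftState.merge(r, 1, Integer::sum);
                writes++;
            }
        }
        return writes;
    }

    // "After": no counter is stored; whether to emit -D/+I[nullPaddingRecord]
    // is decided by reading the other side's state, so a match costs no write.
    public static int writesWithoutCounter(List<String> leftKeys, List<String> rightKeys) {
        List<String> leftState = new ArrayList<>(leftKeys);   // one write per record
        List<String> rightState = new ArrayList<>(rightKeys); // one write per record
        return leftState.size() + rightState.size();
    }
}
```

For two left rows and three right rows with two matches, the counter variant performs 7 writes versus 5 without it; the gap grows with the number of associations per key, which is why the skewed q20_outer benefits most.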



> Delete num of associations for streaming outer join
> ---------------------------------------------------
>
>                 Key: FLINK-34694
>                 URL: https://issues.apache.org/jira/browse/FLINK-34694
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>            Reporter: Roman Boyko
>            Priority: Major
>         Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png, 
> image-2024-04-26-16-55-19-800.png, image-2024-04-26-16-55-56-994.png
>
>
> Currently, in the (non-window) StreamingJoinOperator, in the case of an OUTER 
> JOIN the OuterJoinRecordStateView is used to store an additional field: the 
> number of associations for every record. This means storing extra Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing this additional data and updating the association 
> counter can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the Nexmark q20 test (changed to OUTER JOIN), it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
