[ 
https://issues.apache.org/jira/browse/FLINK-30601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677626#comment-17677626
 ] 

Lijie Wang commented on FLINK-30601:
------------------------------------

Done via master 4e3cd986cd4304c699d5c7d368ffd6e0ead5a096

> Omit "setKeyContextElement" call for non-keyed stream/operators to improve 
> performance
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-30601
>                 URL: https://issues.apache.org/jira/browse/FLINK-30601
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Lijie Wang
>            Assignee: Lijie Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> Currently, flink will set the correct key context(by call 
> [setKeyContextElement|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java#:~:text=input.setKeyContextElement(castRecord)%3B])
>  before processing each record, which is typically used to extract key from 
> record and pass that key to the state backends.
> However, the "setKeyContextElement" is obviously not need for non-keyed 
> stream/operator, in which case we can omit the "setKeyContextElement" calls 
> to improve performance. Note that "setKeyContextElement" is an interface 
> method, it requires looking up the interface table when calling, which will 
> further increase the method call overhead.
>  
> We run the following program as benchmark with parallelism=1 and object 
> re-use enabled. The benchmark results are averaged across 5 runs for each 
> setup. Before and after applying the proposed change, the average execution 
> time changed from 88.39 s to 78.76 s, which increases throughput by 10.8%.
> {code:java}
> env.fromSequence(1, 1000000000L)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x).addSink(new DiscardingSink<>());{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to