[ 
https://issues.apache.org/jira/browse/FLINK-30709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681247#comment-17681247
 ] 

Piotr Nowojski commented on FLINK-30709:
----------------------------------------

Merged to master as f6c7c30118e^ and f6c7c30118e

[~lindong], when closing tickets, could you please remember to note the 
commit range in which they were merged?

Note: unfortunately, the performance improvement is not visible after the 
first benchmark run.

> NetworkInput#emitNext() should push records to DataOutput in a while loop
> -------------------------------------------------------------------------
>
>                 Key: FLINK-30709
>                 URL: https://issues.apache.org/jira/browse/FLINK-30709
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> This is similar to FLINK-30533: FLINK-30533 focuses on the source operator, 
> while this JIRA focuses on the network input.
>  
> Currently, each invocation of AbstractStreamTaskNetworkInput#emitNext() pushes 
> at most one record to the given DataOutput. This unnecessarily increases the 
> average Java call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this 
> program, Flink runtime needs to include in the call stack these 3 function 
> calls:
>  * StreamTask#processInput()
>  * StreamOneInputProcessor#processInput()
>  * AbstractStreamTaskNetworkInput#emitNext()
> This ticket proposes to update AbstractStreamTaskNetworkInput#emitNext() to 
> push records to DataOutput in a while loop. It improves Flink performance by 
> removing an average of 3 function calls from the call stack needed to produce a 
> record.
> Here are the benchmark results obtained by running the 
> [InputBenchmark#mapSink|https://github.com/apache/flink-benchmarks/blob/0bafe0e85700c889894324aadb70302381f98e03/src/main/java/org/apache/flink/benchmark/InputBenchmark.java#L55]
>  with env.disableOperatorChaining(). I ran it 4 times on my Mac.
>  
> {code:java}
> Before the proposed change, the average is 12429.0605 ops/ms. Detailed 
> results:
> Benchmark                (sourceType)   Mode  Cnt      Score     Error   Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12339.771 ± 414.649  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12687.872 ± 320.084  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12256.445 ± 512.219  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12432.154 ± 405.083  ops/ms
> After the proposed change, the average is 13836.845 ops/ms. Detailed 
> results:
> Benchmark                (sourceType)   Mode  Cnt      Score     Error   Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13092.451 ± 490.886  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13881.138 ± 370.249  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13960.280 ± 389.505  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  14413.511 ± 727.844  ops/ms
> {code}
>  
> The proposed change increases throughput by 11.3%.
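
For readers unfamiliar with the idea in the quoted description, here is a minimal, self-contained sketch of the difference between emitting one record per emitNext() call and emitting records in a while loop. It is not Flink's code: EmitLoopSketch, BufferedInput, Output, Status, and drain are hypothetical stand-ins invented for illustration. The sketch also ignores everything a real network input presumably has to handle (events, barriers, and yielding back to the task), so it only shows how the number of outer calls changes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified illustration; none of these types are Flink classes. */
public class EmitLoopSketch {

    /** Illustrative status telling the caller whether to call emitNext again. */
    public enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    /** Minimal stand-in for a record consumer such as a DataOutput. */
    public interface Output<T> {
        void emitRecord(T record);
    }

    /** Toy input that holds already-deserialized records in memory. */
    public static final class BufferedInput<T> {
        private final Queue<T> buffered = new ArrayDeque<>();
        private int emitNextCalls = 0;

        public BufferedInput(List<T> records) {
            buffered.addAll(records);
        }

        /** "Before" variant: pushes at most one record per invocation. */
        public Status emitNextSingle(Output<T> output) {
            emitNextCalls++;
            T record = buffered.poll();
            if (record == null) {
                return Status.NOTHING_AVAILABLE;
            }
            output.emitRecord(record);
            return buffered.isEmpty() ? Status.NOTHING_AVAILABLE : Status.MORE_AVAILABLE;
        }

        /** "After" variant: pushes records in a while loop until none are left. */
        public Status emitNextLooping(Output<T> output) {
            emitNextCalls++;
            T record;
            while ((record = buffered.poll()) != null) {
                output.emitRecord(record);
            }
            return Status.NOTHING_AVAILABLE;
        }

        public int getEmitNextCalls() {
            return emitNextCalls;
        }
    }

    /** Stand-in for the outer processing loop; returns how many emitNext calls were made. */
    public static <T> int drain(BufferedInput<T> input, Output<T> output, boolean looping) {
        Status status;
        do {
            status = looping ? input.emitNextLooping(output) : input.emitNextSingle(output);
        } while (status == Status.MORE_AVAILABLE);
        return input.getEmitNextCalls();
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3);

        List<Integer> sinkA = new ArrayList<>();
        int singleCalls = drain(new BufferedInput<>(records), sinkA::add, false);

        List<Integer> sinkB = new ArrayList<>();
        int loopingCalls = drain(new BufferedInput<>(records), sinkB::add, true);

        System.out.println("single-record calls: " + singleCalls + ", emitted " + sinkA);
        System.out.println("looping calls: " + loopingCalls + ", emitted " + sinkB);
    }
}
```

In this toy setup both variants deliver the same records in the same order. The single-record variant needs one outer call per record, while the looping variant needs one call for the whole buffer. Whether that translates into a measurable throughput gain in Flink is exactly what the benchmark runs discussed above are meant to check.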



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
