[ https://issues.apache.org/jira/browse/FLINK-30709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-30709:
-----------------------------------
    Labels: pull-request-available  (was: )

> NetworkInput#emitNext() should push records to DataOutput in a while loop
> -------------------------------------------------------------------------
>
>                 Key: FLINK-30709
>                 URL: https://issues.apache.org/jira/browse/FLINK-30709
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> This is similar to FLINK-30533. FLINK-30533 focuses on the source operator; this JIRA
> focuses on network input.
>
> Currently, each invocation of AbstractStreamTaskNetworkInput#emitNext() pushes
> at most one record to the given DataOutput. This unnecessarily increases the
> average Java call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this
> program, the Flink runtime needs to include these 3 function calls in the
> call stack:
> * StreamTask#processInput()
> * StreamOneInputProcessor#processInput()
> * AbstractStreamTaskNetworkInput#emitNext()
> This ticket proposes to update AbstractStreamTaskNetworkInput#emitNext() to
> push records to DataOutput in a while loop. It improves Flink performance by
> removing an average of 3 function calls from the call stack needed to produce a
> record.
> Here are the benchmark results obtained by running
> [InputBenchmark#mapSink|https://github.com/apache/flink-benchmarks/blob/0bafe0e85700c889894324aadb70302381f98e03/src/main/java/org/apache/flink/benchmark/InputBenchmark.java#L55]
> with env.disableOperatorChaining(). I ran it 4 times on my Mac.
>
> {code:java}
> Before the proposed change, the avg is 12429.0605 ops/ms. Here are the detailed
> results:
> Benchmark               (sourceType)   Mode   Cnt      Score     Error   Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12339.771 ± 414.649  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12687.872 ± 320.084  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12256.445 ± 512.219  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12432.154 ± 405.083  ops/ms
> After the proposed change, the avg is 13836.845 ops/ms. Here are the detailed
> results:
> Benchmark               (sourceType)   Mode   Cnt      Score     Error   Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13092.451 ± 490.886  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13881.138 ± 370.249  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13960.280 ± 389.505  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  14413.511 ± 727.844  ops/ms
> {code}
>
> The proposed change increases throughput by 11.3%.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
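
The idea in the ticket, emitting several records per emitNext() call instead of one, can be illustrated with a small standalone sketch. This is not Flink code: the class EmitLoopSketch, the methods emitOne, emitLoop and outerCalls, and the maxBatch bound are all hypothetical names chosen for illustration, and the real change may decide when to leave the loop differently. The sketch only counts how often the caller has to re-enter the emit method to drain the same buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names exist in Flink.
public class EmitLoopSketch {

    /** "Before" shape: each call hands at most one buffered record to the output. */
    static boolean emitOne(Queue<Integer> buffered, Consumer<Integer> output) {
        Integer record = buffered.poll();
        if (record == null) {
            return false; // nothing was available
        }
        output.accept(record);
        return true;
    }

    /**
     * "After" shape: one call keeps emitting in a while loop until the buffer
     * is empty or maxBatch records were emitted (maxBatch is an assumed bound
     * and must be positive).
     */
    static int emitLoop(Queue<Integer> buffered, Consumer<Integer> output, int maxBatch) {
        int emitted = 0;
        while (emitted < maxBatch) {
            Integer record = buffered.poll();
            if (record == null) {
                break; // buffer drained
            }
            output.accept(record);
            emitted++;
        }
        return emitted;
    }

    /** Counts how many outer calls are needed to drain n buffered records. */
    static int outerCalls(int n, int maxBatch, boolean loop, List<Integer> sink) {
        Queue<Integer> buffered = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            buffered.add(i);
        }
        int calls = 0;
        while (!buffered.isEmpty()) {
            calls++;
            if (loop) {
                emitLoop(buffered, sink::add, maxBatch);
            } else {
                emitOne(buffered, sink::add);
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Integer> onePerCall = new ArrayList<>();
        List<Integer> looped = new ArrayList<>();
        System.out.println(outerCalls(10, 4, false, onePerCall)); // prints 10
        System.out.println(outerCalls(10, 4, true, looped));      // prints 3
        System.out.println(onePerCall.equals(looped));            // prints true
    }
}
```

Both variants deliver the same records in the same order; only the number of times the caller walks back down the call chain changes, which is the cost the ticket aims to reduce.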