[
https://issues.apache.org/jira/browse/FLINK-30533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dong Lin updated FLINK-30533:
-----------------------------
Description:
Currently, each invocation of IteratorSourceReaderBase#pollNext() pushes at
most one record to the given ReaderOutput. This unnecessarily increases the
average Java call stack depth needed to produce an element.
Take the following program as an example. For each element produced by this
program, the Flink runtime has to go through these 4 function calls:
* StreamTask#processInput()
* StreamOneInputProcessor#processInput()
* StreamTaskSourceInput#emitNext()
* SourceOperator#emitNext()
{code:java}
env.fromSequence(1, 1000000000L)
        .map(x -> x)
        .addSink(new DiscardingSink<>());
{code}
In comparison, SourceReaderBase#pollNext() is already using a while loop so
that each invocation of this method could push as many records to the given
ReaderOutput as possible.
This ticket proposes to update IteratorSourceReaderBase#pollNext() to push
records to ReaderOutput in a while loop, for the following two reasons:
* It improves performance for programs that use IteratorSourceReaderBase (e.g.
env.fromSequence) by removing an average of 4 function calls from the call
stack needed to produce a record.
* It makes the behavior of IteratorSourceReaderBase and SourceReaderBase
consistent with each other.
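The effect of the proposed loop can be sketched without any Flink dependency. The class below is only an illustration: PollLoopSketch, Status, pollBatch, callsToDrain and the maxRecordsPerCall bound are hypothetical names and are not Flink's API, and the ticket itself does not say whether the loop should be bounded. With a bound of 1 the sketch mimics the current one-record-per-call behavior; a larger bound shows how many fewer outer calls are needed.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.stream.LongStream;

/** Illustrative stand-in only; this is NOT Flink's IteratorSourceReaderBase. */
class PollLoopSketch {

    /** Hypothetical status, standing in for "more records" vs. "input finished". */
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    /**
     * Pushes records to the output in a while loop.
     * maxRecordsPerCall is an assumed bound (not taken from the ticket);
     * a value of 1 reproduces the one-record-per-call behavior.
     */
    static <T> Status pollBatch(Iterator<T> it, Consumer<T> output, int maxRecordsPerCall) {
        int emitted = 0;
        while (emitted < maxRecordsPerCall && it.hasNext()) {
            output.accept(it.next());
            emitted++;
        }
        return it.hasNext() ? Status.MORE_AVAILABLE : Status.END_OF_INPUT;
    }

    /** Counts how many outer calls are needed to drain the sequence 1..n. */
    static long callsToDrain(long n, int maxRecordsPerCall) {
        Iterator<Long> it = LongStream.rangeClosed(1, n).iterator();
        long[] discarded = {0}; // plays the role of a discarding sink
        long calls = 0;
        Status status;
        do {
            status = pollBatch(it, x -> discarded[0]++, maxRecordsPerCall);
            calls++;
        } while (status == Status.MORE_AVAILABLE);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(callsToDrain(1000, 1));   // prints 1000
        System.out.println(callsToDrain(1000, 100)); // prints 10
    }
}
```

Each outer call in the sketch corresponds to one trip through the caller chain listed above, so fewer outer calls means that overhead is paid less often per record.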
Here are the benchmark results from running the above program with
parallelism=1 and 5 runs per setup.
* Prior to the proposed change, the average execution time is 46.1 sec with
std=5.1 sec.
* After the proposed change, the average execution time is 31.3 sec with
std=2.3 sec.
* The proposed change increases throughput by 47.3%.
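The 47.3% figure is consistent with the two average execution times: for a fixed number of records, throughput is inversely proportional to run time, so the gain is the ratio of the longer time to the shorter one, minus one. A small sketch of that arithmetic (ThroughputGain and gainPercent are hypothetical names used only for illustration):

```java
import java.util.Locale;

/** Illustrative helper only; not part of Flink or of the benchmark itself. */
class ThroughputGain {

    /** Relative throughput gain, in percent, when run time drops from slower to faster. */
    static double gainPercent(double slowerSeconds, double fasterSeconds) {
        return (slowerSeconds / fasterSeconds - 1.0) * 100.0;
    }

    public static void main(String[] args) {
        // 46.1 / 31.3 is about 1.473, i.e. roughly 47.3% more records per second.
        System.out.println(String.format(Locale.ROOT, "%.1f%%", gainPercent(46.1, 31.3))); // prints 47.3%
    }
}
```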
was:
Currently, each invocation of IteratorSourceReaderBase#pollNext() pushes at
most one record to the given ReaderOutput. This unnecessarily increases the
average Java call stack depth needed to produce an element.
Take the following program as an example. For each element produced by this
program, the Flink runtime has to go through these 4 function calls:
* StreamTask#processInput()
* StreamOneInputProcessor#processInput()
* StreamTaskSourceInput#emitNext()
* SourceOperator#emitNext()
{code:java}
env.fromSequence(1, 1000000000L)
        .map(x -> x)
        .addSink(new DiscardingSink<>());
{code}
In comparison, SourceReaderBase#pollNext() is already using a while loop so
that each invocation of this method could push as many records to the given
ReaderOutput as possible.
This ticket proposes to update IteratorSourceReaderBase#pollNext() to push
records to ReaderOutput in a while loop, for the following two reasons:
* It improves performance for programs that use IteratorSourceReaderBase (e.g.
env.fromSequence) by removing an average of 4 function calls from the call
stack needed to produce a record.
* It makes the behavior of IteratorSourceReaderBase and SourceReaderBase
consistent with each other.
> IteratorSourceReaderBase#pollNext() should push records to ReaderOutput in a
> while loop
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-30533
> URL: https://issues.apache.org/jira/browse/FLINK-30533
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
>
> Currently, each invocation of IteratorSourceReaderBase#pollNext() pushes at
> most one record to the given ReaderOutput. This unnecessarily increases the
> average Java call stack depth needed to produce an element.
>
> Take the following program as an example. For each element produced by this
> program, the Flink runtime has to go through these 4 function calls:
> * StreamTask#processInput()
> * StreamOneInputProcessor#processInput()
> * StreamTaskSourceInput#emitNext()
> * SourceOperator#emitNext()
> {code:java}
> env.fromSequence(1, 1000000000L)
>         .map(x -> x)
>         .addSink(new DiscardingSink<>());
> {code}
>
> In comparison, SourceReaderBase#pollNext() is already using a while loop so
> that each invocation of this method could push as many records to the given
> ReaderOutput as possible.
>
> This ticket proposes to update IteratorSourceReaderBase#pollNext() to push
> records to ReaderOutput in a while loop, for the following two reasons:
> * It improves performance for programs that use IteratorSourceReaderBase (e.g.
> env.fromSequence) by removing an average of 4 function calls from the call
> stack needed to produce a record.
> * It makes the behavior of IteratorSourceReaderBase and SourceReaderBase
> consistent with each other.
>
> Here are the benchmark results from running the above program with
> parallelism=1 and 5 runs per setup.
> * Prior to the proposed change, the average execution time is 46.1 sec with
> std=5.1 sec.
> * After the proposed change, the average execution time is 31.3 sec with
> std=2.3 sec.
> * The proposed change increases throughput by 47.3%.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)