[ 
https://issues.apache.org/jira/browse/FLINK-30533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676585#comment-17676585
 ] 

Piotr Nowojski edited comment on FLINK-30533 at 1/13/23 11:12 AM:
------------------------------------------------------------------

It looks like a nice improvement:
http://codespeed.dak8s.net:8000/timeline/?ben=mapSink.F27_UNBOUNDED&env=2
I have some questions about the way it was implemented. Apart from FLINK-30623,
can someone tell me why this was implemented only for sources? Quite a lot of
effort went into the system to make sure that sources behave, and are handled,
the same way as network inputs.

The other question I have is: have you considered abstracting away
`Supplier<Boolean> mailboxHasMail` into something more general? Instead of
passing `Supplier<Boolean>` via the constructor, `DataOutput<OUT> output` could
have a method like `boolean DataOutput#canEmitBatchOfRecords()` that serves
the same purpose.
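
As a rough sketch of that suggestion (not Flink's actual code): the nested `DataOutput` interface, `BufferingOutput`, and the static `emitNext` helper below are simplified stand-ins invented for illustration, and the contract of `canEmitBatchOfRecords()` is an assumption. The point is only that the emitting loop asks the output whether it may continue, instead of receiving a `Supplier<Boolean>` through its constructor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Sketch only: simplified stand-ins, not Flink's actual classes. */
class CanEmitBatchSketch {

    /** Stand-in for DataOutput, extended with the method suggested in the comment. */
    interface DataOutput<T> {
        void emitRecord(T record);

        /** Assumed contract: true while the caller may emit another record right away. */
        boolean canEmitBatchOfRecords();
    }

    /** Output that buffers records and answers the batching question from a mailbox check. */
    static final class BufferingOutput<T> implements DataOutput<T> {
        final List<T> records = new ArrayList<>();
        private final Supplier<Boolean> mailboxHasMail;

        BufferingOutput(Supplier<Boolean> mailboxHasMail) {
            this.mailboxHasMail = mailboxHasMail;
        }

        @Override
        public void emitRecord(T record) {
            records.add(record);
        }

        @Override
        public boolean canEmitBatchOfRecords() {
            // Batching is allowed only while no mail is waiting.
            return !mailboxHasMail.get();
        }
    }

    /** Emits one record if available, then continues while the output allows it. */
    static <T> int emitNext(Iterator<T> source, DataOutput<T> output) {
        int emitted = 0;
        while (source.hasNext()) {
            output.emitRecord(source.next());
            emitted++;
            if (!output.canEmitBatchOfRecords()) {
                break; // hand control back to the caller
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical mailbox: reports mail from the third poll onwards.
        int[] polls = {0};
        BufferingOutput<Integer> output = new BufferingOutput<>(() -> ++polls[0] >= 3);
        Iterator<Integer> source = Arrays.asList(1, 2, 3, 4, 5).iterator();

        System.out.println("first call emitted:  " + emitNext(source, output));
        System.out.println("second call emitted: " + emitNext(source, output));
        System.out.println("buffered so far:     " + output.records);
    }
}
```

With this shape the mailbox check becomes an implementation detail of the output, so the same loop could in principle be reused for inputs other than sources.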


> SourceOperator#emitNext() should push records to DataOutput in a while loop
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-30533
>                 URL: https://issues.apache.org/jira/browse/FLINK-30533
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, each invocation of SourceOperator#emitNext() pushes at most one 
> record to the given DataOutput. This unnecessarily increases the average Java 
> call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this 
> program, the Flink runtime needs to include these 3 function calls in the 
> call stack:
>  * StreamTask#processInput()
>  * StreamOneInputProcessor#processInput()
>  * StreamTaskSourceInput#emitNext()
> {code:java}
> env.fromSequence(1, 1000000000L).map(x -> x).addSink(new DiscardingSink<>());
> {code}
>  
> This ticket proposes to update SourceOperator#emitNext() to push records to 
> DataOutput in a while loop. It improves Flink performance by removing an 
> average of 3 function calls from the call stack needed to produce a record.
> Here are the benchmark results obtained by running the above program with 
> parallelism=1 and object re-use enabled. The results are averaged across 5 
> runs for each setup.
>  * Prior to the proposed change, the average execution time is 46.1 sec with 
> std=5.1 sec.
>  * After the proposed change, the average execution time is 33.3 sec with 
> std=0.9 sec.
>  * The proposed change increases throughput by 38.4%.
>  
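
The effect described in the quoted ticket can be illustrated with a toy model. This is a sketch under stated assumptions, not Flink code: `EmitLoopSketch`, `Stats`, both `run...` methods, and the `shouldYield` check are hypothetical names, and the "outer call" counter merely stands in for re-entering the `StreamTask#processInput()` → `StreamOneInputProcessor#processInput()` → `StreamTaskSourceInput#emitNext()` chain.

```java
import java.util.function.BooleanSupplier;

/** Toy model of the proposal; none of these names are Flink classes. */
class EmitLoopSketch {

    /** Records emitted and how often the outer call chain was entered. */
    static final class Stats {
        final long records;
        final long outerCalls;

        Stats(long records, long outerCalls) {
            this.records = records;
            this.outerCalls = outerCalls;
        }
    }

    /** One record per invocation: the outer chain is re-entered for every record. */
    static Stats runOneRecordPerCall(long total) {
        long records = 0;
        long outerCalls = 0;
        while (records < total) {
            outerCalls++; // stands in for the three nested calls
            records++;    // exactly one record is pushed to the output
        }
        return new Stats(records, outerCalls);
    }

    /** Looping variant: keep emitting until drained or the (assumed) yield check fires. */
    static Stats runWithLoop(long total, BooleanSupplier shouldYield) {
        long records = 0;
        long outerCalls = 0;
        while (records < total) {
            outerCalls++;
            do {
                records++;
            } while (records < total && !shouldYield.getAsBoolean());
        }
        return new Stats(records, outerCalls);
    }

    public static void main(String[] args) {
        long total = 1_000;
        Stats before = runOneRecordPerCall(total);

        // Hypothetical yield policy: return to the caller on every 100th check.
        long[] checks = {0};
        Stats after = runWithLoop(total, () -> ++checks[0] % 100 == 0);

        System.out.println("one record per call: " + before.outerCalls + " outer calls");
        System.out.println("looping emitNext():  " + after.outerCalls + " outer calls");
    }
}
```

The model says nothing about absolute timings; it only shows that the cost of the outer calls is amortized over however many records are emitted before the loop yields.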



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
