[ https://issues.apache.org/jira/browse/FLINK-30533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676585#comment-17676585 ]
Piotr Nowojski edited comment on FLINK-30533 at 1/13/23 11:12 AM:
------------------------------------------------------------------

It looks like a nice improvement:
http://codespeed.dak8s.net:8000/timeline/?ben=mapSink.F27_UNBOUNDED&env=2

I have some questions about the way it was implemented. Apart from FLINK-30623, can someone tell me why this was only implemented for sources? Quite a lot of effort was put into the system to make sure that sources behave, and are handled, the same way as network inputs.

The other question is whether you have considered abstracting `Supplier<Boolean> mailboxHasMail` into something more general. Instead of passing a `Supplier<Boolean>` via the constructor, `DataOutput<OUT> output` could have a method like `boolean DataOutput#canEmitBatchOfRecords()` that serves the same purpose.

was (Author: pnowojski):

It looks like a nice improvement:
http://codespeed.dak8s.net:8000/timeline/?ben=mapSink.F27_UNBOUNDED&env=2

I have some questions about the way it was implemented. Apart from FLINK-30633, can someone tell me why this was only implemented for sources? Quite a lot of effort was put into the system to make sure that sources behave, and are handled, the same way as network inputs.

The other question is whether you have considered abstracting `Supplier<Boolean> mailboxHasMail` into something more general. Instead of passing a `Supplier<Boolean>` via the constructor, `DataOutput<OUT> output` could have a method like `boolean DataOutput#canEmitBatchOfRecords()` that serves the same purpose.
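To make the `canEmitBatchOfRecords()` suggestion concrete, here is a minimal, self-contained sketch. It is not Flink code: `DataOutput`, `Status`, `LoopingSource` and `CollectingOutput` below are simplified, hypothetical stand-ins, and the batch rule in `CollectingOutput` (a fixed number of records per batch) is only an assumption that replaces the real "mailbox has mail" check.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchEmitSketch {

    /** Hypothetical, simplified stand-in for Flink's DataOutput; not the real interface. */
    interface DataOutput<T> {
        void emitRecord(T record);

        /** Method proposed in the comment: may the caller keep emitting without returning? */
        boolean canEmitBatchOfRecords();
    }

    /** Simplified status, loosely modelled on the idea of "more data" vs. "finished". */
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    /** Hypothetical source that pushes records in a loop instead of one per call. */
    static final class LoopingSource<T> {
        private final Iterator<T> records;

        LoopingSource(Iterator<T> records) {
            this.records = records;
        }

        Status emitNext(DataOutput<T> output) {
            do {
                if (!records.hasNext()) {
                    return Status.END_OF_INPUT;
                }
                output.emitRecord(records.next());
                // Keep looping only while the output says a batch may continue.
            } while (output.canEmitBatchOfRecords());
            return Status.MORE_AVAILABLE;
        }
    }

    /** Assumed output for the demo: allows at most batchSize records per emitNext() call. */
    static final class CollectingOutput<T> implements DataOutput<T> {
        final List<T> collected = new ArrayList<>();
        private final int batchSize;

        CollectingOutput(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void emitRecord(T record) {
            collected.add(record);
        }

        @Override
        public boolean canEmitBatchOfRecords() {
            // Stand-in for "the mailbox has no mail": stop at every batch boundary.
            return collected.size() % batchSize != 0;
        }
    }

    /** Drives the source to completion and returns the number of emitNext() calls made. */
    static int countCalls(int numRecords, int batchSize) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < numRecords; i++) {
            input.add(i);
        }
        LoopingSource<Integer> source = new LoopingSource<>(input.iterator());
        CollectingOutput<Integer> output = new CollectingOutput<>(batchSize);
        int calls = 1;
        while (source.emitNext(output) == Status.MORE_AVAILABLE) {
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(countCalls(10, 1)); // prints 11: one record per call, plus the final call
        System.out.println(countCalls(10, 4)); // prints 3: batches of 4, 4 and 2
    }
}
```

With a batch size of 1 the sketch behaves like the one-record-per-call version; larger batches reduce how often the caller has to re-enter `emitNext()`. Putting the check on the output rather than passing a `Supplier<Boolean>` through the constructor means the same loop could, in principle, be reused for inputs other than sources.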
> SourceOperator#emitNext() should push records to DataOutput in a while loop
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-30533
>                 URL: https://issues.apache.org/jira/browse/FLINK-30533
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, each invocation of SourceOperator#emitNext() pushes at most one
> record to the given DataOutput. This unnecessarily increases the average Java
> call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this
> program, the Flink runtime needs to include these 3 function calls in the
> call stack:
> * StreamTask#processInput()
> * StreamOneInputProcessor#processInput()
> * StreamTaskSourceInput#emitNext()
> {code:java}
> env.fromSequence(1, 1000000000L).map(x -> x).addSink(new DiscardingSink<>());
> {code}
>
> This ticket proposes to update SourceOperator#emitNext() to push records to
> DataOutput in a while loop. It improves Flink performance by removing an
> average of 3 function calls from the call stack needed to produce a record.
> Here are the benchmark results obtained by running the above program with
> parallelism=1 and object re-use enabled. The results are averaged across 5
> runs for each setup.
> * Prior to the proposed change, the average execution time is 46.1 sec with
> std=5.1 sec.
> * After the proposed change, the average execution time is 33.3 sec with
> std=0.9 sec.
> * The proposed change increases throughput by 38.4%.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)