[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510647#comment-16510647
 ] 

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow], in input_stop_when_timer_run.png, does the yellow line show the 
input QPS and the green line the output QPS? If the picture was captured with 
the content of onTimer() active (not commented out), it doesn't surprise me. 
If it was captured with the content of onTimer() commented out, it surprises 
me a bit.

You also mentioned that the fluctuation still exists when the content of 
onTimer() is commented out. Does "commented out" mean that there is nothing 
at all in onTimer()? If so, that surprises me. As an additional test, could 
you also comment out the `recordStore.add()` in processElement()? If the 
fluctuation is still there with both the content of onTimer() and the 
`recordStore.add()` commented out, I think the problem is related to the 
timer itself, probably because of GC.
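
To check the GC suspicion, one option is to sample the JVM's GC counters while the job runs and see whether the pauses line up with the dips in the graph. The following is only a minimal sketch using the standard java.lang.management API; the class name GcSampler and the 500 ms sampling interval are my own assumptions, not something from the job in this issue.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper (not part of Flink): sums GC counters over all collectors.
public class GcSampler {

    /** Total collections so far; collectors that report -1 (undefined) are skipped. */
    static long totalGcCount() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount();
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    /** Approximate accumulated GC time in milliseconds; -1 (undefined) is skipped. */
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime();
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        long prevCount = totalGcCount();
        long prevTime = totalGcTimeMillis();
        // Take three samples, 500 ms apart (assumed interval), and print the deltas.
        for (int i = 0; i < 3; i++) {
            Thread.sleep(500);
            long count = totalGcCount();
            long time = totalGcTimeMillis();
            System.out.printf("gc runs: +%d, gc time: +%d ms%n",
                    count - prevCount, time - prevTime);
            prevCount = count;
            prevTime = time;
        }
    }
}
```

If the GC time delta jumps in the same intervals where the input QPS drops, that would support the GC explanation; if it stays flat, the cause is more likely somewhere else. Running the TaskManager JVM with GC logging enabled should give the same information.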

I'm also curious: what is the source QPS of your job, and what is the job's 
degree of parallelism?

Thanks~



> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled, and the filesystem (HDFS) is used as the state backend.
> It can be easily reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, even with a simple reduce function (in fact, an 
> empty function shows the same result). Any idea would be appreciated. 
> What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The only difference between the two runs is one line: 
> commenting/uncommenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy()  // key selector omitted in the original report
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;  // state setup not shown in the report
>
>     @Override
>     public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.
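
The description above measures records per second inside "JsonTranslator" but does not show how. One way to do it could look like the sketch below; ThroughputMeter, mark() and rateAndReset() are hypothetical names, not taken from the reporter's code. The clock value is passed in explicitly so that the calculation is easy to verify.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical records-per-second counter; not the reporter's actual measurement code.
public class ThroughputMeter {
    private final AtomicLong count = new AtomicLong();
    private long lastCount = 0;
    private long lastNanos;

    public ThroughputMeter(long startNanos) {
        this.lastNanos = startNanos;
    }

    /** Call once per record, e.g. from a map() function. */
    public void mark() {
        count.incrementAndGet();
    }

    /** Returns records per second since the previous call and starts a new window. */
    public synchronized double rateAndReset(long nowNanos) {
        long current = count.get();
        long elapsedNanos = nowNanos - lastNanos;
        double rate = elapsedNanos <= 0
                ? 0.0
                : (current - lastCount) * 1e9 / elapsedNanos;
        lastCount = current;
        lastNanos = nowNanos;
        return rate;
    }

    public static void main(String[] args) {
        // 500 records in a simulated 2-second window.
        ThroughputMeter meter = new ThroughputMeter(0L);
        for (int i = 0; i < 500; i++) {
            meter.mark();
        }
        System.out.println(meter.rateAndReset(2_000_000_000L)); // prints 250.0
    }
}
```

In a real job the timestamps would come from System.nanoTime(), and the rate would be logged from a periodic thread or exposed through a metric, so that the runs with and without "recStore.add(r)" can be compared on the same basis.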



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
