[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510828#comment-16510828
 ] 

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow], I didn't see the email you sent yet, but I just had a look at your 
code, I think the "non-scale-able"  might be caused by your test code. From 
your code we could see that the source's parallelism is always the same as the 
other operators. And in the each sub-task of the source, you use the loop to 
mock the source records, that means the QPS of the source will increase when 
you trying to rescale up the parallelism of your job, in the end, you didn't 
scale up anything indeed. I would suggest to set the parallelism of the source 
to a fixed value(e.g. 4), and scale up the job, then let's see whether it's 
scalable. I didn't test your code on cluster yet, will test it later. My email 
is "summerle...@163.com", if you had problem to send email to 
"u...@flink.apache.org", you could send to me personally if you want. Thanks~

> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop);
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy()
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState<Record> recStore;
>     public void processElement(Recordr, Context ctx, Collector<Record> out) {
>         recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to