[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509818#comment-16509818 ]

Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:34 PM:
------------------------------------------------------------

Hi [~yow], what do you mean by the empty ProcessAggregation? Did you use any
state in the empty ProcessAggregation? Or could you share the code of the
empty ProcessAggregation? In fact, it's a bit hard for me to believe that the
fluctuation is caused by keyBy(). AFAIK, keyBy() only determines which channel
a record goes to (when it is transferred between operators) and the key content
stored in RocksDB, so without using any state, keyBy() should be cheap.

I think the picture comparing KeyNoHash vs KeyHash shows what I expected. With
hash(), the key's length is only 4 bytes and its distribution is uniform;
without the hash, your key's length is 50 and its distribution may not be
uniform. But with the hash() approach you only get an approximate result. If
that is enough for you, then I think it's good to go now. Or is it not enough
for you?
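A small sketch of why keying on hash() gives only an approximate result: the 4-byte int key is compact, but distinct keys can collide and are then aggregated together. The class name and sample keys are hypothetical; the collision itself is real, since `"Aa"` and `"BB"` have the same `String.hashCode()` (2112).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: grouping by the full key vs. grouping by its 4-byte hashCode().
 * Distinct keys whose hashes collide end up in the same group, so per-key
 * aggregates computed on the hash are only approximate.
 */
public class HashedKeyDemo {

    // Count records per group, keyed either by the full key or by its int hash.
    public static Map<Object, Integer> countBy(String[] keys, boolean hashed) {
        Map<Object, Integer> counts = new HashMap<>();
        for (String k : keys) {
            Object groupKey = hashed ? (Object) k.hashCode() : k; // boxes the int hash
            counts.merge(groupKey, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "Aa" and "BB" are different keys with the same String.hashCode().
        String[] keys = {"Aa", "BB", "Aa"};
        System.out.println("exact groups:  " + countBy(keys, false));
        System.out.println("hashed groups: " + countBy(keys, true));
        System.out.println("hashed key size: " + Integer.BYTES + " bytes");
    }
}
```

With the full key there are two groups (counts 2 and 1); with the hashed key both collapse into a single group with count 3. Whether such collisions matter depends on how exact the per-key result needs to be.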





> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when
> ReducingState.add is used in the source code. In the test, checkpointing is
> disabled, and the filesystem (HDFS) state backend is used.
> It can easily be reproduced with a simple app without checkpointing that just
> keeps storing records, also with a simple reduction function (in fact, an
> empty function shows the same result). Any ideas would be appreciated.
> What an unbelievably obvious issue.
> Basically, the app just keeps storing records into the state, and we measure
> how many records per second pass through "JsonTranslator", as shown in the
> graph. The only difference between the two runs is one line: commenting or
> uncommenting "recStore.add(r)".
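> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy(/* key selector not shown in the report */)
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>
>     @Override
>     public void open(Configuration parameters) {
>         // RecordReducer is a placeholder for the "simple reduction function" described above
>         recStore = getRuntimeContext().getReducingState(
>             new ReducingStateDescriptor<>("recStore", new RecordReducer(), Record.class));
>     }
>
>     @Override
>     public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}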
> Record is a POJO class containing 50 private String members.
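To see why a single `recStore.add(r)` line can matter so much, it helps to recall what a reducing state does per element: look up the current value for the key, combine it with the new element using the reduce function, and write the result back. The class below is a conceptual in-memory model of that read-modify-write cycle, not Flink's implementation; the class and method names are hypothetical. On a heap-based backend such as the filesystem backend in this report, the cycle stays in memory, while on RocksDB each `add()` also has to deserialize the current value and serialize the updated one, which gets more expensive as the value grows (here, a POJO with 50 String fields).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Conceptual model (not Flink's implementation) of ReducingState.add(v):
 * read the current value for the key, reduce it with v, write it back.
 * This happens for every element, even when the reduce function is trivial.
 */
public class ReducingStateModel<K, V> {
    private final Map<K, V> table = new HashMap<>();
    private final BinaryOperator<V> reduceFunction;
    private long writes = 0; // counts state writes, for illustration only

    public ReducingStateModel(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // add(): read-modify-write on the value stored for the given key.
    public void add(K key, V value) {
        V current = table.get(key);
        V updated = (current == null) ? value : reduceFunction.apply(current, value);
        table.put(key, updated);
        writes++;
    }

    public V get(K key) {
        return table.get(key);
    }

    public long writes() {
        return writes;
    }

    public static void main(String[] args) {
        // A near-"empty" reducer that keeps the latest value still costs one write per add().
        ReducingStateModel<String, String> state = new ReducingStateModel<>((a, b) -> b);
        for (int i = 0; i < 5; i++) {
            state.add("key-" + (i % 2), "record-" + i);
        }
        System.out.println(state.get("key-0") + ", writes=" + state.writes()); // record-4, writes=5
    }
}
```

The point of the model is that the per-record state cost does not vanish when the reduce function is empty: the lookup and write-back (and, on RocksDB, the serialization) still happen for every element.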



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
