[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509818#comment-16509818 ]
Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:34 PM:
------------------------------------------------------------

Hi [~yow]

What do you mean by the empty ProcessAggregation? Did you use any state in the empty ProcessAggregation? Or could you provide some code of the empty ProcessAggregation?

In fact, it's a bit hard for me to believe the fluctuation is caused by the keyBy. AFAIK, keyBy() only controls which channel a record is sent to (when records are transferred between operators) and what key content is stored in RocksDB. Without using any state, keyBy() should be cheap.

I think the KeyNoHash vs. KeyHash picture shows what I expected. With hash(), the key is only 4 bytes long and the distribution is uniform; without hash, your key's length is 50 and the distribution may not be uniform. But with the hash() approach you can only get an approximate result. If that is enough for you, then I think it's good to go now. Or is it not enough for you?
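The trade-off in the comment (smaller keys vs. approximate results) can be illustrated with a small, Flink-independent Java sketch. It assumes that the `hash()` approach means keying by the 4-byte `String.hashCode()` of the original key; the thread does not show which hash is actually used. The class and method names are hypothetical, and the UTF-8 byte length is only a stand-in for the size of the serialized key (the real serializer's framing is ignored).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not Flink code): keying by the full String key
// vs. keying by its 4-byte hashCode().
final class HashedKeySketch {

    // Approximate size of the full key in state; UTF-8 length is used as a
    // stand-in for the real serializer, whose framing is ignored here.
    static int fullKeyBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8).length;
    }

    // An int key such as key.hashCode() always takes 4 bytes.
    static int hashedKeyBytes(String key) {
        return Integer.BYTES;
    }

    // Exact aggregation: one counter per distinct key.
    static Map<String, Integer> countByFullKey(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts;
    }

    // Approximate aggregation: distinct keys with equal hashCode() share a counter.
    static Map<Integer, Integer> countByHashedKey(List<String> keys) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k.hashCode(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "Aa" and "BB" are distinct strings with the same String.hashCode() (2112).
        List<String> keys = Arrays.asList("Aa", "BB", "Aa");
        System.out.println("full-key counts:   " + countByFullKey(keys));
        System.out.println("hashed-key counts: " + countByHashedKey(keys));
    }
}
```

Here the full-key version keeps "Aa" (2) and "BB" (1) apart, while the hashed version merges them into a single entry with count 3. Collisions like this are why keying by a hash only gives an approximate result.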
> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when
> ReducingState.add is used in the source code. In the test, checkpointing is
> disabled, and the filesystem (HDFS) state backend is used.
> It can easily be reproduced with a simple app: no checkpointing, just keep
> storing records, with a simple reduce function (in fact, an empty function
> shows the same result). Any idea would be appreciated.
> This seems like an unbelievably obvious issue.
> Basically, the app just keeps storing records into the state, and we measure
> how many records per second pass through "JsonTranslator", which is shown in
> the graph. The only difference between the two runs is one line:
> commenting/uncommenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>         .keyBy()  // key selector not shown in the report
>         .process(new ProcessAggregation())
>         .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     // initialization of recStore (in open()) is not shown in the report
>     private ReducingState<Record> recStore;
>
>     @Override
>     public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
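To make the cost of the single `recStore.add(r)` line concrete, here is a simplified, hypothetical Java model of what a keyed reducing state does per record: read the current value for the key, combine it with the new value using the reduce function, and write the result back. It assumes a backend that keeps values in serialized form, as RocksDB does. Heap-based backends such as the filesystem backend used in the report keep objects on the heap and skip the (de)serialization, but still do a lookup, a reduce call and a write per record. `SerializedReducingState`, `forUtf8Strings` and `bytesWritten` are illustrative names, not Flink API.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical model of a keyed ReducingState whose backend stores values as
// bytes (RocksDB-like). This is not Flink's implementation.
final class SerializedReducingState<K, V> {
    private final Map<K, byte[]> store = new HashMap<>();
    private final BinaryOperator<V> reduceFunction;
    private final Function<V, byte[]> serializer;
    private final Function<byte[], V> deserializer;
    private long bytesWritten = 0;

    SerializedReducingState(BinaryOperator<V> reduceFunction,
                            Function<V, byte[]> serializer,
                            Function<byte[], V> deserializer) {
        this.reduceFunction = reduceFunction;
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    // Convenience factory for String keys and values, values stored as UTF-8.
    static SerializedReducingState<String, String> forUtf8Strings(BinaryOperator<String> reduceFunction) {
        return new SerializedReducingState<>(
                reduceFunction,
                s -> s.getBytes(StandardCharsets.UTF_8),
                b -> new String(b, StandardCharsets.UTF_8));
    }

    // Work done for every record: read, deserialize, reduce, serialize, write back.
    void add(K key, V value) {
        byte[] stored = store.get(key);
        V updated = (stored == null)
                ? value
                : reduceFunction.apply(deserializer.apply(stored), value);
        byte[] bytes = serializer.apply(updated);
        bytesWritten += bytes.length;
        store.put(key, bytes);
    }

    V get(K key) {
        byte[] stored = store.get(key);
        return stored == null ? null : deserializer.apply(stored);
    }

    long bytesWritten() {
        return bytesWritten;
    }

    public static void main(String[] args) {
        // Keep-latest reducer: the function itself does almost nothing,
        // yet every add() still pays for the full read/write round trip.
        SerializedReducingState<String, String> latest = forUtf8Strings((a, b) -> b);
        for (String r : new String[] {"r1", "r2", "r3"}) {
            latest.add("key", r);
        }
        System.out.println(latest.get("key") + ", bytes written: " + latest.bytesWritten());
        // prints "r3, bytes written: 6"
    }
}
```

With a concatenating reducer such as `(a, b) -> a + b`, the stored value grows on every add, and so do the bytes written per record. For a Record with 50 String fields, each round trip handles the whole object even when the reduce function is close to empty.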