[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499859#comment-16499859 ]
swy edited comment on FLINK-9506 at 6/4/18 8:03 AM:
----------------------------------------------------

[~sihuazhou] Your trick looks quite promising: performance has improved a lot and in a much more stable pattern. Please refer to the attached "KeyNoHash_VS_KeyHash.png"; the left-hand side shows the fluctuating pattern before the change, while the right-hand side shows the behaviour after the change.

{code}
.keyBy(new KeySelector<Record, Integer>() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode() % 128;
    }
})
{code}

However, the change also affects the processing timer: records are not flushed, or are only partially flushed, even when the scheduled time is reached. I guess it might be due to incorrect key reduction. Any advice? Thanks.


> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops by more than 100% when
> ReducingState.add is used in the source code. In the test, checkpointing is
> disabled, and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app: no checkpointing, just keep
> storing records, with a simple reduce function (in fact, an empty function
> shows the same result). Any idea would be appreciated. What an unbelievably
> obvious issue.
> Basically the app just keeps storing records into the state, and we measure
> how many records per second pass through "JsonTranslator", which is shown in
> the graph. The difference between the two runs is just one line:
> commenting/uncommenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy(...)
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.
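For reference, a minimal self-contained sketch that combines the reproduction code from the description with the key-bucketing KeySelector from the comment above. This is not the reporter's actual application: the Record stub, the keep-latest ReduceFunction, the BucketKeySelector name, and the Math.abs guard are assumptions added only to make the snippet compile; the reporter notes that even an effectively empty reduce function shows the same slowdown.

{code}
// Sketch only: description's ProcessAggregation plus the comment's key-bucketing trick.
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessAggregation extends ProcessFunction<Record, Record> {

    private transient ReducingState<Record> recStore;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Keep-latest reduction as a placeholder; the reporter saw the same
        // slowdown even with an empty reduce function.
        ReducingStateDescriptor<Record> desc = new ReducingStateDescriptor<>(
                "recStore", (ReduceFunction<Record>) (a, b) -> b, Record.class);
        recStore = getRuntimeContext().getReducingState(desc);
    }

    @Override
    public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
        recStore.add(r); // the call whose cost is being measured
        out.collect(r);
    }
}

/** Key-bucketing selector from the comment: collapse the unique key into 128 buckets. */
class BucketKeySelector implements KeySelector<Record, Integer> {
    @Override
    public Integer getKey(Record r) throws Exception {
        // Math.abs avoids negative bucket ids for negative hash codes (not in the original snippet).
        return Math.abs(r.getUNIQUE_KEY().hashCode()) % 128;
    }
}

/** Simplified stand-in for the reporter's POJO (50 private String fields in the original). */
class Record {
    private String uniqueKey;
    public String getUNIQUE_KEY() { return uniqueKey; }
    public void setUNIQUE_KEY(String k) { this.uniqueKey = k; }
}
{code}

Usage would look like .keyBy(new BucketKeySelector()).process(new ProcessAggregation()). Note that with this bucketing, any processing-time timers registered in the function are scoped to the 128 bucket keys rather than to the original unique keys, which may be related to the partial-flush behaviour described in the comment.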