[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509818#comment-16509818 ] Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:34 PM:

Hi [~yow], what do you mean by "the empty ProcessAggregation"? Did you use any state in it, or could you share some of its code? In fact, it is a bit hard for me to believe the fluctuation is caused by keyBy(). AFAIK, keyBy() only controls which channel a record goes to (when transferring between operators) and the content of the key stored in RocksDB; without any state involved, keyBy() should be cheap.

I think the KeyNoHash vs. KeyHash picture shows what I expected: with hash() the key is only 4 bytes long and the distribution is uniform, while without it your key has length 50 and the distribution may not be uniform. Note that with the hash() approach you can only get an approximate result; if that is enough for you, then I think this is good to go now. Or is it not enough for you?
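To make the size argument above concrete, here is a minimal plain-Java sketch of the hash-bucketing idea (the bucket count of 128 comes from the thread; the key value and class name are hypothetical). One practical caveat: Java's `hashCode() % 128` can be negative, so `Math.floorMod` is used here to keep buckets in `[0, 128)`.

```java
public class KeyBucketing {
    static final int BUCKETS = 128;

    // Collapse an arbitrary-length String key into a 4-byte int bucket.
    // hashCode() % BUCKETS may be negative in Java; floorMod keeps it in [0, BUCKETS).
    static int bucketOf(String uniqueKey) {
        return Math.floorMod(uniqueKey.hashCode(), BUCKETS);
    }

    public static void main(String[] args) {
        // A ~50-character key like the Record's UNIQUE_KEY (hypothetical value).
        String key = "ORDER-2018-06-12-000000000000000000000000000000001";
        System.out.println(bucketOf(key)); // always in [0, 128)
    }
}
```

Because many distinct keys fall into the same bucket, any per-key aggregation becomes per-bucket and therefore approximate, which is exactly the trade-off discussed in the comment.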
> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.4.2
> Reporter: swy
> Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
> Hi, we found application performance drops more than 100% when ReducingState.add is used in the source code. In the test, checkpointing is disabled and filesystem (hdfs) is used as the state backend.
> It can easily be reproduced with a simple app: without checkpointing, just keep storing records with a simple reduce function (in fact, an empty function shows the same result). Any idea would be appreciated. What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how many records per second pass through "JsonTranslator", which is shown in the graph. The difference between the runs is just one line: commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<Record> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509808#comment-16509808 ] swy edited comment on FLINK-9506 at 6/12/18 4:10 PM:

Hi [~sihuazhou], I hope to close the ticket too, but the problem still persists even though the reducing state is no longer used; it was replaced with ListState as you suggested. However, further investigation shows the problem is caused by "keyBy" instead. Please refer to keyby.png:

1. The first run is without keyBy:

{code}
DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator");
{code}

2. The second run is with keyBy and the ProcessAggregation logic (which uses ListState to store all records and sums them up when the timer triggers):

{code}
DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
    .keyBy(new KeySelector<Record, Integer>() {
        @Override
        public Integer getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY().hashCode() * 31;
        }
    })
    .process(new ProcessAggregation());
{code}

3. The third run is with keyBy and an empty ProcessAggregation.

The results show that ProcessAggregation is not the root cause of the fluctuation: there is no difference between the empty logic and the logic with ListState. The fluctuation seems to be caused by "keyBy". Any idea why? Thank you.
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503505#comment-16503505 ] Sihua Zhou edited comment on FLINK-9506 at 6/7/18 9:25 AM:

[~yow] Maybe there is one more optimization worth a try. I see you are using the ReducingState in your code just to accumulate `record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. The ReducingState works as follows:
- get the "old result" from RocksDB;
- reduce the "old result" with the input, and put the "new result" back into RocksDB.

That means for every input record in processElement() it needs to do a `get` and a `put` against RocksDB, and the `get` costs much more than the `put`. I would suggest using ListState instead. With ListState, what you need to do is:
- perform {{ListState.add(record)}} in {{processElement()}}; `ListState.add()` is cheap because it only appends the record in RocksDB;
- perform the reducing in {{onTimer()}}, which might look as follows:

{code:java}
Iterable<JSONObject> records = listState.get();
for (JSONObject jsonObj : records) {
    // do accumulation
}
out.collect(result);
{code}

This way, for every key, every second, you only need to do one read operation against RocksDB.
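The get/put asymmetry described in that suggestion can be sketched with a toy counting backend (plain Java; this is a hypothetical model for illustration, not the real Flink/RocksDB state API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy key-value "backend" that counts reads and writes, to contrast the
// ReducingState access pattern (get+put per record) with the ListState
// pattern (append per record, one get in the timer).
public class StateAccessPattern {
    static int reads = 0, writes = 0;
    static final Map<String, Object> backend = new HashMap<>();

    static Object get(String key) { reads++; return backend.get(key); }
    static void put(String key, Object value) { writes++; backend.put(key, value); }

    // ReducingState-style: every add() is a get + reduce + put.
    static void reducingAdd(String key, int value) {
        Integer old = (Integer) get(key);
        put(key, old == null ? value : old + value);
    }

    // ListState-style: add() only appends; no read happens here.
    @SuppressWarnings("unchecked")
    static void listAdd(String key, int value) {
        List<Integer> list = (List<Integer>) backend.computeIfAbsent(key, k -> new ArrayList<>());
        writes++;
        list.add(value);
    }

    // The single read per key happens when the timer fires.
    @SuppressWarnings("unchecked")
    static int onTimer(String key) {
        List<Integer> list = (List<Integer>) get(key);
        int sum = 0;
        for (int v : list) sum += v;
        return sum;
    }
}
```

For N records per key per timer interval, the reducing pattern does N reads and N writes, while the list pattern does N writes and a single read in the timer, which is why the thread reports it as much cheaper when reads dominate the cost.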
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499867#comment-16499867 ] Sihua Zhou edited comment on FLINK-9506 at 6/4/18 8:05 AM:

Thanks for trying it out. Then I think the performance drop and the fluctuation might be caused by the state lookup. Since you are using the heap-based KeyedStateBackend, the fluctuation might be caused by capacity rescaling of the backing "hash map", though I would not expect the impact to be that pronounced... Maybe [~srichter] can give some more useful and professional information.
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499859#comment-16499859 ] swy edited comment on FLINK-9506 at 6/4/18 8:03 AM:

[~sihuazhou] Your trick looks quite promising: performance has improved very much, and in a more stable pattern. Please refer to the attached "KeyNoHash_VS_KeyHash.png"; the left-hand side is the fluctuating pattern before the change, and the right-hand side is after the change.

{code}
.keyBy(new KeySelector<Record, Integer>() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode() % 128;
    }
})
{code}

However, the change also affects the processing timer: records cannot be flushed, or are only partially flushed, even when the scheduled time is reached. I guess it might be due to wrong key reducing. Any advice? Thanks.
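The "wrong key reducing" suspicion above is plausible: after keying by `hashCode() % 128`, state and timers are scoped per bucket rather than per original UNIQUE_KEY, so distinct keys get merged. A tiny Java illustration (hypothetical class name; `"Aa"`/`"BB"` is a classic `String.hashCode()` collision):

```java
public class BucketCollision {
    // Same bucketing as the keyBy() selector in the thread, made non-negative.
    static int bucketOf(String uniqueKey) {
        return Math.floorMod(uniqueKey.hashCode(), 128);
    }

    public static void main(String[] args) {
        // "Aa" and "BB" both hash to 2112, so they always land in the same bucket:
        // their records share one state entry and are flushed by the same timer.
        System.out.println(bucketOf("Aa")); // 64
        System.out.println(bucketOf("BB")); // 64
    }
}
```

This does not by itself explain records never being flushed, but it does mean a timer now flushes a whole bucket of unrelated keys at once, so per-key flush behavior will look partial or misaligned.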
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499672#comment-16499672 ] Sihua Zhou edited comment on FLINK-9506 at 6/4/18 2:22 AM:

Hi [~yow], could you please just replace the getKey() as follows and give it a try?

{code:java}
new KeySelector<Record, Integer>() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode() % 128;
    }
}
{code}

If this works, then I think the performance drop may be caused by the state lookup.
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499445#comment-16499445 ] swy edited comment on FLINK-9506 at 6/3/18 3:04 PM:

What we want to know is: is this expected (a drop of more than 100%), or is something wrong in our code? We understand that using state will impact performance, but we did not expect this much, nor the fluctuating pattern.