[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509818#comment-16509818
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:34 PM:


Hi [~yow], what do you mean by the empty ProcessAggregation? Did you use any 
state in it? Or could you provide some code of the empty ProcessAggregation? In 
fact, it's a bit hard for me to believe the fluctuation is caused by keyBy(). 
AFAIK, it only controls which channel a record goes to (when transferred between 
operators) and the content of the key stored in RocksDB; without using any 
state, keyBy() should be cheap.

I think the picture comparing keyNoHash vs KeyHash is what I expected. With 
hash() the key is only 4 bytes long and its distribution is uniform; without the 
hash, your key is 50 characters long and its distribution may not be uniform. 
But with the hash() approach you can only get an approximate result. If that is 
enough for you, then I think it's good to go now; is it not enough for you?
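
For illustration (not from the ticket), a minimal sketch of the two keying 
options discussed above; Record and getUNIQUE_KEY() are taken from the snippets 
in this thread, everything else is assumed:
{code:java}
import org.apache.flink.api.java.functions.KeySelector;

// (a) Raw key: the ~50-character UNIQUE_KEY string. Exact per-key results, but
//     a longer key to serialize, compare and store in the state backend.
KeySelector<Record, String> rawKey = new KeySelector<Record, String>() {
    @Override
    public String getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY();
    }
};

// (b) Hashed key: a 4-byte Integer with a roughly uniform distribution. Cheap,
//     but distinct UNIQUE_KEYs can collide, so per-key aggregates become
//     approximate.
KeySelector<Record, Integer> hashedKey = new KeySelector<Record, Integer>() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode();
    }
};
{code}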



was (Author: sihuazhou):
Hi [~yow], what do you mean by the empty ProcessAggregation? Did you use any 
state in it? Or could you provide some code of the empty ProcessAggregation?

I think the picture comparing keyNoHash vs KeyHash is what I expected. With 
hash() the key is only 4 bytes long and its distribution is uniform; without the 
hash, your key is 50 characters long and its distribution may not be uniform. 
But with the hash() approach you can only get an approximate result. If that is 
enough for you, then I think it's good to go now; is it not enough for you?


> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509818#comment-16509818
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/12/18 4:26 PM:


Hi [~yow], what do you mean by the empty ProcessAggregation? Did you use any 
state in it? Or could you provide some code of the empty ProcessAggregation?

I think the picture comparing keyNoHash vs KeyHash is what I expected. With 
hash() the key is only 4 bytes long and its distribution is uniform; without the 
hash, your key is 50 characters long and its distribution may not be uniform. 
But with the hash() approach you can only get an approximate result. If that is 
enough for you, then I think it's good to go now; is it not enough for you?



was (Author: sihuazhou):
Hi [~yow], what do you mean by the empty ProcessAggregation? Did you use any 
state in it? Or could you provide some code of the empty ProcessAggregation?

I think the picture comparing keyNoHash vs KeyHash is what I expected. With 
hash() the key is only 4 bytes long and its distribution is uniform; without the 
hash, your key is 50 characters long and its distribution may not be uniform. 
But with the hash() approach you can only get an approximate result. If that is 
enough for you, then I think it's good to go now; is it not enough for you?


> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509808#comment-16509808
 ] 

swy edited comment on FLINK-9506 at 6/12/18 4:10 PM:
-

Hi [~sihuazhou], I hope to close the ticket too, but the problem still persists 
even though the reducing state is no longer used, with ListState as the 
replacement you suggested. However, further investigation shows the problem is 
caused by "keyBy" instead. Please refer to keyby.png.

1. The first run is without keyBy:

DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator");

2. The second run is with keyBy and with the ProcessAggregation logic (the logic 
uses ListState to store all records, which are summed up when the timer fires):

DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
    .keyBy(new KeySelector() {
        @Override
        public Integer getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY().hashCode() * 31;
        }
    })
    .process(new ProcessAggregation());

3. The third run is with keyBy and an empty ProcessAggregation logic.

The results show ProcessAggregation is not the root cause of the fluctuation: 
there is no difference between the empty logic and the logic with ListState. The 
fluctuation seems to be caused by "keyBy". Any idea why? Thank you.


was (Author: yow):
Hi [~sihuazhou], I hope to close the ticket too, but the problem still persists 
even though the reducing state is no longer used, with ListState as the 
replacement you suggested. However, further investigation shows the problem is 
caused by "keyBy" instead. Please refer to keyby.png.

1. The first run is without keyBy:

DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator");

2. The second run is with keyBy and with the ProcessAggregation logic (the logic 
uses ListState to store all records, which are summed up when the timer fires):

DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
    .keyBy(new KeySelector() {
        @Override
        public Integer getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY().hashCode() * 31;
        }
    })
    .process(new ProcessAggregation(aggrDuration, 
        markerFactory.getMarker(), markerFactory.getMarker()))
    .name("AggregationDuration: " + aggrDuration + "ms");

3. The third run is with keyBy and an empty ProcessAggregation logic.

The results show ProcessAggregation is not the root cause of the fluctuation: 
there is no difference in ProcessAggregation between the empty logic and the 
logic with ListState. The fluctuation seems to be caused by "keyBy". Any idea 
why? Thank you.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.

[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503505#comment-16503505
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/7/18 9:25 AM:
---

[~yow] Maybe there is one more optimization you could try. I see you are using 
the ReducingState in your code just to accumulate 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. The 
ReducingState works as follows:

- get the "old result" from RocksDB;
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

That means that for every input record in processElement() it needs to do a 
`get` and a `put` on RocksDB, and the `get` costs much more than the `put`. I 
would suggest using ListState instead. With ListState, what you need to do is:

- perform {{ListState.add(record)}} in {{processElement()}}; `ListState.add()` 
is cheap because it only appends the record to RocksDB;
- perform the reducing in {{onTimer()}}; the reducing might look as follows:
{code:java}
Iterable<JSONObject> records = listState.get();
for (JSONObject jsonObj : records) {
    // do accumulation
}
out.collect(result);
{code}

In this way, for every key, every second, you only need to do one read operation 
on RocksDB.
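
For illustration, here is a minimal self-contained sketch of this 
ListState-plus-timer pattern (hedged: Record, getInt("I_PRIMARY_UNITS") and the 
one-second interval come from this thread; the class name and output type are 
assumptions, not the reporter's actual code):
{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingAggregation extends ProcessFunction<Record, Long> {

    private transient ListState<Record> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Record.class));
    }

    @Override
    public void processElement(Record r, Context ctx, Collector<Long> out) throws Exception {
        // Cheap append-only write: no RocksDB read on the hot path.
        buffer.add(r);
        // Fire a processing-time timer at the next full second for this key;
        // timers for the same key and timestamp are deduplicated by Flink.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer(now - (now % 1000) + 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        long sum = 0;
        for (Record r : buffer.get()) { // the single RocksDB read per key per timer
            sum += r.getInt("I_PRIMARY_UNITS");
        }
        buffer.clear();
        out.collect(sum);
    }
}
{code}
Note this only works on a keyed stream (i.e. after keyBy()), since both keyed 
state and timers require a key context.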





was (Author: sihuazhou):
[~yow] Maybe there is one more optimization that could have a try, I see you 
are using the ReduceState in your code just to accumulate the 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For 
the ReduceState it works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

that means for input record in processElement(), it needs to do a `get` and a 
`put` to RocksDB. And the `get` cost much more then `put`. I would suggest to 
use the ListState instead. With using ListState, what you need to do are:

- Performing {{ListState.add(record)}} in {{processElement()}}, since the 
`ListState.add()` is cheap as it not put the record into Rocks.
- Performing reducing in {{OnTimer()}}, the reducing might look as follow:
{code:java}
List< JSONObject> records = listState.get();
for (JSonObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}

In this way, for every key very seconds, you only need to do one read operation 
of RocksDB.




> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499867#comment-16499867
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/4/18 8:05 AM:
---

Thanks for trying it out. Then I think the performance drop and the fluctuation 
might be caused by the state lookup, and since you are using the heap-based 
KeyedStateBackend, the fluctuation might be caused by the capacity rescaling of 
the underlying hash map, but I think the impact should not be that obvious... 
Maybe [~srichter] could give some more useful and professional information...


was (Author: sihuazhou):
Thanks for trying it out. Then I think the performance drop and the fluctuation 
might be caused by the state lookup, and since you are using the heap-based 
KeyedStateBackend, the fluctuation might be caused by the capacity rescaling of 
the underlying hash map, but I think the impact should not be that obvious... 
Maybe [~srichter] could give some more useful and professional information...

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499859#comment-16499859
 ] 

swy edited comment on FLINK-9506 at 6/4/18 8:03 AM:


[~sihuazhou] Your trick is quite promising, as performance has improved very 
much and in a more stable pattern. Please refer to the attachment 
"KeyNoHash_VS_KeyHash.png": the left-hand side is the fluctuating pattern 
"before the change", while the right-hand side is "after the change".

.keyBy(new KeySelector() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode() % 128;
    }
})

However, the change also affects the processing timer: records cannot be 
flushed, or are only partially flushed, even when the scheduled time is reached. 
I guess it might be due to reducing on the wrong key. Any advice? Thanks.
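
As a hedged aside (not from the ticket), the snippet below illustrates why a key 
of hashCode() % 128 can merge records: the key space collapses to a small fixed 
range (roughly -127..127, since Java's % can return negative values), so 
different UNIQUE_KEYs may map to the same Flink key and then share one state 
entry and one timer, which matches the "approximate result" caveat mentioned 
elsewhere in this thread.
{code:java}
// Hypothetical UNIQUE_KEY values, for illustration only.
String a = "KEY-000017";
String b = "KEY-000145";
int ka = a.hashCode() % 128; // possibly negative
int kb = b.hashCode() % 128;
// If ka == kb, records for both UNIQUE_KEYs land on the same Flink key and are
// aggregated (and flushed by timers) together.
System.out.println(ka + " vs " + kb);
{code}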


was (Author: yow):
[~sihuazhou] Your trick is quite promising, as performance has improved very 
much and in a more stable pattern. Please refer to the attachment 
"KeyNoHash_VS_KeyHash.png": the left-hand side is the fluctuating pattern 
"before the change", while the right-hand side is "after the change".

.keyBy(new KeySelector() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode() % 128;
    }
})

However, the change also affects the processing timer: records cannot be 
flushed, or are only partially flushed, even when the scheduled time is reached. 
Any advice? Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499672#comment-16499672
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/4/18 2:22 AM:
---

Hi [~yow], could you please just replace the getKey() as follows and give it a try?
{code:java}
new KeySelector() {
@Override
public Integer getKey(Record r) throws Exception { 
return r.getUNIQUE_KEY().hash() % 128; 
}
}
{code}
If this works, then I think the performance drop may be caused by the state lookup.
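
(hash() above presumably means String.hashCode(), which is what the reporter 
used in the follow-up comments.) A hedged usage sketch of plugging this selector 
into the pipeline; the stream and operator names follow the reporter's snippets 
elsewhere in this thread, and the output type of ProcessAggregation is assumed:
{code:java}
DataStream<Record> aggregated = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
        .keyBy(new KeySelector<Record, Integer>() {
            @Override
            public Integer getKey(Record r) throws Exception {
                return r.getUNIQUE_KEY().hashCode() % 128;
            }
        })
        .process(new ProcessAggregation());
{code}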


was (Author: sihuazhou):
Hi [~yow], could you please just replace the getKey() as follows and give it a try?
{code}
new KeySelector() {
@Override
public Integer getKey(Record r) throws Exception { 
return r.getUNIQUE_KEY().hash() / 128; 
}
}
{code}

If this works, then I think the performance drop may be caused by the state lookup.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499445#comment-16499445
 ] 

swy edited comment on FLINK-9506 at 6/3/18 3:04 PM:


What we want to know is: is this expected (a drop of more than 100%) or is 
something wrong in our code? We understand that the 'state' will impact 
performance, but we did not expect this much, nor such a fluctuating pattern.


was (Author: yow):
What we want to know is: is this expected (a drop of more than 100%) or is 
something wrong in our code? We understand that the 'state' will impact 
performance, but not this much, nor in such a fluctuating pattern.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499445#comment-16499445
 ] 

swy edited comment on FLINK-9506 at 6/3/18 3:03 PM:


What we want to know is: is this expected (a drop of more than 100%) or is 
something wrong in our code? We understand that the 'state' will impact 
performance, but not this much, nor in such a fluctuating pattern.


was (Author: yow):
What we want to know is: is this expected or is something wrong in our code?

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, even with a simple reduce function (in fact, an empty 
> function shows the same result). Any idea would be appreciated; it is a 
> surprisingly obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 String private members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)