[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-18 Thread Matteo Ferrario (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16170001#comment-16170001
 ] 

Matteo Ferrario commented on FLINK-7606:


Hi [~kkl0u],
thanks for your reply.
If I understand correctly, in processing time the state stored in the 
NestedMapsStateTable object is never cleared, while in event time, at each 
watermark, the operator checks whether the NFA is empty and, if so, clears the 
state.
If this is correct, the memory leak (that is, a NestedMapsStateTable that keeps 
growing without ever being cleared) should not occur when the stream time 
characteristic is set to event time:
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}

On the other hand, the memory leak will occur when the stream time 
characteristic is set to processing time:
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
{code}

Is that correct?
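To make the question concrete, here is a hypothetical toy model in plain Java. It is not Flink's CEP operator code; it simply assumes the behaviour described above is accurate. Each key keeps the start timestamp of a pending partial match (standing in for the per-key NFA held in the NestedMapsStateTable): watermarks prune entries older than the "within" interval, while in processing time nothing ever prunes them. The class name ToyKeyedNfaState and the "device-N" keys are made up for illustration.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model, NOT Flink's CEP operator code.
// key -> start timestamp of a pending partial match (stand-in for the per-key NFA)
class ToyKeyedNfaState {
    private final Map<String, Long> pending = new HashMap<>();
    private final long withinMillis;

    ToyKeyedNfaState(long withinMillis) {
        this.withinMillis = withinMillis;
    }

    // An event that starts the pattern opens (or keeps) a partial match for its key.
    void onEvent(String key, long timestamp) {
        pending.putIfAbsent(key, timestamp);
    }

    // Event time in this model: a watermark prunes partial matches older than
    // "within"; removing the value also drops the key from the map.
    void onWatermark(long watermark) {
        pending.values().removeIf(start -> watermark - start > withinMillis);
    }

    // Processing time in this model: onWatermark is never called, so nothing is removed.
    int keysInState() {
        return pending.size();
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        ToyKeyedNfaState processingTime = new ToyKeyedNfaState(fiveMinutes);
        ToyKeyedNfaState eventTime = new ToyKeyedNfaState(fiveMinutes);
        for (int i = 0; i < 1000; i++) {
            processingTime.onEvent("device-" + i, 0L);
            eventTime.onEvent("device-" + i, 0L);
        }
        eventTime.onWatermark(fiveMinutes + 1);
        System.out.println(processingTime.keysInState()); // 1000
        System.out.println(eventTime.keysInState());      // 0
    }
}
{code}

In this model the per-key state in processing time grows with the number of distinct keys, which is the pattern seen in the heap dumps.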

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within that time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, memory grows (I suppose that the state of the 
> NFA is updated) but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause has elapsed.
> If you need, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) Memory leak on NestedMapsStateTable

2017-09-12 Thread Matteo Ferrario (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16163046#comment-16163046
 ] 

Matteo Ferrario commented on FLINK-7606:


In order to test the job, we configured JMeter to execute a cycle of 100 
iterations every second, for 5 minutes.
For each iteration of the cycle, JMeter sends RabbitMQ two messages that 
together match the pattern.
These messages:
* share the same 'deviceCode' (the field 'deviceCode' is the key used to 
partition the stream)
* the first has 'phase' = 'Idle'
* the second has 'phase' = 'Start'

The total number of messages sent to RabbitMQ is 60,000 (30,000 with 'phase' = 
'Idle' and 30,000 with 'phase' = 'Start').
The expected number of generated events is 30,000.

We ran the same test three times, one after another:
* in the first run the number of events generated was 29,997
* in the second run the number of events generated was 29,987
* in the third run the number of events generated was 29,989
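The counts above follow directly from the test parameters. A small sketch of the arithmetic; all inputs come from this comment, and the class name LoadTestMath is illustrative only.

{code:java}
import java.util.Arrays;

// Arithmetic behind the JMeter test described above.
class LoadTestMath {
    static long iterations(int perSecond, int seconds) {
        return (long) perSecond * seconds;
    }

    // Difference between the expected number of events and each observed count.
    static long[] missingEvents(long expected, long[] observed) {
        long[] missing = new long[observed.length];
        for (int i = 0; i < observed.length; i++) {
            missing[i] = expected - observed[i];
        }
        return missing;
    }

    public static void main(String[] args) {
        long iterations = iterations(100, 5 * 60); // 100 iterations/s for 5 minutes
        long messages = 2 * iterations;            // one 'Idle' + one 'Start' per iteration
        long expected = iterations;                // one event per Idle -> Start pair
        long[] missing = missingEvents(expected, new long[] {29_997, 29_987, 29_989});
        System.out.println(messages + " messages, " + expected + " expected events");
        System.out.println(Arrays.toString(missing)); // [3, 13, 11]
    }
}
{code}

So each run falls short of the expected 30,000 events by only 3, 13 and 11 events respectively.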

We also took heap dumps of Flink after each run of the test.
In these heap dumps, we can see that the memory used by the NestedMapsStateTable 
grows continuously:
* after the first run it was about 24 MB ([^heap-dump1.png])
* after the second run it was about 51 MB ([^heap-dump2.png])
* after the third run it was about 78 MB ([^heap-dump3.png])
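The growth between consecutive runs can be computed from the approximate sizes above. A minimal sketch; HeapGrowth is an illustrative name and the inputs are the rounded values reported in this comment.

{code:java}
import java.util.Arrays;

// Increase in NestedMapsStateTable size between consecutive heap dumps,
// using the approximate sizes reported above (about 24, 51 and 78 MB).
class HeapGrowth {
    static int[] deltas(int[] sizesMb) {
        int[] d = new int[sizesMb.length - 1];
        for (int i = 1; i < sizesMb.length; i++) {
            d[i - 1] = sizesMb[i] - sizesMb[i - 1];
        }
        return d;
    }

    public static void main(String[] args) {
        int[] growth = deltas(new int[] {24, 51, 78});
        System.out.println(Arrays.toString(growth)); // [27, 27]
    }
}
{code}

Each run adds roughly the same amount (about 27 MB), which is consistent with the state created during each run being retained rather than released.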

The "within" clause didn't help either: we took another heap dump 10 minutes 
later, and it looked the same as the one taken right after the third run.


[jira] [Updated] (FLINK-7606) Memory leak on NestedMapsStateTable

2017-09-12 Thread Matteo Ferrario (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matteo Ferrario updated FLINK-7606:
---
Attachment: heap-dump1.png
heap-dump2.png
heap-dump3.png


[jira] [Commented] (FLINK-7606) Memory leak on NestedMapsStateTable

2017-09-12 Thread Matteo Ferrario (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16163002#comment-16163002
 ] 

Matteo Ferrario commented on FLINK-7606:


Hi Kostas,
thanks for your reply.
Here are some details about the job we've implemented.
The job processes a stream of messages coming from RabbitMQ.
Each message is deserialized into a custom 'Payload' POJO, using the 
'PayloadSchema' class:

{code:java}
import java.io.Serializable;
import java.util.Objects;

public class Payload implements Serializable {

    private static final long serialVersionUID = -7700917163136255068L;

    private String deviceCode;
    private Long stateTimestamp;
    private String phase;

    public Payload() {
    }

    public String getDeviceCode() {
        return deviceCode;
    }

    public void setDeviceCode(String deviceCode) {
        this.deviceCode = deviceCode;
    }

    public Long getStateTimestamp() {
        return stateTimestamp;
    }

    public void setStateTimestamp(Long stateTimestamp) {
        this.stateTimestamp = stateTimestamp;
    }

    public String getPhase() {
        return phase;
    }

    public void setPhase(String phase) {
        this.phase = phase;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Payload payload = (Payload) o;
        return Objects.equals(deviceCode, payload.deviceCode)
                && Objects.equals(stateTimestamp, payload.stateTimestamp)
                && Objects.equals(phase, payload.phase);
    }

    @Override
    public int hashCode() {
        return Objects.hash(deviceCode, stateTimestamp, phase);
    }
}
{code}

{code:java}
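import java.io.IOException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class PayloadSchema implements DeserializationSchema<Payload>,
        SerializationSchema<Payload> {

    private static final long serialVersionUID = 1L;

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(Payload element) {
        byte[] out = new byte[0];
        try {
            out = mapper.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return out;
    }

    @Override
    public Payload deserialize(byte[] message) {
        Payload highLevelDeviceStatus = null;
        try {
            highLevelDeviceStatus = mapper.readValue(message, Payload.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return highLevelDeviceStatus;
    }

    @Override
    public boolean isEndOfStream(Payload nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Payload> getProducedType() {
        return TypeExtractor.getForClass(Payload.class);
    }
}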
{code}

Timestamps and watermarks for 'Payload' elements are taken from the field 
'stateTimestamp', so that elements are ordered by it:

{code:java}
DataStream<Payload> dataStreamSource =
        env.addSource(new RMQSource<>(connectionConfig,
                "input",
                new PayloadSchema()))
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Payload>(Time.seconds(10)) {
                    private static final long serialVersionUID = -1L;

                    @Override
                    public long extractTimestamp(Payload element) {
                        if (element.getStateTimestamp() == null) {
                            throw new RuntimeException("HighLevelDeviceStatus Timestamp is null "
                                    + "during time ordering for device [" + element.getDeviceCode() + "]");
                        }
                        Date timestamp = getTimeFromJsonTimestamp(element.getStateTimestamp());

                        logger.debug("DeviceCode [" + element.getDeviceCode() + "] Time ["
                                + df.format(timestamp) + "] Watermark [" + timestamp.getTime() + "]");
                        return timestamp.getTime();
                    }
                })
        .uid("ciao")
        .name("DEVICE_HL_STATUS");
{code}
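In event time, the watermarks generated by this extractor are what would drive the event-time cleanup discussed in this thread. Below is a simplified plain-Java model of how a bounded-out-of-orderness extractor derives watermarks (highest timestamp seen minus the configured bound, never moving backwards). It is written from my understanding of BoundedOutOfOrdernessTimestampExtractor rather than copied from it, and the class name BoundedWatermarkModel is hypothetical.

{code:java}
// Simplified model (not the Flink class itself) of watermark generation with a
// fixed out-of-orderness bound: watermark = max timestamp seen - bound.
class BoundedWatermarkModel {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedWatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that subtracting the bound cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Called for every element with its extracted timestamp.
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    // The watermark only moves forward, and only when a higher timestamp arrives.
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedWatermarkModel wm = new BoundedWatermarkModel(10_000L); // like Time.seconds(10)
        wm.onElement(100_000L);
        wm.onElement(95_000L);                     // out of order, but within the bound
        System.out.println(wm.currentWatermark()); // 90000
        wm.onElement(120_000L);
        System.out.println(wm.currentWatermark()); // 110000
    }
}
{code}

One consequence of this model: if no new messages arrive, the watermark stops advancing, so any watermark-driven cleanup stops as well.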

The job tries to identify a specific pattern in the stream, generate an event 
and send it to RabbitMQ:

{code:java}
Pattern<Payload, ?> pattern = Pattern
        .<Payload>begin("start")
        .subtype(Payload.class)
        .where(new SimpleCondition<Payload>() {
            @Override
            public boolean filter(Payload value) throws Exception {
                return !value.getPhase().equals("Start");
            }
        })
        .next("end")
        .subtype(Payload.class)
        .where(new SimpleCondition<Payload>() {
            @Override
            public boolean filter(Payload value) throws Exception {
                return value.getPhase().equals("Start");
            }
        })
        .within(Time.minutes(5));

PatternFlatSelectFunction patternFlatSelectFunction = (statusMap, 
collector) -> collector.collect(new Synth().synthesize(statusMap));

PatternStream<Payload> patternStreamStartOfCycle = CEP.pattern(
        dataStreamSource.keyBy((KeySelector<Payload, String>)
{code}

[jira] [Updated] (FLINK-7606) Memory leak on NestedMapsStateTable

2017-09-08 Thread Matteo Ferrario (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matteo Ferrario updated FLINK-7606:
---
Description: 
The NestedMapsStateTable grows up continuously without free the heap memory.
We created a simple job that processes a stream of messages and uses CEP to 
generate an outcome message when a specific pattern is identified.
The messages coming from the stream are grouped by a key defined in a specific 
field of the message.
We've also added the "within" clause (set as 5 minutes), indicating that two 
incoming messages match the pattern only if they come in a certain time window.
What we've seen is that for every key present in the message, an NFA object is 
instantiated in the NestedMapsStateTable and it is never deallocated.
Also the "within" clause didn't help: we've seen that if we send messages that 
don't match the pattern, the memory grows up (I suppose that the state of NFA 
is updated) but it is not cleaned also after the 5 minutes of time window 
defined in "within" clause.
If you need, I can provide more details about the job we've implemented and 
also the screenshots about the memory leak.


  was:
The NestedMapsStateTable grows up continuously without free the heap memory.
We created a simple job that processes a stream of messages and uses CEP to 
generate an outcome message when a specific pattern is identified.
The messages coming from the stream are grouped by a key defined in a specific 
field of the message.
We've also added the "within" clause (set as 5 minutes), indicating that two 
incoming messages match the pattern only if they come in a certain time window.
What we've seen is that for every key present in the message, an NFA object is 
instantiated in the NestedMapsStateTable and it is never deallocated.
Also the "within" clause didn't help: we've seen that if we send messages that 
doesn't match the pattern, the memory grows up (I suppose that the state of NFA 
is updated) but it is not cleaned also after the 5 minutes of time window 
defined in "within" clause.
If you need, I can provide more details about the job we've implemented and 
also the screenshots about the memory leak.




[jira] [Updated] (FLINK-7606) Memory leak on NestedMapsStateTable

2017-09-08 Thread Matteo Ferrario (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matteo Ferrario updated FLINK-7606:
---
Description: 
The NestedMapsStateTable grows up continuously without free the heap memory.
We created a simple job that processes a stream of messages and uses CEP to 
generate an outcome message when a specific pattern is identified.
The messages coming from the stream are grouped by a key defined in a specific 
field of the message.
We've also added the "within" clause (set as 5 minutes), indicating that two 
incoming messages match the pattern only if they come in a certain time window.
What we've seen is that for every key present in the message, an NFA object is 
instantiated in the NestedMapsStateTable and it is never deallocated.
Also the "within" clause didn't help: we've seen that if we send messages that 
doesn't match the pattern, the memory grows up (I suppose that the state of NFA 
is updated) but it is not cleaned also after the 5 minutes of time window 
defined in "within" clause.
If you need, I can provide more details about the job we've implemented and 
also the screenshots about the memory leak.


  was:
The NestedMapsStateTable grows up continuously without free the heap memory.
We created a simple job that processes a stream of messages and uses CEP to 
generate an outcome message when a specific pattern is identified.
The messages coming from the stream are grouped by a key defined in a specific 
field of the message.
We've also added the "within" clause (set as 5 minutes), indicating that two 
incoming messages match the pattern only if they come in a certain time window.
What we've seen is that for every key present in the message, an NFA object is 
instantiated in the NestedMapsStateTable and it is never deallocated.
Also the "within" clause didn't help: we've seen that if we send messages that 
doesn't match the pattern, the memory grows up (I suppose that the state of NFA 
is updated) but it is not cleaned also after the 5 minutes of window time 
defined in "within" clause.
If you need, I can provide more details about the job we've implemented and 
also the screenshots about the memory leak.




[jira] [Updated] (FLINK-7606) Memory leak on NestedMapsStateTable

2017-09-08 Thread Matteo Ferrario (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matteo Ferrario updated FLINK-7606:
---
Description: 
The NestedMapsStateTable grows up continuously without free the heap memory.
We created a simple job that processes a stream of messages and uses CEP to 
generate an outcome message when a specific pattern is identified.
The messages coming from the stream are grouped by a key defined in a specific 
field of the message.
We've also added the "within" clause (set as 5 minutes), indicating that two 
incoming messages match the pattern only if they come in a certain time window.
What we've seen is that for every key present in the message, an NFA object is 
instantiated in the NestedMapsStateTable and it is never deallocated.
Also the "within" clause didn't help: we've seen that if we send messages that 
doesn't match the pattern, the memory grows up (I suppose that the state of NFA 
is updated) but it is not cleaned also after the 5 minutes of window time 
defined in "within" clause.
If you need, I can provide more details about the job we've implemented and 
also the screenshots about the memory leak.


  was:
The NestedMapsStateTable grows up continuously without free the heap memory.
We created a simple job that processes a stream of messages and uses CEP to 
generate an outcome message when a specific pattern is identified.
The messages coming from the stream are grouped by a key defined in a specific 
field of the message.
We've also added the "within" clause (set as 5 minutes), indicating that two 
incoming messages match the pattern only if they come in a certain time window.
What we've seen is that for every key present in the message, an NFA object is 
instantiated in the NestedMapsStateTable and it is never deallocated.
Also the "within" clause didn't help: we've seen that if we send messages that 
doesn't match the pattern, the memory grows up (I suppose that the state of NFA 
is updated) but it is not cleaned also after the 5 minutes of window time 
define in "within" clause.
If you need, I can provide more details about the job we've implemented and 
also the screenshots about the memory leak.




[jira] [Created] (FLINK-7606) Memory leak on NestedMapsStateTable

2017-09-08 Thread Matteo Ferrario (JIRA)
Matteo Ferrario created FLINK-7606:
--

 Summary: Memory leak on NestedMapsStateTable
 Key: FLINK-7606
 URL: https://issues.apache.org/jira/browse/FLINK-7606
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.3.1
Reporter: Matteo Ferrario


The NestedMapsStateTable grows up continuously without free the heap memory.
We created a simple job that processes a stream of messages and uses CEP to 
generate an outcome message when a specific pattern is identified.
The messages coming from the stream are grouped by a key defined in a specific 
field of the message.
We've also added the "within" clause (set as 5 minutes), indicating that two 
incoming messages match the pattern only if they come in a certain time window.
What we've seen is that for every key present in the message, an NFA object is 
instantiated in the NestedMapsStateTable and it is never deallocated.
Also the "within" clause didn't help: we've seen that if we send messages that 
doesn't match the pattern, the memory grows up (I suppose that the state of NFA 
is updated) but it is not cleaned also after the 5 minutes of window time 
define in "within" clause.
If you need, I can provide more details about the job we've implemented and 
also the screenshots about the memory leak.



