[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201731#comment-16201731 ] Kostas Kloudas commented on FLINK-7606: --- Hi [~i...@paolorendano.it] ! I will close this issue, as I think that our discussion clarified things. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: Schermata 2017-09-27 alle 00.35.53.png, heap-dump1.png, > heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197094#comment-16197094 ] Kostas Kloudas commented on FLINK-7606: --- I do not think that this would solve the problem. In event time, an element that has a timestamp X does not mean that also the event time has advanced passed that time. The watermark is the one that signals that time has advanced. So if the watermark does not advance, we cannot be sure that we can close a window. Only if you know your data, _e.g._ that timestamps are in order, then you can assume that. But in this case you could probably write your own timestamp extractor, that after a timeout, it sends a watermark with the "correct" timestamp. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: Schermata 2017-09-27 alle 00.35.53.png, heap-dump1.png, > heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193026#comment-16193026 ] Paolo Rendano commented on FLINK-7606: -- IMO the timeout should be triggered after the expiration of the time window (in the example 10 seconds) without any new elements (i.e. if the last received element is out of that time window). With this kind of strategy you would get at most a latency of the time window interval. Would it solve? > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192994#comment-16192994 ] Kostas Kloudas commented on FLINK-7606: --- You are right! There is already a discussion about introducing something like an "idle" watermark, that simply advances time for inactive partitions. This will help in some similar usecases. Now for your case specifically, it could work only if you know that no more elements belonging to a specific period (in event time) will come. Because if such elements arrive, then your previous results will be incorrect, right? This is the reason why we have not introduced such a mechanism. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192891#comment-16192891 ] Paolo Rendano commented on FLINK-7606: -- HI [~kkl0u], 1) exactly 2) exactly, but my stream is a never ending stream, so I cannot close. The idea could be, why do not flush automatically the buffer not only on a memory use base but also after a configurable timeout? Could be a good enhancement to implement for different use cases. Paolo > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192818#comment-16192818 ] Kostas Kloudas commented on FLINK-7606: --- Hi [~i...@paolorendano.it], Sorry for the late reply. So, if I understand correctly, in a nutshell: 1) for event time, there is no memory leak problem (there is also a pending PR that probably fixes the problem also for processing time by unifying the code paths for both notions of time), but 2) at the end of you input, the watermark does not advance and the last batch of events is not processed as it is waiting for the watermark +10 sec to trigger the computation, right? In this case, if you know that your stream is finite, then you can close your source (call close() on your source) and this will send a watermark Long.MAX_VALUE that will flush the buffered elements. Kostas > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181713#comment-16181713 ] Paolo Rendano commented on FLINK-7606: -- Hi [~kkl0u], 1) sure I have to set and I was not setting it in my test: {code:java} env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); {code} I've double checked this and without the set I have a memory leak (as reported by [~matteoferrario29]). Looking at the memory after the test, it seems that used keys are disposed (the memory come back to the initial size after last GC). Example (after processing 100k keys 2msgs/key): [^Schermata 2017-09-27 alle 00.35.53.png] 2) I've done again my test related with issue FLINK-7606 adding more logs and checking again the result and now it seems that all the expected events are generated, but... the last chunk of events (maybe thousands) are not generated until I run again the test (even 1 more message is enough to trigger the generation of all the remaining events). It seems the minimum number is about 5k input messages before it starts to flush out the buffer. So.. the question is: *can you explain the strategy to flush out the generation of events*? How to trigger it? Of course as it is now can block the generation of events until a new message is processed (maybe with a watermark that exceed that +10 sec). Just one answer to your questions regarding my last test scenario: parallelism is 1, Idle precedes Start, Idle timestamp is set to x and Start timestamp is set to x+1sec, no delay set between messages, and during the test I see the watermark advancing. Since the generation of messages in jmeter is in a cycle and no delay between cycles, x+1sec of a cycle can be greater than x in the following cycle. This was set intentionally by me to verify reordering of events. Thanks Paolo > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179089#comment-16179089 ] Kostas Kloudas commented on FLINK-7606: --- Perfect! Thanks a lot! > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176654#comment-16176654 ] Paolo Rendano commented on FLINK-7606: -- Hi [~kkl0u], sorry this week was hard also for me. Next week for sure i'll have some time to check your suggestions and make the trial. Regarding my open issue FLINK-7606, I see also [~matteoferrario29] is losing events in CEP. I'll go deeper next week. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16170001#comment-16170001 ] Matteo Ferrario commented on FLINK-7606: Hi [~kkl0u], thanks for your reply. If I understand correctly, in processing time the state of NestedMapsStateTable object is never cleared, while in event time at each watermark it is checked the condition if the NFA is empty and then state is cleared. If this consideration is correct, the memory leak (in terms of NestedMapsStateTable that grows up without been cleared) will not occur setting the stream time characteristic as event time: {code:java} env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); {code} On the other hand, this memory leak will occur setting the stream time characteristic as processing time: {code:java} env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); {code} Is it correct? > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168158#comment-16168158 ] Kostas Kloudas commented on FLINK-7606: --- Now that I think about it, can you check your watermarks and their values? And if the watermark advances past the 10 min + 10 sec (your bounded out-of-orderness) in event time. If the watermark does not advance then the patterns do not time out. This, in combination with a potential re-ordering of events (as I described before) can lead to the NFA always having some stale "Idle" elements that are waiting for their corresponding "Start" which never arrives because it was dropped as late. To verify this, could you set the parallelism of your job to 1, make sure that "Idle" precedes "Start" and at the end of the iteration, your source sends a Watermark with a high timestamp so that all buffered elements are flushed or timed-out. This is a very controlled experiment just to see where we stand. It would also be useful if you have the timestamp generator for your input elements. To see how time and the watermark advances. Then I could give you more detailed answers. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168099#comment-16168099 ] Kostas Kloudas commented on FLINK-7606: --- Also, on another note, I read that you have fewer results than expected. Could you check if some of these patterns are emitted as timed out patterns? In essence, could you check if timed-out patterns + successful ones == 3. This could happen if, for example, a "Start" arrives before the "Idle" (due to parallelism), and there is a watermark in between them whose timestamp is greater than that of the "Idle" element. In this case, the "Idle" will be dropped as late and the pattern will time out. This is due to the fact that as stated in the documentation, the CEP library assumes correctness of the watermark. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168090#comment-16168090 ] Kostas Kloudas commented on FLINK-7606: --- Yes [~i...@paolorendano.it]. This holds for every keyed state. The amount of state you store is proportional to the number of active keys that you expect to have. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168025#comment-16168025 ] Paolo Rendano commented on FLINK-7606: -- Hi [~kkl0u], from your comment I understand that the basic memory (so with minimal load of messages) I have to setup for a single instance is directly related with the number of keys I want to be able to manage. I correctly understood, do I? > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167954#comment-16167954 ] Kostas Kloudas commented on FLINK-7606: --- It should be cleared. In event time, we do not have timers, but at each watermark the condition if the NFA is empty is checked and if yes, then the state is cleared up. In processing time, it is true that we are always waiting for the "next" element although this is going to change in 1.4. I am having a look to the code and I will come back as soon as I have something. Sorry [~matteoferrario29] for the delayed answer but this week has been a bit busy. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167635#comment-16167635 ] Aljoscha Krettek commented on FLINK-7606: - How do you mean? They could get the timed-out partially matched sequences and then all state for that key (NFA, element queue) should be cleared. Otherwise you will also have leaks in RocksDB. If the memory/heap backend is working correctly (as I'm assuming for the moment) then this is not a problem of leaking memory but of leaking state, i.e. state that is never being cleaned up. If this is the case we'll have the same problem on any state backend, it might just be the case that we will see problems later in RocksDB because we have more space to work with. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167242#comment-16167242 ] Dian Fu commented on FLINK-7606: I think this is by design otherwise users could not get the timed out partially matched sequences as they will be cleared by the timers. For the memory leak issue, it seems that you are using the {{MemoryStateBackend}}, is it just used for test or will also used in your product environment? I think with {{RocksDBStateBackend}}, there will be no memory leak issue. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166340#comment-16166340 ] Aljoscha Krettek commented on FLINK-7606: - Will NFAs not be cleared when the allowed "within" time is over. If this were not the case and if NFA state is in fact never cleaned up than this is a leak and will lead to problems. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)