[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193026#comment-16193026
 ] 

Paolo Rendano edited comment on FLINK-7606 at 10/5/17 3:39 PM:
---

IMO the timeout should be triggered after the time window (5 minutes in the 
example) expires without any new elements (i.e., if the last received element 
falls outside that time window). With this strategy, the latency would be at 
most one time-window interval. Would that solve the issue?
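The proposed rule can be sketched in plain Java (a hypothetical illustration; `isTimedOut` and its parameter names are made up, not Flink's CEP API): a pending partial match can be timed out as soon as the watermark passes the start of the match plus the window length, because no element inside the window can still arrive.

```java
// Hypothetical sketch of the proposed timeout rule (not Flink API):
// a partial match can no longer be completed once the watermark has
// advanced past the end of its "within" window, so it can be timed out.
public class TimeoutRuleSketch {
    static boolean isTimedOut(long matchStartTs, long windowMillis, long currentWatermark) {
        return currentWatermark >= matchStartTs + windowMillis;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L; // the window used in the example
        System.out.println(isTimedOut(0L, fiveMinutes, 299_999L)); // still inside the window
        System.out.println(isTimedOut(0L, fiveMinutes, 300_000L)); // window expired
    }
}
```

With this rule a timed-out match is reported at most one window length after its first element, which is the "latency of the time window interval" mentioned above.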


was (Author: i...@paolorendano.it):
IMO the timeout should be triggered after the expiration of the time window (in 
the example 10 minutes) without any new elements (i.e. if the last received 
element is out of that time window). With this kind of strategy you would get 
at most a latency of the time window interval. Would it solve?

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the NFA state is 
> updated), but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause has passed.
> If needed, I can provide more details about the job we've implemented, as 
> well as screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193026#comment-16193026
 ] 

Paolo Rendano edited comment on FLINK-7606 at 10/5/17 3:38 PM:
---

IMO the timeout should be triggered after the time window (10 minutes in the 
example) expires without any new elements (i.e., if the last received element 
falls outside that time window). With this strategy, the latency would be at 
most one time-window interval. Would that solve the issue?


was (Author: i...@paolorendano.it):
IMO the timeout should be triggered after the expiration of the time window (in 
the example 10 seconds) without any new elements (i.e. if the last received 
element is out of that time window). With this kind of strategy you would get 
at most a latency of the time window interval. Would it solve?






[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193026#comment-16193026
 ] 

Paolo Rendano commented on FLINK-7606:
--

IMO the timeout should be triggered after the time window (10 seconds in the 
example) expires without any new elements (i.e., if the last received element 
falls outside that time window). With this strategy, the latency would be at 
most one time-window interval. Would that solve the issue?






[jira] [Resolved] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-10-05 Thread Paolo Rendano (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paolo Rendano resolved FLINK-7549.
--
Resolution: Fixed

> CEP - Pattern not discovered if source streaming is very fast
> -
>
> Key: FLINK-7549
> URL: https://issues.apache.org/jira/browse/FLINK-7549
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Paolo Rendano
>
> Hi all,
> I'm doing some stress tests on my pattern, using JMeter to publish source data 
> to a RabbitMQ queue. The queue contains statuses generated by different 
> devices. My test case loops 1000 times, each cycle sending the first and then 
> the second status that together generate the event via Flink CEP (statuses 
> keyed by device). I expect an output of 1000 events.
> In my early test runs I noticed that I get only partial results in output 
> (70/80% of the expected ones). Introducing a delay in the JMeter plan between 
> sending the two statuses solved the problem. The minimum delay that makes 
> things work is 20/25 ms (of course this is on my local machine; on other 
> machines it may vary).
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
>
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource =
>     env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>             conf.getSourceExchange(),
>             conf.getSourceRoutingKey(),
>             conf.getSourceQueueName(),
>             true,
>             new MyMessageWrapperSchema()))
>         .assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>                 private static final long serialVersionUID = -1L;
>
>                 @Override
>                 public long extractTimestamp(MyMessageWrapper element) {
>                     if (element.getData().get("stateTimestamp") == null) {
>                         throw new RuntimeException("Status Timestamp is null during time ordering for device ["
>                                 + element.getData().get("deviceCode") + "]");
>                     }
>                     return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>                 }
>             })
>         .name("MyIncomingStatus");
>
> // PATTERN DEFINITION
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>     .<MyMessageWrapper>begin("start")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "none"))
>     .next("end")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "started"))
>     .within(Time.minutes(3));
>
> // CEP DEFINITION
> PatternStream<MyMessageWrapper> myPatternStream =
>     CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream outputStream =
>     myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
>
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange,
>         new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging in and logging the messages received by Flink in "extractTimestamp", 
> what happens is that at such a high message rate the source may receive 
> messages with the same timestamp but different deviceCodes.
> Any idea?
> Thanks, regards
> Paolo
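The 1-minute out-of-orderness bound in the extractor above gives a mechanical explanation for the missing tail of results: the watermark trails the highest timestamp seen so far by the bound, so an event only becomes "final" once a sufficiently newer element arrives. A standalone sketch of that logic (this re-implements the idea; it is not Flink code, and `WatermarkSketch` is a made-up name):

```java
// Standalone sketch of bounded-out-of-orderness watermarking (not Flink code):
// the watermark is always maxTimestampSeen - bound, so elements near the end
// of a burst stay "pending" until an even newer element arrives.
public class WatermarkSketch {
    static final long MAX_OUT_OF_ORDERNESS = 60_000L; // 1 minute, as in the job above
    long maxTimestampSeen = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS; // avoid underflow

    long onElement(long eventTs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTs);
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS; // current watermark
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch();
        System.out.println(wm.onElement(100_000L)); // watermark lags 1 minute behind
        System.out.println(wm.onElement(100_500L));
        // the event at 100_000 only becomes final once something >= 160_000 arrives:
        System.out.println(wm.onElement(160_000L));
    }
}
```

This is why a whole burst sent "at once" leaves its newest events pending: nothing newer ever arrives to push the watermark past them.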





[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192891#comment-16192891
 ] 

Paolo Rendano commented on FLINK-7606:
--

Hi [~kkl0u],
1) exactly
2) exactly, but my stream is never-ending, so I cannot close it. The idea could 
be: why not flush the buffer automatically, not only based on memory use but 
also after a configurable timeout? That could be a good enhancement for 
different use cases.

Paolo
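The enhancement suggested above — flush on memory pressure or after a configurable timeout, whichever comes first — could be sketched like this (all names are hypothetical; this is not an existing Flink API):

```java
// Hypothetical hybrid flush policy: flush when the buffer is large enough
// (memory/size trigger) OR when too much time has passed since the last
// flush (time trigger). Not an existing Flink API.
public class HybridFlushPolicy {
    static boolean shouldFlush(int bufferedCount, int maxCount,
                               long lastFlushTs, long nowTs, long timeoutMillis) {
        return bufferedCount >= maxCount
                || nowTs - lastFlushTs >= timeoutMillis;
    }

    public static void main(String[] args) {
        // buffer far from full, but 6 s elapsed with a 5 s timeout -> flush
        System.out.println(shouldFlush(10, 5_000, 0L, 6_000L, 5_000L));
        // buffer far from full and only 1 s elapsed -> keep buffering
        System.out.println(shouldFlush(10, 5_000, 0L, 1_000L, 5_000L));
    }
}
```

The time trigger bounds the worst-case latency for a never-ending stream, which the size-only trigger cannot do.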






[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192886#comment-16192886
 ] 

Paolo Rendano commented on FLINK-7549:
--

Hi [~kkl0u], yes, I solved it by setting:

{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}

Thanks






[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-09-26 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181718#comment-16181718
 ] 

Paolo Rendano commented on FLINK-7549:
--

mentioned in






[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-09-26 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181713#comment-16181713
 ] 

Paolo Rendano edited comment on FLINK-7606 at 9/26/17 10:59 PM:


Hi [~kkl0u],
1) Sure, I have to set it, and I was not setting it in my test: 
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}
I've double-checked this: without that setting I have a memory leak (as reported 
by [~matteoferrario29]); with it, looking at the memory after the test, it seems 
that used keys are disposed of (the memory comes back to its initial size after 
the last GC). Example (after processing 100k keys, 2 msgs/key): 
[^Schermata 2017-09-27 alle 00.35.53.png]

2) I've rerun my test related to issue FLINK-7549, adding more logs and 
checking the result again, and now it seems that all the expected events are 
generated, but... the last chunk of events (maybe thousands) is not generated 
until I run the test again (even 1 more message is enough to trigger the 
generation of all the remaining events). It seems a minimum of about 5k input 
messages is needed before it starts to flush the buffer. So the question is: 
*can you explain the strategy used to flush out generated events?* How can it 
be triggered? As it is now, it can block the generation of events until a new 
message is processed (maybe one whose watermark exceeds that timestamp + 10 sec).
Just one answer to your questions about my last test scenario: parallelism is 1, 
Idle precedes Start, the Idle timestamp is set to x and the Start timestamp to 
x+1 sec, no delay is set between messages, and during the test I see the 
watermark advancing. Since JMeter generates messages in a cycle with no delay 
between cycles, the x+1 sec of one cycle can be greater than the x of the 
following cycle. I set this up intentionally to verify the reordering of events.

Thanks
Paolo
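The behaviour described in point 2 is consistent with watermarks being derived from incoming elements: with no new input the watermark stalls, and matches buffered behind it are released only when a later element pushes it forward. A hypothetical standalone illustration (not Flink code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not Flink code): pending results are released
// only when the watermark, derived from the newest element seen, passes them.
public class StalledWatermarkSketch {
    static List<Long> released(List<Long> pendingTs, long watermark) {
        List<Long> out = new ArrayList<>();
        for (long ts : pendingTs) {
            if (ts <= watermark) {
                out.add(ts); // only results at or below the watermark can fire
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long bound = 10_000L; // the "+10 sec" bound mentioned above
        List<Long> pending = List.of(90_000L, 95_000L, 100_000L);
        // newest element seen is 100_000 -> watermark 90_000: only one result fires
        System.out.println(released(pending, 100_000L - bound));
        // one extra message at 110_000 -> watermark 100_000: the rest are released
        System.out.println(released(pending, 110_000L - bound));
    }
}
```

So a single additional message is enough to release the whole buffered tail, which matches what the rerun of the test showed.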



was (Author: i...@paolorendano.it):
Hi [~kkl0u],
1) sure I have to set and I was not setting it in my test: 
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}
I've double checked this and without the set I have a memory leak (as reported 
by [~matteoferrario29]). Looking at the memory after the test, it seems that 
used keys are disposed (the memory come back to the initial size after last 
GC). Example (after processing 100k keys 2msgs/key): 
[^Schermata 2017-09-27 alle 00.35.53.png]

2) I've done again my test related with issue FLINK-7606 adding more logs and 
checking again the result and now it seems that all the expected events are 
generated, but... the last chunk of events (maybe thousands) are not generated 
until I run again the test (even 1 more message is enough to trigger the 
generation of all the remaining events). It seems the minimum number is about 
5k input messages before it starts to flush out the buffer. So.. the question 
is: *can you explain the strategy to flush out the generation of events*? How 
to trigger it? Of course as it is now can block the generation of events until 
a new message is processed (maybe with a watermark that exceed that +10 sec).
Just one answer to your questions regarding my last test scenario: parallelism 
is 1, Idle precedes Start, Idle timestamp is set to x and Start timestamp is 
set to x+1sec, no delay set between messages, and during the test I see the 
watermark advancing. Since the generation of messages in jmeter is in a cycle 
and no delay between cycles, x+1sec of a cycle can be greater than x in the 
following cycle. This was set intentionally by me to verify reordering of 
events.

Thanks
Paolo



[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-09-26 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176654#comment-16176654
 ] 

Paolo Rendano edited comment on FLINK-7606 at 9/26/17 10:59 PM:


Hi [~kkl0u],
sorry, this week was hard for me too. Next week I'll certainly have some time to 
check your suggestions and run the trial. Regarding my open issue FLINK-7549, I 
see that [~matteoferrario29] is also losing events in CEP. I'll dig deeper next 
week.




was (Author: i...@paolorendano.it):
Hi [~kkl0u],
sorry this week was hard also for me. Next week for sure i'll have some time to 
check your suggestions and make the trial. Regarding my open issue FLINK-7606, 
I see also [~matteoferrario29] is losing events in CEP. I'll go deeper next 
week.








[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-26 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181713#comment-16181713
 ] 

Paolo Rendano commented on FLINK-7606:
--

Hi [~kkl0u],
1) Sure, I have to set it, and I was not setting it in my test: 
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}
I've double-checked this: without that setting I have a memory leak (as reported 
by [~matteoferrario29]); with it, looking at the memory after the test, it seems 
that used keys are disposed of (the memory comes back to its initial size after 
the last GC). Example (after processing 100k keys, 2 msgs/key): 
[^Schermata 2017-09-27 alle 00.35.53.png]

2) I've rerun my test related to issue FLINK-7549, adding more logs and 
checking the result again, and now it seems that all the expected events are 
generated, but... the last chunk of events (maybe thousands) is not generated 
until I run the test again (even 1 more message is enough to trigger the 
generation of all the remaining events). It seems a minimum of about 5k input 
messages is needed before it starts to flush the buffer. So the question is: 
*can you explain the strategy used to flush out generated events?* How can it 
be triggered? As it is now, it can block the generation of events until a new 
message is processed (maybe one whose watermark exceeds that timestamp + 10 sec).
Just one answer to your questions about my last test scenario: parallelism is 1, 
Idle precedes Start, the Idle timestamp is set to x and the Start timestamp to 
x+1 sec, no delay is set between messages, and during the test I see the 
watermark advancing. Since JMeter generates messages in a cycle with no delay 
between cycles, the x+1 sec of one cycle can be greater than the x of the 
following cycle. I set this up intentionally to verify the reordering of events.

Thanks
Paolo







[jira] [Updated] (FLINK-7606) CEP operator leaks state

2017-09-26 Thread Paolo Rendano (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paolo Rendano updated FLINK-7606:
-
Attachment: Schermata 2017-09-27 alle 00.35.53.png






[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-22 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176654#comment-16176654
 ] 

Paolo Rendano commented on FLINK-7606:
--

Hi [~kkl0u],
sorry, this week was hard for me too. Next week I'll certainly have some time to 
check your suggestions and run the trial. Regarding my open issue FLINK-7549, I 
see that [~matteoferrario29] is also losing events in CEP. I'll dig deeper next 
week.








[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-15 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168025#comment-16168025
 ] 

Paolo Rendano commented on FLINK-7606:
--

Hi [~kkl0u],
from your comment I understand that the baseline memory (i.e. with a minimal 
load of messages) I have to provision for a single instance is directly related 
to the number of keys I want to be able to manage. Did I understand correctly?
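
For intuition, here is a back-of-envelope sketch of that linear relationship. The per-key state footprint below is an assumed placeholder for illustration, not a measured Flink number:

```java
public class SizingSketch {
    // Heap needed when state scales linearly with key cardinality.
    static long heapNeededMb(long keys, long bytesPerKeyState) {
        return keys * bytesPerKeyState / (1024 * 1024);
    }

    public static void main(String[] args) {
        long keys = 1_000_000L;            // distinct device keys to support
        long bytesPerKeyState = 2_048L;    // ASSUMED average NFA footprint
        // Roughly 1953 MB of heap just for keyed state under these assumptions.
        System.out.println(heapNeededMb(keys, bytesPerKeyState) + " MB");
    }
}
```

The real per-key footprint depends on the pattern and the buffered events, so it should be measured from a heap dump rather than taken from this sketch.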






[jira] [Updated] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-08-29 Thread Paolo Rendano (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paolo Rendano updated FLINK-7549:
-
Description: 
Hi all,
I'm doing some stress tests on my pattern, using JMeter to populate source data 
on a RabbitMQ queue. The queue contains statuses generated by different devices. 
In my test case I loop over 1000 cycles, each one sending the first and the 
second status that together generate the event via Flink CEP (statuses keyed by 
device). I expect an output of 1000 events.
In my early tests I noticed that I only get partial results in output (70/80% of 
the expected ones). Introducing a delay in the JMeter plan between sending the 
two statuses solved the problem. The minimum delay that makes things work is 
20/25 ms (of course this is on my local machine; it may vary on other machines).

My code is structured this way (the following is a simplification):


{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100L);

// SOURCE DEFINITION
DataStream<MyMessageWrapper> dataStreamSource =
        env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
                        conf.getSourceExchange(),
                        conf.getSourceRoutingKey(),
                        conf.getSourceQueueName(),
                        true,
                        new MyMessageWrapperSchema()))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
                            private static final long serialVersionUID = -1L;

                            @Override
                            public long extractTimestamp(MyMessageWrapper element) {
                                if (element.getData().get("stateTimestamp") == null) {
                                    throw new RuntimeException("Status Timestamp is null during time ordering for device ["
                                            + element.getData().get("deviceCode") + "]");
                                }
                                return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
                            }
                        })
                .name("MyIncomingStatus");

// PATTERN DEFINITION
Pattern<MyMessageWrapper, ?> myPattern = Pattern
        .<MyMessageWrapper>begin("start")
        .subtype(MyMessageWrapper.class)
        .where(whereEquals("st", "none"))
        .next("end")
        .subtype(MyMessageWrapper.class)
        .where(whereEquals("st", "started"))
        .within(Time.minutes(3));

// CEP DEFINITION
PatternStream<MyMessageWrapper> myPatternStream =
        CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);

// generic type parameters of the output stream omitted
DataStream outputStream =
        myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);

// SINK DEFINITION
outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange,
        new MyMessageWrapperSchema())).name("MyGeneratedEvent");
{code}

Digging in and logging the messages received by Flink in "extractTimestamp", what 
happens is that, at such a high message rate, the source may receive messages 
with the same timestamp but different deviceCodes.
Any idea?

Thanks, regards
Paolo
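
The watermark bookkeeping involved here can be illustrated without Flink. Below is a minimal sketch of the bounded-out-of-orderness logic (simplified; not Flink's actual `BoundedOutOfOrdernessTimestampExtractor` implementation), showing that a burst of events sharing a timestamp does not advance the watermark, so event-time timers such as the CEP "within" timeout cannot fire in between:

```java
public class WatermarkSketch {
    // The watermark trails the highest timestamp seen so far by a fixed bound,
    // mirroring Time.minutes(1) of allowed out-of-orderness in the job above.
    static final long MAX_OUT_OF_ORDERNESS_MS = 60_000L;
    static long currentMaxTimestamp = Long.MIN_VALUE;

    // Returns the watermark that would be emitted after seeing this event.
    static long onEvent(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        long wm1 = onEvent(1_000_000L); // first device's status
        long wm2 = onEvent(1_000_000L); // same timestamp, different deviceCode:
                                        // the watermark does not move
        long wm3 = onEvent(1_030_000L); // only a later timestamp advances it
        System.out.println(wm1 + " " + wm2 + " " + wm3);
    }
}
```

Under this model, many devices emitting within the same millisecond all land before the same stalled watermark, which is consistent with the partial results observed at high rates.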


[jira] [Created] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-08-29 Thread Paolo Rendano (JIRA)
Paolo Rendano created FLINK-7549:


 Summary: CEP - Pattern not discovered if source streaming is very 
fast
 Key: FLINK-7549
 URL: https://issues.apache.org/jira/browse/FLINK-7549
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.3.2, 1.3.1
Reporter: Paolo Rendano


Hi all,
I'm doing some stress tests on my pattern, using JMeter to populate source data 
on a RabbitMQ queue. The queue contains statuses generated by different devices. 
In my test case I loop over 1000 cycles, each one sending the first and the 
second status that together generate the event via Flink CEP (statuses keyed by 
device).
In my early tests I noticed that I only get partial results in output (70/80% of 
the expected ones). Introducing a delay in the JMeter plan between sending the 
two statuses solved the problem. The minimum delay that makes things work is 
20/25 ms (of course this is on my local machine; it may vary on other machines).

My code is structured this way (the following is a simplification):

{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100L);

// SOURCE DEFINITION
DataStream<MyMessageWrapper> dataStreamSource =
        env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
                        conf.getSourceExchange(),
                        conf.getSourceRoutingKey(),
                        conf.getSourceQueueName(),
                        true,
                        new MyMessageWrapperSchema()))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
                            private static final long serialVersionUID = -1L;

                            @Override
                            public long extractTimestamp(MyMessageWrapper element) {
                                if (element.getData().get("stateTimestamp") == null) {
                                    throw new RuntimeException("Status Timestamp is null during time ordering for device ["
                                            + element.getData().get("deviceCode") + "]");
                                }
                                return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
                            }
                        })
                .name("MyIncomingStatus");

// PATTERN DEFINITION
Pattern<MyMessageWrapper, ?> myPattern = Pattern
        .<MyMessageWrapper>begin("start")
        .subtype(MyMessageWrapper.class)
        .where(whereEquals("st", "none"))
        .next("end")
        .subtype(MyMessageWrapper.class)
        .where(whereEquals("st", "started"))
        .within(Time.minutes(3));

// CEP DEFINITION
PatternStream<MyMessageWrapper> myPatternStream =
        CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);

// generic type parameters of the output stream omitted
DataStream outputStream =
        myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);

// SINK DEFINITION
outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange,
        new MyMessageWrapperSchema())).name("MyGeneratedEvent");
{code}

Digging in and logging the messages received by Flink in "extractTimestamp", what 
happens is that, at such a high message rate, the source may receive messages 
with the same timestamp but different deviceCodes.
Any idea?

Thanks, regards
Paolo


