Hi Ajay,

I will definitely have a look to see what is happening.
And I will keep you posted.

Thanks for investigating it.

Kostas

> On Oct 1, 2017, at 12:50 AM, Ajay Krishna <ajaykris...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> Here is an example of a simple event I am trying to detect. The first and 
> last lines are the interesting points/events. The CEP library is not able to 
> detect something like that.
> 
> 4> (96,Sat Sep 30 22:30:25 UTC 2017,complex event,Low,32.781082,-117.01864,12.0,20.0)
> 4> (96,Sat Sep 30 22:30:26 UTC 2017,complex event,High,32.781082,-117.01864,0.0235,20.0)
> 4> (96,Sat Sep 30 22:30:27 UTC 2017,complex event,High,32.781082,-117.01864,0.02319611,20.0)
> 4> (96,Sat Sep 30 22:30:28 UTC 2017,complex event,Medium,32.781082,-117.01864,0.023357224,20.0)
> 4> (96,Sat Sep 30 22:30:29 UTC 2017,complex event,Low,32.781082,-117.01864,0.060904443,20.0)
> 4> (96,Sat Sep 30 22:30:30 UTC 2017,complex event,Medium,32.781082,-117.01864,0.100115,20.0)
> 4> (96,Sat Sep 30 22:30:31 UTC 2017,complex event,High,32.781082,-117.01864,0.12398389,20.0)
> 4> (96,Sat Sep 30 22:30:32 UTC 2017,complex event,Medium,32.781082,-117.01864,0.15611167,20.0)
> 4> (96,Sat Sep 30 22:30:33 UTC 2017,complex event,Low,32.781082,-117.01864,0.15817556,20.0)
> 4> (96,Sat Sep 30 22:30:34 UTC 2017,complex event,Low,32.781082,-117.01864,0.09934334,20.0)
> 4> (96,Sat Sep 30 22:30:35 UTC 2017,complex event,High,32.781082,-117.01864,16.0,20.0)
> 
> 
> 
> Notes about this experiment.
> 
> 1. Only one Kafka partition and just one topic.
> 
> 2. Flink env parallelism set to 4 and I am using AscendingTimestampExtractor 
> on KafkaSource09.
> 
> 3. In the data above, the first element is the id that I use for keyBy
> 
> 4. I started 4 Kafka producers in parallel with a random delay between them
> 
> 5. Each producer sends 10000 rows from a CSV file, taking about 18 seconds on 
> average. Of the 4 producers, events were detected for only one.
> 
> 6. I print the stream and, looking at the log files, I see all 40000 lines, 
> where each id is associated with one process number. In the above data, 96 is 
> only associated with 4. In this case there is just one partition in Kafka. If 
> I were to increase the number of partitions, each id would be spread across 
> multiple processes.
> 
> 7. I had run another test with a different set of 4 ids just before the one 
> I've presented above. I expected to see 148 events for the 4 ids, and I saw all 
> of them being captured. I did not change anything as far as delays in the 
> producer.
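
Notes 1, 2 and 4 suggest one thing that may be worth ruling out: with four producers writing into a single partition, the timestamps reaching the source might not be ascending. This is only a hypothesis, not a diagnosis. AscendingTimestampExtractor assumes non-decreasing timestamps and, by default, logs a "Timestamp monotony violated" warning when an element breaks that assumption. The plain-Java sketch below does not use Flink; the class name, method name and sample timestamps are hypothetical and only illustrate how such violations could be counted on a captured sample of the stream.

```java
// Plain-Java illustration only; it does not use or reproduce Flink classes.
class AscendingTimestampCheck {

    // Counts elements whose timestamp is lower than the highest timestamp
    // seen so far, i.e. elements that break the "ascending" assumption.
    static int countViolations(long[] timestampsInArrivalOrder) {
        long maxSeen = Long.MIN_VALUE;
        int violations = 0;
        for (long ts : timestampsInArrivalOrder) {
            if (ts < maxSeen) {
                violations++;      // arrived "behind" an earlier element
            } else {
                maxSeen = ts;      // equal or higher timestamps are fine
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        // Hypothetical interleaving of two producers in one partition:
        // producer A sends 1000, 1001, 1002 and producer B sends 500, 501, 502.
        long[] interleaved = {1000L, 500L, 1001L, 501L, 1002L, 502L};
        System.out.println(countViolations(interleaved)); // prints 3
    }
}
```

If a check like this reports violations on real data, the warnings in the TaskManager logs would be the next place to look.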
> 
> The behavior is quite arbitrary, and I suspect the cause is the 
> bugs FLINK-7549 <https://issues.apache.org/jira/browse/FLINK-7549> and 
> FLINK-7606 <https://issues.apache.org/jira/browse/FLINK-7606>. Could you help 
> me understand this further?
> 
> Best regards,
> 
> Ajay
> 
> 
> 
> 
> On Thu, Sep 28, 2017 at 8:39 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hi Ajay,
> 
> After reading all the data from your source, could you somehow tell your 
> sources to send a watermark of Long.MaxValue (or a high value)?
> 
> I am asking this just to see if the problem is that the data is simply 
> buffered inside Flink because there is a problem with the timestamps and 
> the watermarks.
> You could also check this from the web UI, by looking at the size of your 
> checkpointed state.
> If the size increases, it means that something is stored there.
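
To illustrate why a final high watermark helps with this check, here is a minimal plain-Java sketch of event-time buffering. It is not Flink's implementation; the class and method names are hypothetical. The assumption it models is that an event-time operator holds elements until a watermark at or beyond their timestamp arrives, so a watermark of Long.MAX_VALUE (Long.MaxValue in Scala) releases everything that is still buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java illustration only; this is not Flink's operator code.
class WatermarkFlushSketch {

    // Element timestamps wait here, ordered, until a watermark covers them.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    void onElement(long timestamp) {
        buffered.add(timestamp);
    }

    // Releases, in timestamp order, every buffered element whose timestamp
    // is less than or equal to the given watermark.
    List<Long> onWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() <= watermark) {
            released.add(buffered.poll());
        }
        return released;
    }

    int bufferedCount() {
        return buffered.size();
    }

    public static void main(String[] args) {
        WatermarkFlushSketch op = new WatermarkFlushSketch();
        for (long ts = 1; ts <= 5; ts++) {
            op.onElement(ts);
        }
        System.out.println(op.onWatermark(2L));             // prints [1, 2]
        System.out.println(op.bufferedCount());             // prints 3
        System.out.println(op.onWatermark(Long.MAX_VALUE)); // prints [3, 4, 5]
    }
}
```

In this model, elements that never see a high enough watermark stay in the buffer, which would show up as growing state.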
> 
> I will also have a deeper look.
> 
> Kostas
> 
>> On Sep 28, 2017, at 5:17 PM, Ajay Krishna <ajaykris...@gmail.com> wrote:
>> 
>> Hi Kostas,
>> 
>> Thank you for reaching out and for the suggestions. Here are the results:
>> 
>> 1. Using an env parallelism of 1 performed similarly, with the additional 
>> problem that there was significant lag in the Kafka topic.
>> 2. I removed the additional keyBy(0), but that did not change anything.
>> 3. I also tried checking for the start-only pattern, and the result was 
>> exactly the same: I saw one of the homes going through but the 3 others 
>> just getting dropped.
>> 4. I also tried slowing down the rate into Kafka from 5000/second to about 
>> 1000/second, but I see similar results.
>> 
>> I was wondering if you had any other solutions to the problem. I am 
>> especially concerned about 1 and 3. Is this library under active 
>> development? Is there a JIRA open on this issue and, if not, could we open 
>> one to track this?
>> 
>> 
>> I was reading on Stack Overflow and found that a user had a very similar 
>> issue in Aug '16. I contacted him to discuss the issue and learned 
>> that the pattern of failure was exactly the same.
>> 
>> https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic
>> 
>> 
>> Before I found the above post, I created a post for this issue:
>> https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern
>> 
>> 
>> 
>> I would really appreciate your guidance on this. 
>> 
>> Best regards,
>> Ajay
>> 
>> 
>> 
>> 
>> 
>> On Thu, Sep 28, 2017 at 1:38 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> Hi Ajay,
>> 
>> I will look a bit more into the issue.
>> 
>> But in the meantime, could you run your job with a parallelism of 1, to see 
>> if the results are as expected?
>> 
>> Also, could you change the pattern, for example to check only for the start, 
>> to see if all keys pass through?
>> 
>> As for the code, you apply keyBy(0) to the cepMap stream twice, which is 
>> redundant and introduces latency. 
>> You could remove that to also see the impact.
>> 
>> Kostas
>> 
>>> On Sep 28, 2017, at 2:57 AM, Ajay Krishna <ajaykris...@gmail.com> wrote:
>>> 
>>> Hi, 
>>> 
>>> I've only been working with Flink for the past 2 weeks on a project and am 
>>> trying to use the CEP library on sensor data. I am using Flink version 
>>> 1.3.2. Flink reads from a Kafka source; I am using KafkaSource9. I am 
>>> running Flink on a 3-node AWS cluster with 8G of RAM running Ubuntu 16.04. 
>>> From the Flink dashboard, I see that I have 2 TaskManagers and 4 task slots.
>>> 
>>> What I observe is the following. The input to Kafka is a JSON string and, 
>>> when parsed on the Flink side, it looks like this:
>>> 
>>> (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
>>> 
>>> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
>>> time characteristic is set to EventTime and I have an 
>>> AscendingTimestampExtractor using the timestamp field. The parallelism 
>>> for the execution environment is set to 4. I have a rather simple event 
>>> that I am trying to capture:
>>> 
>>> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId =
>>>         cepMap.keyBy(0);
>>> 
>>> //cepMapByHomeId.print();
>>> 
>>> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>>>         Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>>>                 .where(new OverLowThreshold())
>>>                 .followedBy("end")
>>>                 .where(new OverHighThreshold());
>>> 
>>> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>>>         CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
>>> 
>>> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
>>>         patternStream.select(new PackageCapturedEvents());
>>> 
>>> The pattern checks if the 7th field in the Tuple8 goes over 12 and then 
>>> over 16. The output of the pattern is like this:
>>> 
>>> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
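
As a rough reference for what the pattern described above is expected to match, here is a plain-Java sketch that pairs each "start" reading with the first later "end" reading. It is a simplified approximation and not Flink's CEP matching engine. The implementations of OverLowThreshold and OverHighThreshold are not shown in this thread, so the inclusive thresholds (>= 12 and >= 16) are an assumption, chosen because readings of exactly 12.0 and 16.0 are called the interesting events elsewhere in the thread. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation only; it does not use or reproduce Flink CEP.
class ThresholdPatternSketch {

    static final float LOW = 12.0f;   // assumed "start" threshold (inclusive)
    static final float HIGH = 16.0f;  // assumed "end" threshold (inclusive)

    // For every reading that qualifies as "start", find the first later
    // reading that qualifies as "end" and record the pair of indices.
    static List<int[]> matches(float[] readings) {
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < readings.length; i++) {
            if (readings[i] < LOW) {
                continue; // not a start event
            }
            for (int j = i + 1; j < readings.length; j++) {
                if (readings[j] >= HIGH) {
                    result.add(new int[] {i, j});
                    break; // only the first following end is paired
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 7th-field values of the eleven id-96 records quoted in this thread.
        float[] readings = {
            12.0f, 0.0235f, 0.02319611f, 0.023357224f, 0.060904443f, 0.100115f,
            0.12398389f, 0.15611167f, 0.15817556f, 0.09934334f, 16.0f
        };
        for (int[] m : matches(readings)) {
            System.out.println("start=" + m[0] + " end=" + m[1]); // prints start=0 end=10
        }
    }
}
```

Under these assumptions, exactly one match is expected per home for that sample, which gives a baseline to compare the CEP output against.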
>>> 
>>> On the Kafka producer side, I am trying to send simulated data for around 100 
>>> homes, so the home_id goes from 0 to 100, and the input is keyed by 
>>> home_id. I have about 10 partitions in Kafka. The producer just loops 
>>> through a CSV file with a delay of about 100 ms between 2 rows of the CSV 
>>> file. The data is exactly the same for all 100 of the CSV files except for 
>>> home_id and the lat & long information. The timestamp is incremented by a 
>>> step of 1 sec. I start multiple processes to simulate data from different 
>>> homes.
>>> 
>>> THE PROBLEM:
>>> 
>>> Flink completely misses capturing events for a large subset of the input 
>>> data. I see events for only about 4-5 of the home_id values. I print the 
>>> stream before and after applying the pattern, and I see all home_ids before 
>>> and only a tiny subset after. Since the data is exactly the same, I expect 
>>> all home_ids to be captured and written to my sink, which is Cassandra in 
>>> this case. I've looked through all available docs and examples but cannot 
>>> seem to find a fix for the problem.
>>> 
>>> I would really appreciate some guidance on how to understand and fix this.
>>> 
>>> 
>>> 
>>> Thank you,
>>> 
>>> Ajay
>>> 
>> 
>> 
> 
> 
