One additional comment, from your code it seems you are using Flink 1.2.
It would be worth upgrading to 1.3. The updated CEP library includes a lot of 
new features and bugfixes.

Cheers,
Kostas

> On May 26, 2017, at 3:33 PM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Biplob,
> 
> From a first scan of the code I cannot find sth fishy.
> 
> You are working on ProcessingTime, given that you do not 
> provide any time characteristic specification, right?
> 
> In this case, if you print your partitionedInput stream, do you 
> see elements flowing as expected?
> 
> If elements are flowing normally, is any back pressure created? 
> Or you keep on reading records from kafka uninterrupted? 
> I am asking to see if the CEP operator is doing sth that blocks the 
> pipeline or it just discards the elements.
> 
> It could be also worth trying to add a source with artificial elements 
> env.fromCollection(…) 
> to see if in this case everything works normally.
> 
> Kostas
> 
>> On May 26, 2017, at 2:25 PM, Biplob Biswas <revolutioni...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I just started exploring Flink CEP a day back and I thought I can use it to
>> make a simple event processor. For that I looked into the CEP examples by
>> Till and some other articles. 
>> 
>> Now I have 2 questions which i would like to ask:
>> 
>> *Part 1:*
>> 
>> I came up with the following piece of code, but this is not working as
>> expected.
>> 
>> ///**************** Main ******************///
>> 
>> 
>> FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
>>           "testTopic",
>>           new SimpleStringSchema(),
>>           props);
>> 
>>   DataStream<String> input = env.addSource(consumer);
>>   LOG.info("About to process events");
>>   DataStream<ReadEventType> events =
>>           input
>>                   //.map(s -> s.f1)
>>                   .map(new MapStringToRRE())
>>                   .filter(Objects::nonNull);
>> 
>>   //events.print();
>> 
>>   DataStream<ReadEventType> partitionedInput = events
>>           .keyBy((KeySelector<ReadEventType, String>) value ->
>> value.getRawTransactionItem().getChargedAccount());
>> 
>>   Pattern<ReadEventType, ?> pattern =
>> Pattern.<ReadEventType>begin("first")
>>           .where(event -> event.getFormat() == FormatType.FILE)
>>           .followedBy("second")
>>           .where(event -> event.getFormat() == FormatType.FILE)
>>           .within(Time.seconds(1));
>> 
>>   PatternStream<ReadEventType> patternStream =
>> CEP.pattern(partitionedInput, pattern);
>> 
>>   DataStream<String> alerts =
>> patternStream.select((PatternSelectFunction<ReadEventType, String>)
>> CEPForBAMRRE::createAlert);
>> 
>>   alerts.print();
>> 
>>   env.execute("CEP monitoring job");
>> }
>> 
>> 
>> ///*********** Alert Function returning just concat of txn id
>> ***************///
>> 
>> private static String createAlert(Map<String, ReadEventType> pattern) {
>>   return pattern.get("first").getTransactionItem().getUid() + " " +
>>           pattern.get("second").getTransactionItem().getUid();
>> }
>> 
>> ///******************* properties for kafka **************///
>> 
>> private static Properties getDefaultProperties(Properties prop){
>>   prop.put("group.id", "FlinkCEP");
>>   prop.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>>   prop.put("zookeeper.connect", ZKEEPER);
>>   prop.put("auto.offset.reset", "earliest");
>>   return prop;
>> }
>> 
>> 
>> As my kafka topic only sends me events with formattype = FILE, I was
>> expecting to see multiple alerts being raised. But thats not the case, i am
>> not getting any alert at the moment.
>> 
>> Can anyone point out what am I doing wrong? 
>> 
>> PART 2: 
>> 
>> Also, my main aim for using CEP is to read from different topics and raise
>> alert if a second event is *not* followed by a first event within a given
>> time interval. How can I achieve it with FlinkCEP? for now I can only see
>> that if 2 events follow within a time interval an alert should be raised. 
>> 
>> 
>> Thanks & Regards,
>> Biplob
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp13333.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
> 

Reply via email to