One additional comment, from your code it seems you are using Flink 1.2. It would be worth upgrading to 1.3. The updated CEP library includes a lot of new features and bugfixes.
Cheers, Kostas > On May 26, 2017, at 3:33 PM, Kostas Kloudas <k.klou...@data-artisans.com> > wrote: > > Hi Biplob, > > From a first scan of the code I cannot find sth fishy. > > You are working on ProcessingTime, given that you do not > provide any time characteristic specification, right? > > In this case, if you print your partitionedInput stream, do you > see elements flowing as expected? > > If elements are flowing normally, is any back pressure created? > Or you keep on reading records from kafka uninterrupted? > I am asking to see if the CEP operator is doing sth that blocks the > pipeline or it just discards the elements. > > It could be also worth trying to add a source with artificial elements > env.fromCollection(…) > to see if in this case everything works normally. > > Kostas > >> On May 26, 2017, at 2:25 PM, Biplob Biswas <revolutioni...@gmail.com> wrote: >> >> Hi, >> >> I just started exploring Flink CEP a day back and I thought I can use it to >> make a simple event processor. For that I looked into the CEP examples by >> Till and some other articles. >> >> Now I have 2 questions which i would like to ask: >> >> *Part 1:* >> >> I came up with the following piece of code, but this is not working as >> expected. >> >> ///**************** Main ******************/// >> >> >> FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>( >> "testTopic", >> new SimpleStringSchema(), >> props); >> >> DataStream<String> input = env.addSource(consumer); >> LOG.info("About to process events"); >> DataStream<ReadEventType> events = >> input >> //.map(s -> s.f1) >> .map(new MapStringToRRE()) >> .filter(Objects::nonNull); >> >> //events.print(); >> >> DataStream<ReadEventType> partitionedInput = events >> .keyBy((KeySelector<ReadEventType, String>) value -> >> value.getRawTransactionItem().getChargedAccount()); >> >> Pattern<ReadEventType, ?> pattern = >> Pattern.<ReadEventType>begin("first") >> .where(event -> event.getFormat() == FormatType.FILE) >> .followedBy("second") >> .where(event -> event.getFormat() == FormatType.FILE) >> .within(Time.seconds(1)); >> >> PatternStream<ReadEventType> patternStream = >> CEP.pattern(partitionedInput, pattern); >> >> DataStream<String> alerts = >> patternStream.select((PatternSelectFunction<ReadEventType, String>) >> CEPForBAMRRE::createAlert); >> >> alerts.print(); >> >> env.execute("CEP monitoring job"); >> } >> >> >> ///*********** Alert Function returning just concat of txn id >> ***************/// >> >> private static String createAlert(Map<String, ReadEventType> pattern) { >> return pattern.get("first").getTransactionItem().getUid() + " " + >> pattern.get("second").getTransactionItem().getUid(); >> } >> >> ///******************* properties for kafka **************/// >> >> private static Properties getDefaultProperties(Properties prop){ >> prop.put("group.id", "FlinkCEP"); >> prop.put("bootstrap.servers", BOOTSTRAP_SERVERS); >> prop.put("zookeeper.connect", ZKEEPER); >> prop.put("auto.offset.reset", "earliest"); >> return prop; >> } >> >> >> As my kafka topic only sends me events with formattype = FILE, I was >> expecting to see multiple alerts being raised. But thats not the case, i am >> not getting any alert at the moment. >> >> Can anyone point out what am I doing wrong? >> >> PART 2: >> >> Also, my main aim for using CEP is to read from different topics and raise >> alert if a second event is *not* followed by a first event within a given >> time interval. How can I achieve it with FlinkCEP? for now I can only see >> that if 2 events follow within a time interval an alert should be raised. >> >> >> Thanks & Regards, >> Biplob >> >> >> >> -- >> View this message in context: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp13333.html >> Sent from the Apache Flink User Mailing List archive. mailing list archive >> at Nabble.com. >