Hi Biplob,

From a first scan of the code, I cannot spot anything fishy.

You are working with processing time, given that you do not
specify any time characteristic, right?

In this case, if you print your partitionedInput stream, do you 
see elements flowing as expected?

If elements are flowing normally, is any back pressure created,
or do you keep reading records from Kafka uninterrupted?
I am asking in order to see whether the CEP operator is doing something
that blocks the pipeline, or whether it just discards the elements.

It could also be worth adding a source with artificial elements,
e.g. env.fromCollection(…),
to see whether everything works normally in that case.
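A minimal, self-contained version of that experiment might look like the sketch below. It targets the same Flink 1.2/1.3-era CEP API your snippet uses (where `where` accepts a lambda), with plain Strings standing in for ReadEventType; the class name CepSmokeTest is made up:

```java
import java.util.Arrays;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CepSmokeTest {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Two artificial "FILE" elements, standing in for the Kafka records.
    DataStream<String> input = env.fromCollection(Arrays.asList("FILE", "FILE"));

    // Same shape as the pattern in your job, just on Strings.
    Pattern<String, ?> pattern = Pattern.<String>begin("first")
        .where(evt -> evt.equals("FILE"))
        .followedBy("second")
        .where(evt -> evt.equals("FILE"))
        .within(Time.seconds(1));

    PatternStream<String> patternStream = CEP.pattern(input, pattern);

    // Anonymous class (rather than a lambda) so Flink can extract the
    // return type without an explicit returns(...) hint.
    DataStream<String> alerts = patternStream.select(
        new PatternSelectFunction<String, String>() {
          @Override
          public String select(Map<String, String> match) {
            return match.get("first") + " " + match.get("second");
          }
        });

    alerts.print();
    env.execute("CEP smoke test");
  }
}
```

Since both collection elements arrive essentially instantly, the within(Time.seconds(1)) window should be satisfied under processing time; if this prints an alert while your Kafka job does not, the problem is upstream of CEP.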

Kostas

> On May 26, 2017, at 2:25 PM, Biplob Biswas <revolutioni...@gmail.com> wrote:
> 
> Hi,
> 
> I just started exploring Flink CEP a day back, and I thought I could use it to
> make a simple event processor. For that I looked into the CEP examples by
> Till and some other articles.
> 
> Now I have two questions which I would like to ask:
> 
> *Part 1:*
> 
> I came up with the following piece of code, but this is not working as
> expected.
> 
> ///**************** Main ******************///
> 
>    FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
>            "testTopic",
>            new SimpleStringSchema(),
>            props);
> 
>    DataStream<String> input = env.addSource(consumer);
>    LOG.info("About to process events");
> 
>    DataStream<ReadEventType> events = input
>            //.map(s -> s.f1)
>            .map(new MapStringToRRE())
>            .filter(Objects::nonNull);
> 
>    //events.print();
> 
>    DataStream<ReadEventType> partitionedInput = events
>            .keyBy((KeySelector<ReadEventType, String>) value ->
>                    value.getRawTransactionItem().getChargedAccount());
> 
>    Pattern<ReadEventType, ?> pattern = Pattern.<ReadEventType>begin("first")
>            .where(event -> event.getFormat() == FormatType.FILE)
>            .followedBy("second")
>            .where(event -> event.getFormat() == FormatType.FILE)
>            .within(Time.seconds(1));
> 
>    PatternStream<ReadEventType> patternStream =
>            CEP.pattern(partitionedInput, pattern);
> 
>    DataStream<String> alerts = patternStream.select(
>            (PatternSelectFunction<ReadEventType, String>) CEPForBAMRRE::createAlert);
> 
>    alerts.print();
> 
>    env.execute("CEP monitoring job");
>  }
> 
> 
> ///*********** Alert function returning just a concat of txn ids ***************///
> 
>  private static String createAlert(Map<String, ReadEventType> pattern) {
>    return pattern.get("first").getTransactionItem().getUid() + " " +
>            pattern.get("second").getTransactionItem().getUid();
>  }
> 
> ///******************* Properties for Kafka **************///
> 
>  private static Properties getDefaultProperties(Properties prop) {
>    prop.put("group.id", "FlinkCEP");
>    prop.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>    prop.put("zookeeper.connect", ZKEEPER);
>    prop.put("auto.offset.reset", "earliest");
>    return prop;
>  }
> 
> 
> As my Kafka topic only sends me events with formatType = FILE, I was
> expecting to see multiple alerts being raised. But that's not the case; I am
> not getting any alerts at the moment.
> 
> Can anyone point out what I am doing wrong?
> 
> *Part 2:*
> 
> Also, my main aim for using CEP is to read from different topics and raise an
> alert if a second event is *not* followed by a first event within a given
> time interval. How can I achieve this with FlinkCEP? For now I can only see
> that an alert is raised if two events follow each other within a time interval.
> 
> 
> Thanks & Regards,
> Biplob
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp13333.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
