Hi,

I started exploring Flink CEP yesterday and thought I could use it to
build a simple event processor. To that end, I looked into the CEP examples
by Till and some other articles.

Now I have two questions I would like to ask:

*Part 1:*

I came up with the following piece of code, but this is not working as
expected.

///**************** Main ******************///


    FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
            "testTopic",
            new SimpleStringSchema(),
            props);

    DataStream<String> input = env.addSource(consumer);
    LOG.info("About to process events");
    DataStream<ReadEventType> events =
            input
                    //.map(s -> s.f1)
                    .map(new MapStringToRRE())
                    .filter(Objects::nonNull);

    //events.print();

    DataStream<ReadEventType> partitionedInput = events
            .keyBy((KeySelector<ReadEventType, String>) value ->
                    value.getRawTransactionItem().getChargedAccount());

    Pattern<ReadEventType, ?> pattern = Pattern.<ReadEventType>begin("first")
            .where(event -> event.getFormat() == FormatType.FILE)
            .followedBy("second")
            .where(event -> event.getFormat() == FormatType.FILE)
            .within(Time.seconds(1));

    PatternStream<ReadEventType> patternStream =
            CEP.pattern(partitionedInput, pattern);

    DataStream<String> alerts = patternStream.select(
            (PatternSelectFunction<ReadEventType, String>) CEPForBAMRRE::createAlert);

    alerts.print();

    env.execute("CEP monitoring job");
  }


///*********** Alert function returning just the concatenation of txn ids ***************///

  private static String createAlert(Map<String, ReadEventType> pattern) {
    return pattern.get("first").getTransactionItem().getUid() + " " +
            pattern.get("second").getTransactionItem().getUid();
  }

///******************* properties for kafka **************///

  private static Properties getDefaultProperties(Properties prop){
    prop.put("group.id", "FlinkCEP");
    prop.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    prop.put("zookeeper.connect", ZKEEPER);
    prop.put("auto.offset.reset", "earliest");
    return prop;
  }


As my Kafka topic only sends me events with format type FILE, I was
expecting to see multiple alerts being raised. But that's not the case; I am
not getting any alerts at the moment.

Can anyone point out what I am doing wrong?

*Part 2:*

Also, my main aim in using CEP is to read from different topics and raise
an alert if a second event does *not* follow a first event within a given
time interval. How can I achieve this with FlinkCEP? For now I can only see
how to raise an alert when two events do follow each other within a time
interval.
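
To make the Part 2 requirement concrete, here is a minimal plain-Java sketch
(no Flink involved) of the behaviour I am after: one alert per key whose
"first" event has no "second" event within the timeout. The class
MissingFollowUpSketch, the Event type, the "first"/"second" kinds and the
nowMs parameter are all made up for illustration; in the real job the events
would be ReadEventType instances keyed by charged account. It also assumes
the events arrive sorted by timestamp.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not part of Flink or of my job.
class MissingFollowUpSketch {

    /** Simplified stand-in for a keyed event (assumption, not ReadEventType). */
    static final class Event {
        final String key;        // e.g. the charged account
        final String kind;       // "first" or "second"
        final long timestampMs;  // event time in milliseconds

        Event(String key, String kind, long timestampMs) {
            this.key = key;
            this.kind = kind;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Returns the keys whose "first" event was not followed by a "second"
     * event within timeoutMs. Events must be sorted by timestamp; nowMs is
     * the point in time up to which the stream has been observed.
     */
    static List<String> findTimedOut(List<Event> events, long timeoutMs, long nowMs) {
        // Key -> timestamp of the "first" event still waiting for its "second".
        Map<String, Long> pendingFirst = new LinkedHashMap<>();
        List<String> alerts = new ArrayList<>();

        for (Event e : events) {
            // Time has advanced to e.timestampMs: expire overdue entries first,
            // so a "second" that arrives too late still produces an alert.
            expire(pendingFirst, e.timestampMs, timeoutMs, alerts);
            if ("first".equals(e.kind)) {
                pendingFirst.put(e.key, e.timestampMs);
            } else if ("second".equals(e.kind)) {
                pendingFirst.remove(e.key);
            }
        }
        // Expire whatever is still pending at the end of the observation window.
        expire(pendingFirst, nowMs, timeoutMs, alerts);
        return alerts;
    }

    /** Moves every pending key older than timeoutMs into the alert list. */
    private static void expire(Map<String, Long> pending, long nowMs,
                               long timeoutMs, List<String> alerts) {
        pending.entrySet().removeIf(entry -> {
            boolean timedOut = nowMs - entry.getValue() > timeoutMs;
            if (timedOut) {
                alerts.add(entry.getKey());
            }
            return timedOut;
        });
    }
}
```

For example, with a timeout of 1000 ms, a key whose "second" arrives 500 ms
after its "first" raises no alert, while a key with no "second" at all, or
with a "second" arriving 1300 ms later, should each raise one. This is the
output I would like to get from FlinkCEP.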


Thanks & Regards,
Biplob



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp13333.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
