Hi Biplob,

For the 1.4 version, the input of the select function has changed to expect a
list of matching events (Map<String, List<IN>> map instead of
Map<String, IN> map), as we have added quantifiers.
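
As a rough sketch of what this means for the body of a select function (plain
Java with no Flink dependency, so it can be run on its own; the class name
SelectSketch is made up for illustration, and the pattern names "start" and
"end" are borrowed from the documentation example), each pattern name now maps
to a list of events rather than to a single event:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone class; it only mimics the shape of the 1.4 select() input.
class SelectSketch {

    // Same parameter shape as the 1.4 select(): pattern name -> list of matched events.
    static <IN> String select(Map<String, List<IN>> pattern) {
        // A stage without a quantifier matches one event, so read the first list entry.
        IN startEvent = pattern.get("start").get(0);
        // A stage with a quantifier may have matched several events; keep the whole list.
        List<IN> endEvents = pattern.get("end");
        return startEvent + " -> " + endEvents;
    }

    public static void main(String[] args) {
        Map<String, List<String>> match = new HashMap<>();
        match.put("start", Collections.singletonList("a"));
        match.put("end", Arrays.asList("b1", "b2"));
        System.out.println(select(match)); // prints "a -> [b1, b2]"
    }
}
```

In an actual PatternSelectFunction the same logic would go inside
select(Map<String, List<IN>> pattern).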

Also, the FilterFunction has changed to SimpleCondition.

The documentation is lagging a bit behind, but it is coming soon.

As for your code, I will have to dig into it a bit more.

Kostas

> On May 26, 2017, at 4:07 PM, Biplob Biswas <revolutioni...@gmail.com> wrote:
> 
> Hello Kostas,
> 
> Thanks for the suggestions.
> 
> I checked, and I am getting my events in the partitionedInput stream when I
> print it, but still nothing on the alert side. I checked the Flink UI for
> backpressure and all seems to be normal (I have at most 1000 events per
> second on the Kafka topic, so I don't think backpressure could be a problem,
> at least I expect so).
> 
> Also, I haven't run my test with my test data as a collection, but I tried
> the following example and I did get alerts as a result:
> 
> 
> // CEPTest using collection
> 
> List<MyEvent> inputElements = new ArrayList<>();
>    inputElements.add(new MyEvent(1, 'a', 1, 1));
>    inputElements.add(new MyEvent(1, 'b', 1, 2));
>    inputElements.add(new MyEvent(1, 'b', 2, 2));
>    inputElements.add(new MyEvent(1, 'b', 3, 5));
> 
>    Pattern<MyEvent, ?> pattern = Pattern.<MyEvent>begin("a").where(new
> FilterFunction<MyEvent>() {
>      private static final long serialVersionUID = 7219646616484327688L;
> 
>      @Override
>      public boolean filter(MyEvent myEvent) throws Exception {
>        return myEvent.getPayload() == 'a';
>      }
>    }).followedBy("b").where(new FilterFunction<MyEvent>() {
>      private static final long serialVersionUID = 7219646616484327688L;
> 
>      @Override
>      public boolean filter(MyEvent myEvent) throws Exception {
>        return myEvent.getPayload() == 'b';
>      }
>    }).within(Time.seconds(1));//.within(Time.milliseconds(2L));
> 
>    StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>    env.getConfig().setAutoWatermarkInterval(1000);
> 
>    DataStream<MyEvent> input =
> env.fromCollection(inputElements).assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor<MyEvent>() {
>      private static final long serialVersionUID = -6619787346214245526L;
> 
>      @Override
>      public long extractAscendingTimestamp(MyEvent myEvent) {
>        return myEvent.getTimestamp();
>      }
>    });
> 
>    PatternStream<MyEvent> patternStream = CEP.pattern(input.keyBy(new
> KeySelector<MyEvent, Long>() {
>      private static final long serialVersionUID = 6928745840509494198L;
> 
>      @Override
>      public Long getKey(MyEvent myEvent) throws Exception {
>        return myEvent.getId();
>      }
>    }), pattern);
> 
> 
>    patternStream.select(new PatternTimeoutFunction<MyEvent, String>() {
>      @Override
>      public String timeout(Map<String, MyEvent> map, long l) throws
> Exception {
>        return map.toString() +" @ "+ l;
>      }
> 
>      private static final long serialVersionUID = 300759199619789416L;
> 
> 
>    }, new PatternSelectFunction<MyEvent, String>() {
> 
>      @Override
>      public String select(Map<String, MyEvent> map) throws Exception {
>        return map.toString();
>      }
> 
>      private static final long serialVersionUID = 732172159423132724L;
>    }).print();
> 
> 
> 
> Along with that, I have now upgraded my Flink Maven project to 1.4-SNAPSHOT,
> and there seems to be a problem there.
> 
> According to this
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html>:
> 
> class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN,
> OUT> {
>    @Override
>    public OUT select(Map<String, IN> pattern) {
>        IN startEvent = pattern.get("start");
>        IN endEvent = pattern.get("end");
>        return new OUT(startEvent, endEvent);
>    }
> }
> 
> but when I do it, it expects a list of events from my side:
> 
> class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN,
> OUT> {
>  @Override
>  public OUT select(Map<String, List<IN>> map) throws Exception {
>    return null;
>  }
> }
> 
> I am not really sure what I am doing wrong here; any input would be really
> helpful.
> 
> Regards,
> Biplob
