Have you tried pattern like:

/Pattern.begin[Event]("b",
//AfterMatchSkipStrategy.skipPastLast//).where(...).followedBy("c").where(...).followedBy("e").where(...)/

The method followedBy(Pattern) constructs a Pattern with a subGroup
pattern. The skip strategy there does not have any effect.

Best,

Dawid


On 25/07/2019 16:50, Federico D'Ambrosio wrote:
> Hello everyone,
>
> I need a bit of help concerning a correct formulation for a Complex
> Event Pattern, using CEP.
>
> I have a stream of events which once keyed for ids, they may look like
> this:
>
> a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1
>
> what I want to achieve is to get, from a formulation similar to this:
>
> [1] b c e
>
> this:
>
> b1 c1 e1
>
> that is, for each input stream, have an output composed of only the
> first appearance of events of class b, c and e.
>
> I realize that a pattern formulated like [1] would also match:
>
> b1 c2 e1, b1 c2 e2 and so on, so that I would need to refine it.
>
> So, I tried using oneOrMore(), consecutive() and
> AfterMatchSkipStrategy.skypToFirst, like this:
>
> val b = Pattern
>   .begin[Event]("b")
>   .where((value, _) => value.state == "b")
>   .oneOrMore().consecutive()
>
> val c = Pattern
>   .begin[Event]("c")
>   .where((value, _) => value.state == "c")
>   .oneOrMore().consecutive()
>
> val e = Pattern
>   .begin[Event]("e", AfterMatchSkipStrategy.skipToFirst("b"))
>   .where((value, _) => value.state == "e")
>   .oneOrMore().consecutive()
>
> val pattern: Pattern[Event, _] =
>   b.followedBy(c).followedBy(e)
>
> In the process function I would do something like this:
>
> override def processMatch(matches: util.Map[String, util.List[Event]],
>                                   ctx: PatternProcessFunction.Context,
>                                   out: Collector[OutputEvent]): Unit =  {
>
>     val bEvent = matches.get("b").asScala.head
>     val cEvent = matches.get("c").asScala.head
>     val eEvent = matches.get("e").asScala.head
>
>     out.collect(OutputEvent(bEvent, cEvent, eEvent))
> }
>
> But unfortunately it doesn't work like I want, which makes me think
> I'm missing something within the functionalities of Flink CEP.
>
> What's the best way to achieve what I want? Is it possible?
> Should I even use any AfterMatchSkipStrategy?
>
> Thank you,
> Federico D'Ambrosio

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to