I am using Processing time characteristic. DataStream<Model> inputStream = env.fromElements( Model.of(1, "A", "US"), Model.of(2, "B", "US"), Model.of(3, "C", "US"), Model.of(4, "A", "AU"), Model.of(5, "B", "AU"), Model.of(6, "C", "AU"), //Model.of(7, "D", "US"), Model.of(8, "D", "AU"), Model.of(9, "A", "GB"), Model.of(10, "B", "GB"), Model.of(13, "D", "GB"), Model.of(11, "C", "GB"), Model.of(12, "D", "GB")
in the above inputStream the Model.of(7, "D", "US") is not supplied to the pattern sequence. Nothing special at all with this but I wanted to simulate the case that an event that fulfills the pattern sequence might be missing which it happens in my case. BR, Isidoros Στις Πέμ, 4 Νοε 2021 στις 11:01 μ.μ., ο/η Austin Cawley-Edwards < austin.caw...@gmail.com> έγραψε: > Thanks for the update, the requirements make sense. > > Some follow up questions: > * What time characteristic are you using? Processing or Event? > * Can you describe a bit more what you mean by "input like the one I have > commented bellow"? What is special about the one you have commented? > > Best, > Austin > > On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com> > wrote: > >> >> >> ---------- Forwarded message --------- >> Από: Isidoros Ioannou <akis3...@gmail.com> >> Date: Πέμ, 4 Νοε 2021 στις 10:01 μ.μ. >> Subject: Re: IterativeCondition instead of SimpleCondition not matching >> pattern >> To: Austin Cawley-Edwards <austin.caw...@gmail.com> >> >> >> Hi Austin, >> thank you for your answer and I really appreciate your willingness to >> help. >> >> Actually the desired output is the one below >> >> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, >> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], >> fourth=[Model{id=8, text='D', symbol='AU'}]} {start=[Model{id=9, text='A', >> symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], >> third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', >> symbol='GB'}]} >> I would like only to generate sequences of Models that have the same >> symbol. I noticed that if an event does not come as input >> like the one I have commented bellow, it breaks all the pattern match and >> the desired output is never produced >> >> DataStream<Model> inputStream = env.fromElements( >> Model.of(1, "A", "US"), >> Model.of(2, "B", "US"), >> Model.of(3, "C", "US"), >> Model.of(4, "A", "AU"), >> Model.of(5, "B", "AU"), >> Model.of(6, "C", "AU"), >> //Model.of(7, "D", "US"), >> Model.of(8, "D", "AU"), >> Model.of(9, "A", "GB"), >> Model.of(10, "B", "GB"), >> Model.of(13, "D", "GB"), >> Model.of(11, "C", "GB"), >> Model.of(12, "D", "GB") >> >> Kind Regards, >> Isidoros >> >> >> Στις Πέμ, 4 Νοε 2021 στις 8:40 μ.μ., ο/η Austin Cawley-Edwards < >> austin.caw...@gmail.com> έγραψε: >> >>> Hi Isidoros, >>> >>> Thanks for reaching out to the mailing list. I haven't worked with the >>> CEP library in a long time but can try to help. I'm having a little trouble >>> understanding the desired output + rules. Can you mock up the desired >>> output like you have for the fulfilled pattern sequence? >>> >>> Best, >>> Austin >>> >>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com> >>> wrote: >>> >>>> >>>> I face an issue when try to match some elements in a Pattern sequence. >>>> Flink 1.11.1 version. Here is my case: >>>> >>>> final StreamExecutionEnvironment env = >>>> EnvironmentProvider.getEnvironment(); >>>> DataStream<Model> inputStream = env.fromElements( >>>> Model.of(1, "A", "US"), >>>> Model.of(2, "B", "US"), >>>> Model.of(3, "C", "US"), >>>> Model.of(4, "A", "AU"), >>>> Model.of(5, "B", "AU"), >>>> Model.of(6, "C", "AU"), >>>> //Model.of(7, "D"), >>>> Model.of(8, "D", "AU"), >>>> Model.of(9, "A", "GB"), >>>> Model.of(10, "B", "GB"), >>>> Model.of(13, "D", "GB"), >>>> Model.of(11, "C", "GB"), >>>> Model.of(12, "D", "GB") >>>> >>>> >>>> >>>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()) >>>> .forceNonParallel(); >>>> >>>> Pattern<Model, Model> pattern = Pattern.<Model>begin("start", >>>> AfterMatchSkipStrategy.skipToNext()) >>>> .where(new IterativeCondition<Model>() { >>>> @Override >>>> public boolean filter(Model value, Context<Model> ctx) >>>> throws Exception { >>>> return value.getText().equalsIgnoreCase("A"); >>>> } >>>> }).followedBy("second") >>>> .where(new IterativeCondition<Model>() { >>>> @Override >>>> public boolean filter(Model value, Context<Model> ctx) >>>> throws Exception { >>>> >>>> return value.getText().equalsIgnoreCase("B"); >>>> } >>>> }).followedBy("third") >>>> .where(new IterativeCondition<Model>() { >>>> @Override >>>> public boolean filter(Model value, Context<Model> ctx) >>>> throws Exception { >>>> >>>> return value.getText().equalsIgnoreCase("C"); >>>> } >>>> }).followedBy("fourth") >>>> .where(new IterativeCondition<Model>() { >>>> @Override >>>> public boolean filter(Model value, Context<Model> ctx) >>>> throws Exception { >>>> var list = >>>> StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), >>>> false).collect(Collectors.toList()); >>>> var val = >>>> list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol()); >>>> return value.getText().equalsIgnoreCase("D") && val; >>>> } >>>> }); >>>> >>>> >>>> PatternStream<Model> marketOpenPatternStream = >>>> CEP.<Model>pattern(inputStream, pattern); >>>> >>>> SingleOutputStreamOperator<List<Model>> marketOpenOutput = >>>> marketOpenPatternStream >>>> .process(new PatternProcessFunction<Model, >>>> List<Model>>() { >>>> @Override >>>> public void processMatch(Map<String, List<Model>> >>>> match, Context ctx, Collector<List<Model>> out) throws Exception { >>>> System.out.println(match); >>>> out.collect(new ArrayList(match.values())); >>>> } >>>> }) >>>> >>>> What I am trying to succeed is to match only patterns that have the same >>>> symbol. If I use SimpleCondition with checks only about the text of the >>>> Model(A, B,C..) without the symbol check in the last pattern, the pattern >>>> sequence is fulfilled and I get the following output: >>>> >>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B', >>>> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], >>>> fourth=[Model{id=8,text='D',symbol='AU'}]} >>>> >>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', >>>> symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], >>>> fourth=[Model{id=8, text='D', symbol='AU'}]} >>>> >>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, >>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], >>>> fourth=[Model{id=12, text='D', symbol='GB'}]} >>>> >>>> However I want to avoid the match of elements with id= 1(A),2(B),3(C) >>>> with the element with id = 8(D). For this reason I put the symbol check >>>> with the event matched in the previous pattern in the last condition so I >>>> dont get match since they do not have the same symbol. But after applying >>>> the condition, now I do not get any output. none of the elements match the >>>> pattern. What I am missing? Could someone help? >>>> >>>>