Thanks for the update, the requirements make sense. Some follow-up questions:

* What time characteristic are you using? Processing or Event?
* Can you describe a bit more what you mean by "input like the one I have commented below"? What is special about the event you commented out?

Best,
Austin

On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com> wrote:

> ---------- Forwarded message ---------
> From: Isidoros Ioannou <akis3...@gmail.com>
> Date: Thu, Nov 4, 2021 at 10:01 PM
> Subject: Re: IterativeCondition instead of SimpleCondition not matching pattern
> To: Austin Cawley-Edwards <austin.caw...@gmail.com>
>
> Hi Austin,
> thank you for your answer; I really appreciate your willingness to help.
>
> Actually, the desired output is the one below:
>
> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', symbol='GB'}]}
>
> I would like to generate only sequences of Models that have the same symbol. I noticed that if an event like the one I have commented out below does not arrive as input, it breaks the whole pattern match and the desired output is never produced:
>
> DataStream<Model> inputStream = env.fromElements(
>         Model.of(1, "A", "US"),
>         Model.of(2, "B", "US"),
>         Model.of(3, "C", "US"),
>         Model.of(4, "A", "AU"),
>         Model.of(5, "B", "AU"),
>         Model.of(6, "C", "AU"),
>         //Model.of(7, "D", "US"),
>         Model.of(8, "D", "AU"),
>         Model.of(9, "A", "GB"),
>         Model.of(10, "B", "GB"),
>         Model.of(13, "D", "GB"),
>         Model.of(11, "C", "GB"),
>         Model.of(12, "D", "GB")
>
> Kind Regards,
> Isidoros
>
> On Thu, Nov 4, 2021 at 8:40 PM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>
>> Hi Isidoros,
>>
>> Thanks for reaching out to the mailing list. I haven't worked with the CEP library in a long time but can try to help. I'm having a little trouble understanding the desired output + rules. Can you mock up the desired output like you have for the fulfilled pattern sequence?
>>
>> Best,
>> Austin
>>
>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com> wrote:
>>
>>> I face an issue when trying to match some elements in a Pattern sequence, using Flink version 1.11.1. Here is my case:
>>>
>>> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
>>> DataStream<Model> inputStream = env.fromElements(
>>>         Model.of(1, "A", "US"),
>>>         Model.of(2, "B", "US"),
>>>         Model.of(3, "C", "US"),
>>>         Model.of(4, "A", "AU"),
>>>         Model.of(5, "B", "AU"),
>>>         Model.of(6, "C", "AU"),
>>>         //Model.of(7, "D"),
>>>         Model.of(8, "D", "AU"),
>>>         Model.of(9, "A", "GB"),
>>>         Model.of(10, "B", "GB"),
>>>         Model.of(13, "D", "GB"),
>>>         Model.of(11, "C", "GB"),
>>>         Model.of(12, "D", "GB")
>>>     ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>>>     .forceNonParallel();
>>>
>>> Pattern<Model, Model> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
>>>         .where(new IterativeCondition<Model>() {
>>>             @Override
>>>             public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>                 return value.getText().equalsIgnoreCase("A");
>>>             }
>>>         }).followedBy("second")
>>>         .where(new IterativeCondition<Model>() {
>>>             @Override
>>>             public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>                 return value.getText().equalsIgnoreCase("B");
>>>             }
>>>         }).followedBy("third")
>>>         .where(new IterativeCondition<Model>() {
>>>             @Override
>>>             public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>                 return value.getText().equalsIgnoreCase("C");
>>>             }
>>>         }).followedBy("fourth")
>>>         .where(new IterativeCondition<Model>() {
>>>             @Override
>>>             public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>                 var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false)
>>>                         .collect(Collectors.toList());
>>>                 var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>>>                 return value.getText().equalsIgnoreCase("D") && val;
>>>             }
>>>         });
>>>
>>> PatternStream<Model> marketOpenPatternStream = CEP.<Model>pattern(inputStream, pattern);
>>>
>>> SingleOutputStreamOperator<List<Model>> marketOpenOutput = marketOpenPatternStream
>>>         .process(new PatternProcessFunction<Model, List<Model>>() {
>>>             @Override
>>>             public void processMatch(Map<String, List<Model>> match, Context ctx, Collector<List<Model>> out) throws Exception {
>>>                 System.out.println(match);
>>>                 out.collect(new ArrayList(match.values()));
>>>             }
>>>         })
>>>
>>> What I am trying to achieve is to match only sequences whose events have the same symbol. If I use SimpleCondition with checks only on the text of the Model (A, B, C, ...), without the symbol check in the last pattern, the pattern sequence is fulfilled and I get the following output:
>>>
>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B', symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>
>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>
>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>
>>> However, I want to avoid matching the elements with id = 1 (A), 2 (B), 3 (C) with the element with id = 8 (D). For this reason, in the last condition I added a symbol check against the event matched by the previous pattern, so that they do not match when their symbols differ. But after applying the condition I no longer get any output; none of the elements match the pattern. What am I missing? Could someone help?
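
The requirement stated in the thread (emit an A, B, C, D sequence only when all four events share a symbol, skipping unrelated events in between) can be sketched in plain Java, independent of Flink. This is only an illustration of the intended matching semantics, not Flink CEP code and not an explanation of why the original job produces no output. The class `SameSymbolSequences`, its nested `Model` stand-in, and the `match` and `sampleInput` helpers are hypothetical names introduced here; the thread does not show the real `Model` class. The sketch keeps one partial sequence per symbol, which is a simplification compared with the overlapping partial matches that CEP tracks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SameSymbolSequences {

    /** Hypothetical stand-in for the thread's Model class, which is not shown. */
    static final class Model {
        final int id;
        final String text;
        final String symbol;

        Model(int id, String text, String symbol) {
            this.id = id;
            this.text = text;
            this.symbol = symbol;
        }

        @Override
        public String toString() {
            return "Model{id=" + id + ", text='" + text + "', symbol='" + symbol + "'}";
        }
    }

    // The four pattern steps, in the order they must appear.
    private static final String[] STEPS = {"A", "B", "C", "D"};

    /**
     * Returns every completed A, B, C, D sequence whose events share one symbol.
     * Events that do not fit the next expected step of their symbol are skipped,
     * which mirrors relaxed contiguity (followedBy) in spirit only.
     */
    static List<List<Model>> match(List<Model> events) {
        // One partial sequence per symbol (assumption: no overlapping matches).
        Map<String, List<Model>> partial = new LinkedHashMap<>();
        List<List<Model>> matches = new ArrayList<>();
        for (Model e : events) {
            List<Model> seq = partial.computeIfAbsent(e.symbol, k -> new ArrayList<>());
            if (e.text.equalsIgnoreCase(STEPS[seq.size()])) {
                seq.add(e);
                if (seq.size() == STEPS.length) {
                    matches.add(new ArrayList<>(seq)); // emit a copy of the full sequence
                    seq.clear();                       // start over for this symbol
                }
            }
        }
        return matches;
    }

    /** The input from the thread, with the event id=7 left out as in the original. */
    static List<Model> sampleInput() {
        return Arrays.asList(
                new Model(1, "A", "US"), new Model(2, "B", "US"), new Model(3, "C", "US"),
                new Model(4, "A", "AU"), new Model(5, "B", "AU"), new Model(6, "C", "AU"),
                new Model(8, "D", "AU"),
                new Model(9, "A", "GB"), new Model(10, "B", "GB"), new Model(13, "D", "GB"),
                new Model(11, "C", "GB"), new Model(12, "D", "GB"));
    }

    public static void main(String[] args) {
        // Prints one line per completed same-symbol sequence.
        match(sampleInput()).forEach(System.out::println);
    }
}
```

On the thread's input this yields the AU sequence (ids 4, 5, 6, 8) and the GB sequence (ids 9, 10, 11, 12), while the US events never complete because no US "D" arrives. In Flink itself, per-symbol matching of this kind is commonly obtained by keying the stream before applying the pattern, for example `inputStream.keyBy(Model::getSymbol)`; whether that resolves the behaviour reported above is not established in this thread.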