Hi Seth, thank you for your answer. In this case you are right and it would solve my problem. but actually my case is a bit more complex and my mistake I wanted to provide a simple example.
The actual case is, I have DataStream< ServerAwareMessage > inputStream as a source , Message is just an interface. The events can be of certain subtypes 1) class OrderRequest implements ServerAwareMessage { String symbol String orderType } 2) class OrderActivated implements ServerAwareMessage { String symbol String orderType long orderId } 3) class DealPerformed implements ServerAwareMessage { String symbol String orderType } 4) class OrderFilled implements ServerAwareMessage { String symbol String orderType long orderId } And here is the pattern sequence. Pattern.<ServerAwareMessage>begin(OPEN, AfterMatchSkipStrategy.skipToNext()) .where(new SimpleCondition <ServerAwareMessage>() { @Override public boolean filter(ServerAwareMessage value) throws Exception { return value instanceof OrderRequest } ) .followedBy("OrderActivated ") .where(new IterativeCondition<ServerAwareMessage>() { @Override public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception { if(value.getMessage() instanceof OrderActivated ) { var msg = ( OrderActivated ) value.getMessage(); var list = StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false) .filter(i -> i.getMessage() instanceof OrderRequest ) .collect(Collectors.toList()); return list.stream().allMatch(i -> (( OrderRequest )i.getMessage()).getSymbol().equals(msg.getSymbol()) && (( OrderRequest)i.getMessage()).getOrderType().equals(msg.getOrderType())); } return false; } }) .followedBy("DealPerformed") .where(new IterativeCondition<ServerAwareMessage>() { @Override public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception { if (value.getMessage() instanceof DealPerformed ) { var order = ( DealPerformed ) value.getMessage(); var list = StreamSupport.stream(ctx.getEventsForPattern(" OrderActivated ").spliterator(), false) .filter(i -> i.getMessage() instanceof OrderPerformed) .collect(Collectors.toList()); return list.stream().allMatch(i -> (( OrderActivated )i.getMessage()).getSymbol().equals(msg.getSymbol()) && (( OrderActivated )i.getMessage()).getOrderType().equals(msg.getOrderType())); } return false; } }) .followedBy("OrdeFilled") .where(new IterativeCondition<ServerAwareMessage>() { @Override public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception { if (value.getMessage() instanceof OrderFilled ) { var order = ( OrderFilled ) value.getMessage(); var list = StreamSupport.stream(ctx.getEventsForPattern( "DealPerformed" ).spliterator(), false) .filter(i -> i.getMessage() instanceof OrderActivationRequestNewDue) .collect(Collectors.toList()); return list.stream().allMatch(i -> (( DealPerformed )i.getMessage()).getSymbol().equals(order.getSymbol()) && (( DealPerformed )i.getMessage()).getOrderType().equals(order.getOrderType()); } return false; } }) In this case I can not group by unfortunately. so I may a receive a packet { OrderRequest(1), OrderActivated (1) , OrderRequest (2), DealPerformed(1) , OrderActivated(2), OrderRequest(3), DealPerformed(2), OrderFilled(1), OrderFilled(2), OrderActivated(3)} and etc. For me it is crucial to match all the event sequence (1) (2), etc. and there is a case where the sequence of the Messages is incomplete , that means that an event does not get inserted into the pattern. The above pattern sequence unfortunately does not work properly. Any suggestions? BR, Isidoros Στις Πέμ, 4 Νοε 2021 στις 11:27 μ.μ., ο/η Isidoros Ioannou < akis3...@gmail.com> έγραψε: > I am using Processing time characteristic. > > DataStream<Model> inputStream = env.fromElements( > Model.of(1, "A", "US"), > Model.of(2, "B", "US"), > Model.of(3, "C", "US"), > Model.of(4, "A", "AU"), > Model.of(5, "B", "AU"), > Model.of(6, "C", "AU"), > //Model.of(7, "D", "US"), > Model.of(8, "D", "AU"), > Model.of(9, "A", "GB"), > Model.of(10, "B", "GB"), > Model.of(13, "D", "GB"), > Model.of(11, "C", "GB"), > Model.of(12, "D", "GB") > > in the above inputStream the Model.of(7, "D", "US") is not supplied to the > pattern sequence. Nothing special at all > with this but I wanted to simulate the case that an event that fulfills the > pattern sequence might be missing > which it happens in my case. > > BR, > Isidoros > > > > > Στις Πέμ, 4 Νοε 2021 στις 11:01 μ.μ., ο/η Austin Cawley-Edwards < > austin.caw...@gmail.com> έγραψε: > >> Thanks for the update, the requirements make sense. >> >> Some follow up questions: >> * What time characteristic are you using? Processing or Event? >> * Can you describe a bit more what you mean by "input like the one I have >> commented bellow"? What is special about the one you have commented? >> >> Best, >> Austin >> >> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com> >> wrote: >> >>> >>> >>> ---------- Forwarded message --------- >>> Από: Isidoros Ioannou <akis3...@gmail.com> >>> Date: Πέμ, 4 Νοε 2021 στις 10:01 μ.μ. >>> Subject: Re: IterativeCondition instead of SimpleCondition not matching >>> pattern >>> To: Austin Cawley-Edwards <austin.caw...@gmail.com> >>> >>> >>> Hi Austin, >>> thank you for your answer and I really appreciate your willingness to >>> help. >>> >>> Actually the desired output is the one below >>> >>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, >>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], >>> fourth=[Model{id=8, text='D', symbol='AU'}]} {start=[Model{id=9, text='A', >>> symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], >>> third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', >>> symbol='GB'}]} >>> I would like only to generate sequences of Models that have the same >>> symbol. I noticed that if an event does not come as input >>> like the one I have commented bellow, it breaks all the pattern match >>> and the desired output is never produced >>> >>> DataStream<Model> inputStream = env.fromElements( >>> Model.of(1, "A", "US"), >>> Model.of(2, "B", "US"), >>> Model.of(3, "C", "US"), >>> Model.of(4, "A", "AU"), >>> Model.of(5, "B", "AU"), >>> Model.of(6, "C", "AU"), >>> //Model.of(7, "D", "US"), >>> Model.of(8, "D", "AU"), >>> Model.of(9, "A", "GB"), >>> Model.of(10, "B", "GB"), >>> Model.of(13, "D", "GB"), >>> Model.of(11, "C", "GB"), >>> Model.of(12, "D", "GB") >>> >>> Kind Regards, >>> Isidoros >>> >>> >>> Στις Πέμ, 4 Νοε 2021 στις 8:40 μ.μ., ο/η Austin Cawley-Edwards < >>> austin.caw...@gmail.com> έγραψε: >>> >>>> Hi Isidoros, >>>> >>>> Thanks for reaching out to the mailing list. I haven't worked with the >>>> CEP library in a long time but can try to help. I'm having a little trouble >>>> understanding the desired output + rules. Can you mock up the desired >>>> output like you have for the fulfilled pattern sequence? >>>> >>>> Best, >>>> Austin >>>> >>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com> >>>> wrote: >>>> >>>>> >>>>> I face an issue when try to match some elements in a Pattern sequence. >>>>> Flink 1.11.1 version. Here is my case: >>>>> >>>>> final StreamExecutionEnvironment env = >>>>> EnvironmentProvider.getEnvironment(); >>>>> DataStream<Model> inputStream = env.fromElements( >>>>> Model.of(1, "A", "US"), >>>>> Model.of(2, "B", "US"), >>>>> Model.of(3, "C", "US"), >>>>> Model.of(4, "A", "AU"), >>>>> Model.of(5, "B", "AU"), >>>>> Model.of(6, "C", "AU"), >>>>> //Model.of(7, "D"), >>>>> Model.of(8, "D", "AU"), >>>>> Model.of(9, "A", "GB"), >>>>> Model.of(10, "B", "GB"), >>>>> Model.of(13, "D", "GB"), >>>>> Model.of(11, "C", "GB"), >>>>> Model.of(12, "D", "GB") >>>>> >>>>> >>>>> >>>>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()) >>>>> .forceNonParallel(); >>>>> >>>>> Pattern<Model, Model> pattern = Pattern.<Model>begin("start", >>>>> AfterMatchSkipStrategy.skipToNext()) >>>>> .where(new IterativeCondition<Model>() { >>>>> @Override >>>>> public boolean filter(Model value, Context<Model> ctx) >>>>> throws Exception { >>>>> return value.getText().equalsIgnoreCase("A"); >>>>> } >>>>> }).followedBy("second") >>>>> .where(new IterativeCondition<Model>() { >>>>> @Override >>>>> public boolean filter(Model value, Context<Model> ctx) >>>>> throws Exception { >>>>> >>>>> return value.getText().equalsIgnoreCase("B"); >>>>> } >>>>> }).followedBy("third") >>>>> .where(new IterativeCondition<Model>() { >>>>> @Override >>>>> public boolean filter(Model value, Context<Model> ctx) >>>>> throws Exception { >>>>> >>>>> return value.getText().equalsIgnoreCase("C"); >>>>> } >>>>> }).followedBy("fourth") >>>>> .where(new IterativeCondition<Model>() { >>>>> @Override >>>>> public boolean filter(Model value, Context<Model> ctx) >>>>> throws Exception { >>>>> var list = >>>>> StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), >>>>> false).collect(Collectors.toList()); >>>>> var val = >>>>> list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol()); >>>>> return value.getText().equalsIgnoreCase("D") && val; >>>>> } >>>>> }); >>>>> >>>>> >>>>> PatternStream<Model> marketOpenPatternStream = >>>>> CEP.<Model>pattern(inputStream, pattern); >>>>> >>>>> SingleOutputStreamOperator<List<Model>> marketOpenOutput = >>>>> marketOpenPatternStream >>>>> .process(new PatternProcessFunction<Model, >>>>> List<Model>>() { >>>>> @Override >>>>> public void processMatch(Map<String, List<Model>> >>>>> match, Context ctx, Collector<List<Model>> out) throws Exception { >>>>> System.out.println(match); >>>>> out.collect(new ArrayList(match.values())); >>>>> } >>>>> }) >>>>> >>>>> What I am trying to succeed is to match only patterns that have the same >>>>> symbol. If I use SimpleCondition with checks only about the text of the >>>>> Model(A, B,C..) without the symbol check in the last pattern, the pattern >>>>> sequence is fulfilled and I get the following output: >>>>> >>>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B', >>>>> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], >>>>> fourth=[Model{id=8,text='D',symbol='AU'}]} >>>>> >>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, >>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], >>>>> fourth=[Model{id=8, text='D', symbol='AU'}]} >>>>> >>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, >>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], >>>>> fourth=[Model{id=12, text='D', symbol='GB'}]} >>>>> >>>>> However I want to avoid the match of elements with id= 1(A),2(B),3(C) >>>>> with the element with id = 8(D). For this reason I put the symbol check >>>>> with the event matched in the previous pattern in the last condition so I >>>>> dont get match since they do not have the same symbol. But after applying >>>>> the condition, now I do not get any output. none of the elements match the >>>>> pattern. What I am missing? Could someone help? >>>>> >>>>>