Why not? All those classes have a Symbol attribute, why can't you use that to key the stream?
On Thu, Nov 4, 2021 at 5:51 PM Isidoros Ioannou <akis3...@gmail.com> wrote: > Hi Seth, > > thank you for your answer. > In this case you are right and it would solve my problem. but actually my > case is a bit more complex and my mistake I wanted to provide a simple > example. > > The actual case is, > > I have DataStream< ServerAwareMessage > inputStream as a source , > Message is just an interface. The events can be of certain subtypes > > 1) class OrderRequest implements ServerAwareMessage { > String symbol > String orderType > } > > 2) class OrderActivated implements ServerAwareMessage { > String symbol > String orderType > long orderId > } > > 3) class DealPerformed implements ServerAwareMessage { > String symbol > String orderType > } > > 4) class OrderFilled implements ServerAwareMessage { > String symbol > String orderType > long orderId > } > > And here is the pattern sequence. > > Pattern.<ServerAwareMessage>begin(OPEN, > AfterMatchSkipStrategy.skipToNext()) > .where(new SimpleCondition <ServerAwareMessage>() { > @Override > public boolean filter(ServerAwareMessage value) throws > Exception { > return value instanceof OrderRequest > } ) > .followedBy("OrderActivated ") > .where(new IterativeCondition<ServerAwareMessage>() { > @Override > public boolean filter(ServerAwareMessage value, > Context<ServerAwareMessage> ctx) throws Exception { > if(value.getMessage() instanceof OrderActivated > ) { > var msg = ( OrderActivated ) > value.getMessage(); > var list = > StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false) > .filter(i -> i.getMessage() instanceof > OrderRequest ) > .collect(Collectors.toList()); > return list.stream().allMatch(i -> (( > OrderRequest )i.getMessage()).getSymbol().equals(msg.getSymbol()) && > (( > OrderRequest)i.getMessage()).getOrderType().equals(msg.getOrderType())); > > } > return false; > } > }) > .followedBy("DealPerformed") > .where(new IterativeCondition<ServerAwareMessage>() { > @Override > public boolean filter(ServerAwareMessage value, > Context<ServerAwareMessage> ctx) throws Exception { > if (value.getMessage() instanceof DealPerformed > ) { > var order = ( DealPerformed ) > value.getMessage(); > var list = > StreamSupport.stream(ctx.getEventsForPattern(" OrderActivated > ").spliterator(), false) > .filter(i -> i.getMessage() instanceof > OrderPerformed) > .collect(Collectors.toList()); > > return list.stream().allMatch(i -> (( > OrderActivated )i.getMessage()).getSymbol().equals(msg.getSymbol()) && > (( OrderActivated > )i.getMessage()).getOrderType().equals(msg.getOrderType())); > > } > return false; > } > }) > .followedBy("OrdeFilled") > .where(new IterativeCondition<ServerAwareMessage>() { > @Override > public boolean filter(ServerAwareMessage value, > Context<ServerAwareMessage> ctx) throws Exception { > if (value.getMessage() instanceof OrderFilled ) { > var order = ( OrderFilled ) > value.getMessage(); > var list = > StreamSupport.stream(ctx.getEventsForPattern( "DealPerformed" > ).spliterator(), false) > .filter(i -> i.getMessage() instanceof > OrderActivationRequestNewDue) > .collect(Collectors.toList()); > > return list.stream().allMatch(i -> (( > DealPerformed )i.getMessage()).getSymbol().equals(order.getSymbol()) && > (( DealPerformed > )i.getMessage()).getOrderType().equals(order.getOrderType()); > > } > return false; > } > }) > > In this case I can not group by unfortunately. so I may a receive a > packet { OrderRequest(1), OrderActivated (1) , OrderRequest (2), > DealPerformed(1) , OrderActivated(2), OrderRequest(3), DealPerformed(2), > OrderFilled(1), OrderFilled(2), OrderActivated(3)} and etc. > For me it is crucial to match all the event sequence (1) (2), etc. and > there is a case where the sequence of the Messages is incomplete , that > means that an event does not get inserted into the pattern. > The above pattern sequence unfortunately does not work properly. Any > suggestions? > > BR, > Isidoros > > Στις Πέμ, 4 Νοε 2021 στις 11:27 μ.μ., ο/η Isidoros Ioannou < > akis3...@gmail.com> έγραψε: > >> I am using Processing time characteristic. >> >> DataStream<Model> inputStream = env.fromElements( >> Model.of(1, "A", "US"), >> Model.of(2, "B", "US"), >> Model.of(3, "C", "US"), >> Model.of(4, "A", "AU"), >> Model.of(5, "B", "AU"), >> Model.of(6, "C", "AU"), >> //Model.of(7, "D", "US"), >> Model.of(8, "D", "AU"), >> Model.of(9, "A", "GB"), >> Model.of(10, "B", "GB"), >> Model.of(13, "D", "GB"), >> Model.of(11, "C", "GB"), >> Model.of(12, "D", "GB") >> >> in the above inputStream the Model.of(7, "D", "US") is not supplied to the >> pattern sequence. Nothing special at all >> with this but I wanted to simulate the case that an event that fulfills the >> pattern sequence might be missing >> which it happens in my case. >> >> BR, >> Isidoros >> >> >> >> >> Στις Πέμ, 4 Νοε 2021 στις 11:01 μ.μ., ο/η Austin Cawley-Edwards < >> austin.caw...@gmail.com> έγραψε: >> >>> Thanks for the update, the requirements make sense. >>> >>> Some follow up questions: >>> * What time characteristic are you using? Processing or Event? >>> * Can you describe a bit more what you mean by "input like the one I >>> have commented bellow"? What is special about the one you have commented? >>> >>> Best, >>> Austin >>> >>> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com> >>> wrote: >>> >>>> >>>> >>>> ---------- Forwarded message --------- >>>> Από: Isidoros Ioannou <akis3...@gmail.com> >>>> Date: Πέμ, 4 Νοε 2021 στις 10:01 μ.μ. >>>> Subject: Re: IterativeCondition instead of SimpleCondition not matching >>>> pattern >>>> To: Austin Cawley-Edwards <austin.caw...@gmail.com> >>>> >>>> >>>> Hi Austin, >>>> thank you for your answer and I really appreciate your willingness to >>>> help. >>>> >>>> Actually the desired output is the one below >>>> >>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, >>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], >>>> fourth=[Model{id=8, text='D', symbol='AU'}]} {start=[Model{id=9, text='A', >>>> symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], >>>> third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', >>>> symbol='GB'}]} >>>> I would like only to generate sequences of Models that have the same >>>> symbol. I noticed that if an event does not come as input >>>> like the one I have commented bellow, it breaks all the pattern match >>>> and the desired output is never produced >>>> >>>> DataStream<Model> inputStream = env.fromElements( >>>> Model.of(1, "A", "US"), >>>> Model.of(2, "B", "US"), >>>> Model.of(3, "C", "US"), >>>> Model.of(4, "A", "AU"), >>>> Model.of(5, "B", "AU"), >>>> Model.of(6, "C", "AU"), >>>> //Model.of(7, "D", "US"), >>>> Model.of(8, "D", "AU"), >>>> Model.of(9, "A", "GB"), >>>> Model.of(10, "B", "GB"), >>>> Model.of(13, "D", "GB"), >>>> Model.of(11, "C", "GB"), >>>> Model.of(12, "D", "GB") >>>> >>>> Kind Regards, >>>> Isidoros >>>> >>>> >>>> Στις Πέμ, 4 Νοε 2021 στις 8:40 μ.μ., ο/η Austin Cawley-Edwards < >>>> austin.caw...@gmail.com> έγραψε: >>>> >>>>> Hi Isidoros, >>>>> >>>>> Thanks for reaching out to the mailing list. I haven't worked with the >>>>> CEP library in a long time but can try to help. I'm having a little >>>>> trouble >>>>> understanding the desired output + rules. Can you mock up the desired >>>>> output like you have for the fulfilled pattern sequence? >>>>> >>>>> Best, >>>>> Austin >>>>> >>>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com> >>>>> wrote: >>>>> >>>>>> >>>>>> I face an issue when try to match some elements in a Pattern >>>>>> sequence. Flink 1.11.1 version. Here is my case: >>>>>> >>>>>> final StreamExecutionEnvironment env = >>>>>> EnvironmentProvider.getEnvironment(); >>>>>> DataStream<Model> inputStream = env.fromElements( >>>>>> Model.of(1, "A", "US"), >>>>>> Model.of(2, "B", "US"), >>>>>> Model.of(3, "C", "US"), >>>>>> Model.of(4, "A", "AU"), >>>>>> Model.of(5, "B", "AU"), >>>>>> Model.of(6, "C", "AU"), >>>>>> //Model.of(7, "D"), >>>>>> Model.of(8, "D", "AU"), >>>>>> Model.of(9, "A", "GB"), >>>>>> Model.of(10, "B", "GB"), >>>>>> Model.of(13, "D", "GB"), >>>>>> Model.of(11, "C", "GB"), >>>>>> Model.of(12, "D", "GB") >>>>>> >>>>>> >>>>>> >>>>>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()) >>>>>> .forceNonParallel(); >>>>>> >>>>>> Pattern<Model, Model> pattern = Pattern.<Model>begin("start", >>>>>> AfterMatchSkipStrategy.skipToNext()) >>>>>> .where(new IterativeCondition<Model>() { >>>>>> @Override >>>>>> public boolean filter(Model value, Context<Model> ctx) >>>>>> throws Exception { >>>>>> return value.getText().equalsIgnoreCase("A"); >>>>>> } >>>>>> }).followedBy("second") >>>>>> .where(new IterativeCondition<Model>() { >>>>>> @Override >>>>>> public boolean filter(Model value, Context<Model> ctx) >>>>>> throws Exception { >>>>>> >>>>>> return value.getText().equalsIgnoreCase("B"); >>>>>> } >>>>>> }).followedBy("third") >>>>>> .where(new IterativeCondition<Model>() { >>>>>> @Override >>>>>> public boolean filter(Model value, Context<Model> ctx) >>>>>> throws Exception { >>>>>> >>>>>> return value.getText().equalsIgnoreCase("C"); >>>>>> } >>>>>> }).followedBy("fourth") >>>>>> .where(new IterativeCondition<Model>() { >>>>>> @Override >>>>>> public boolean filter(Model value, Context<Model> ctx) >>>>>> throws Exception { >>>>>> var list = >>>>>> StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), >>>>>> false).collect(Collectors.toList()); >>>>>> var val = >>>>>> list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol()); >>>>>> return value.getText().equalsIgnoreCase("D") && val; >>>>>> } >>>>>> }); >>>>>> >>>>>> >>>>>> PatternStream<Model> marketOpenPatternStream = >>>>>> CEP.<Model>pattern(inputStream, pattern); >>>>>> >>>>>> SingleOutputStreamOperator<List<Model>> marketOpenOutput = >>>>>> marketOpenPatternStream >>>>>> .process(new PatternProcessFunction<Model, >>>>>> List<Model>>() { >>>>>> @Override >>>>>> public void processMatch(Map<String, >>>>>> List<Model>> match, Context ctx, Collector<List<Model>> out) throws >>>>>> Exception { >>>>>> System.out.println(match); >>>>>> out.collect(new ArrayList(match.values())); >>>>>> } >>>>>> }) >>>>>> >>>>>> What I am trying to succeed is to match only patterns that have the same >>>>>> symbol. If I use SimpleCondition with checks only about the text of the >>>>>> Model(A, B,C..) without the symbol check in the last pattern, the >>>>>> pattern sequence is fulfilled and I get the following output: >>>>>> >>>>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, >>>>>> text='B', >>>>>> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], >>>>>> fourth=[Model{id=8,text='D',symbol='AU'}]} >>>>>> >>>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, >>>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], >>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]} >>>>>> >>>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, >>>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], >>>>>> fourth=[Model{id=12, text='D', symbol='GB'}]} >>>>>> >>>>>> However I want to avoid the match of elements with id= 1(A),2(B),3(C) >>>>>> with the element with id = 8(D). For this reason I put the symbol check >>>>>> with the event matched in the previous pattern in the last condition so I >>>>>> dont get match since they do not have the same symbol. But after applying >>>>>> the condition, now I do not get any output. none of the elements match >>>>>> the >>>>>> pattern. What I am missing? Could someone help? >>>>>> >>>>>>