Why not?

All those classes have a symbol attribute, so why can't you use that to key
the stream?
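
To illustrate what keying would buy you, here is a minimal plain-Java sketch
(no Flink dependency) of partitioning by symbol and matching the sequence
independently per partition. Everything in it is hypothetical: the Event
class stands in for your ServerAwareMessage subtypes, the symbols S1..S3
stand in for your sequences (1), (2), (3), and the matcher only imitates
relaxed "followedBy" matching. In a real job the equivalent step would be a
keyBy on the symbol before handing the stream to CEP.pattern, assuming every
subtype exposes getSymbol().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyBySymbolSketch {

    /** Hypothetical stand-in for the message subtypes: a type tag plus the shared symbol. */
    static final class Event {
        final String type;
        final String symbol;

        Event(String type, String symbol) {
            this.type = type;
            this.symbol = symbol;
        }
    }

    /** Order in which the four event types must appear within one symbol. */
    static final List<String> SEQUENCE =
            List.of("OrderRequest", "OrderActivated", "DealPerformed", "OrderFilled");

    /**
     * Partitions the input by symbol and advances one partial match per symbol.
     * Events that are not the next expected type for their symbol are skipped,
     * so an incomplete sequence for one symbol never blocks another symbol.
     */
    static Map<String, List<List<Event>>> matchPerSymbol(List<Event> input) {
        Map<String, List<Event>> partial = new LinkedHashMap<>();
        Map<String, List<List<Event>>> matches = new LinkedHashMap<>();
        for (Event e : input) {
            List<Event> run = partial.computeIfAbsent(e.symbol, k -> new ArrayList<>());
            if (SEQUENCE.get(run.size()).equals(e.type)) {
                run.add(e);
            }
            if (run.size() == SEQUENCE.size()) {
                matches.computeIfAbsent(e.symbol, k -> new ArrayList<>())
                        .add(new ArrayList<>(run));
                run.clear(); // start looking for the next sequence on this symbol
            }
        }
        return matches;
    }

    /** The interleaved packet from the message below, with S3 left incomplete. */
    static List<Event> sampleInput() {
        return List.of(
                new Event("OrderRequest", "S1"), new Event("OrderActivated", "S1"),
                new Event("OrderRequest", "S2"), new Event("DealPerformed", "S1"),
                new Event("OrderActivated", "S2"), new Event("OrderRequest", "S3"),
                new Event("DealPerformed", "S2"), new Event("OrderFilled", "S1"),
                new Event("OrderFilled", "S2"), new Event("OrderActivated", "S3"));
    }

    public static void main(String[] args) {
        // Only the symbols with a complete sequence are reported.
        System.out.println(matchPerSymbol(sampleInput()).keySet()); // prints [S1, S2]
    }
}
```

With the stream partitioned this way, none of the conditions would need to
look back at earlier events just to compare symbols, and the missing events
for S3 leave the matches for S1 and S2 untouched.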

On Thu, Nov 4, 2021 at 5:51 PM Isidoros Ioannou <akis3...@gmail.com> wrote:

> Hi Seth,
>
> Thank you for your answer.
> In this case you are right, and it would solve my problem. However, my
> actual case is a bit more complex; it was my mistake to provide only a
> simple example.
>
> The actual case is as follows.
>
> I have a DataStream<ServerAwareMessage> inputStream as a source.
> Message is just an interface. The events can be of certain subtypes:
>
> 1) class OrderRequest implements ServerAwareMessage {
>        String symbol;
>        String orderType;
> }
>
> 2) class OrderActivated implements ServerAwareMessage {
>        String symbol;
>        String orderType;
>        long orderId;
> }
>
> 3) class DealPerformed implements ServerAwareMessage {
>        String symbol;
>        String orderType;
> }
>
> 4) class OrderFilled implements ServerAwareMessage {
>        String symbol;
>        String orderType;
>        long orderId;
> }
>
> And here is the pattern sequence.
>
> Pattern.<ServerAwareMessage>begin(OPEN, AfterMatchSkipStrategy.skipToNext())
>         .where(new SimpleCondition<ServerAwareMessage>() {
>             @Override
>             public boolean filter(ServerAwareMessage value) throws Exception {
>                 return value instanceof OrderRequest;
>             }
>         })
>         .followedBy("OrderActivated")
>         .where(new IterativeCondition<ServerAwareMessage>() {
>             @Override
>             public boolean filter(ServerAwareMessage value,
>                     Context<ServerAwareMessage> ctx) throws Exception {
>                 if (value.getMessage() instanceof OrderActivated) {
>                     var msg = (OrderActivated) value.getMessage();
>                     // OrderRequest events already matched for OPEN
>                     var list = StreamSupport
>                             .stream(ctx.getEventsForPattern(OPEN).spliterator(), false)
>                             .filter(i -> i.getMessage() instanceof OrderRequest)
>                             .collect(Collectors.toList());
>                     // Accept only if symbol and order type agree with all of them
>                     return list.stream().allMatch(i ->
>                             ((OrderRequest) i.getMessage()).getSymbol().equals(msg.getSymbol())
>                             && ((OrderRequest) i.getMessage()).getOrderType().equals(msg.getOrderType()));
>                 }
>                 return false;
>             }
>         })
>         .followedBy("DealPerformed")
>         .where(new IterativeCondition<ServerAwareMessage>() {
>             @Override
>             public boolean filter(ServerAwareMessage value,
>                     Context<ServerAwareMessage> ctx) throws Exception {
>                 if (value.getMessage() instanceof DealPerformed) {
>                     var order = (DealPerformed) value.getMessage();
>                     // OrderActivated events already matched for the previous step
>                     var list = StreamSupport
>                             .stream(ctx.getEventsForPattern("OrderActivated").spliterator(), false)
>                             .filter(i -> i.getMessage() instanceof OrderActivated)
>                             .collect(Collectors.toList());
>                     return list.stream().allMatch(i ->
>                             ((OrderActivated) i.getMessage()).getSymbol().equals(order.getSymbol())
>                             && ((OrderActivated) i.getMessage()).getOrderType().equals(order.getOrderType()));
>                 }
>                 return false;
>             }
>         })
>         .followedBy("OrderFilled")
>         .where(new IterativeCondition<ServerAwareMessage>() {
>             @Override
>             public boolean filter(ServerAwareMessage value,
>                     Context<ServerAwareMessage> ctx) throws Exception {
>                 if (value.getMessage() instanceof OrderFilled) {
>                     var order = (OrderFilled) value.getMessage();
>                     // DealPerformed events already matched for the previous step
>                     var list = StreamSupport
>                             .stream(ctx.getEventsForPattern("DealPerformed").spliterator(), false)
>                             .filter(i -> i.getMessage() instanceof DealPerformed)
>                             .collect(Collectors.toList());
>                     return list.stream().allMatch(i ->
>                             ((DealPerformed) i.getMessage()).getSymbol().equals(order.getSymbol())
>                             && ((DealPerformed) i.getMessage()).getOrderType().equals(order.getOrderType()));
>                 }
>                 return false;
>             }
>         });
>
> In this case I cannot group by, unfortunately, so I may receive a
> packet such as { OrderRequest(1), OrderActivated(1), OrderRequest(2),
> DealPerformed(1), OrderActivated(2), OrderRequest(3), DealPerformed(2),
> OrderFilled(1), OrderFilled(2), OrderActivated(3) }, and so on.
> For me it is crucial to match every event sequence (1), (2), etc. There
> are also cases where a sequence of messages is incomplete, meaning that
> an expected event never arrives to be matched by the pattern.
> Unfortunately, the pattern sequence above does not work properly. Any
> suggestions?
>
> BR,
> Isidoros
>
> On Thu, Nov 4, 2021 at 11:27 PM Isidoros Ioannou <
> akis3...@gmail.com> wrote:
>
>> I am using the processing time characteristic.
>>
>> DataStream<Model> inputStream = env.fromElements(
>>             Model.of(1, "A", "US"),
>>             Model.of(2, "B", "US"),
>>             Model.of(3, "C", "US"),
>>             Model.of(4, "A", "AU"),
>>             Model.of(5, "B", "AU"),
>>             Model.of(6, "C", "AU"),
>>           //Model.of(7, "D", "US"),
>>             Model.of(8, "D", "AU"),
>>             Model.of(9, "A", "GB"),
>>             Model.of(10, "B", "GB"),
>>             Model.of(13, "D", "GB"),
>>             Model.of(11, "C", "GB"),
>>             Model.of(12, "D", "GB"));
>>
>> In the inputStream above, Model.of(7, "D", "US") is not supplied to the
>> pattern sequence. There is nothing special about it; I only wanted to
>> simulate the case where an event that would complete the pattern
>> sequence is missing, which does happen in my case.
>>
>> BR,
>> Isidoros
>>
>>
>>
>>
>> On Thu, Nov 4, 2021 at 11:01 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Thanks for the update, the requirements make sense.
>>>
>>> Some follow-up questions:
>>> * What time characteristic are you using? Processing or Event?
>>> * Can you describe a bit more what you mean by "input like the one I
>>> have commented below"? What is special about the one you have commented?
>>>
>>> Best,
>>> Austin
>>>
>>> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Isidoros Ioannou <akis3...@gmail.com>
>>>> Date: Thu, Nov 4, 2021 at 10:01 PM
>>>> Subject: Re: IterativeCondition instead of SimpleCondition not matching
>>>> pattern
>>>> To: Austin Cawley-Edwards <austin.caw...@gmail.com>
>>>>
>>>>
>>>> Hi Austin,
>>>> thank you for your answer and I really appreciate your willingness to
>>>> help.
>>>>
>>>> Actually, the desired output is the one below:
>>>>
>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5,
>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>
>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10,
>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
>>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>>
>>>> I would like to generate only sequences of Models that have the same
>>>> symbol. I noticed that if an event does not come as input
>>>> like the one I have commented below, it breaks the whole pattern match
>>>> and the desired output is never produced:
>>>>
>>>> DataStream<Model> inputStream = env.fromElements(
>>>>             Model.of(1, "A", "US"),
>>>>             Model.of(2, "B", "US"),
>>>>             Model.of(3, "C", "US"),
>>>>             Model.of(4, "A", "AU"),
>>>>             Model.of(5, "B", "AU"),
>>>>             Model.of(6, "C", "AU"),
>>>>           //Model.of(7, "D", "US"),
>>>>             Model.of(8, "D", "AU"),
>>>>             Model.of(9, "A", "GB"),
>>>>             Model.of(10, "B", "GB"),
>>>>             Model.of(13, "D", "GB"),
>>>>             Model.of(11, "C", "GB"),
>>>>             Model.of(12, "D", "GB"));
>>>>
>>>> Kind Regards,
>>>> Isidoros
>>>>
>>>>
>>>> On Thu, Nov 4, 2021 at 8:40 PM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hi Isidoros,
>>>>>
>>>>> Thanks for reaching out to the mailing list. I haven't worked with the
>>>>> CEP library in a long time but can try to help. I'm having a little
>>>>> trouble understanding the desired output + rules. Can you mock up the
>>>>> desired output like you have for the fulfilled pattern sequence?
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> I am facing an issue when trying to match some elements in a Pattern
>>>>>> sequence, using Flink version 1.11.1. Here is my case:
>>>>>>
>>>>>> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
>>>>>> DataStream<Model> inputStream = env.fromElements(
>>>>>>             Model.of(1, "A", "US"),
>>>>>>             Model.of(2, "B", "US"),
>>>>>>             Model.of(3, "C", "US"),
>>>>>>             Model.of(4, "A", "AU"),
>>>>>>             Model.of(5, "B", "AU"),
>>>>>>             Model.of(6, "C", "AU"),
>>>>>>           //Model.of(7, "D", "US"),
>>>>>>             Model.of(8, "D", "AU"),
>>>>>>             Model.of(9, "A", "GB"),
>>>>>>             Model.of(10, "B", "GB"),
>>>>>>             Model.of(13, "D", "GB"),
>>>>>>             Model.of(11, "C", "GB"),
>>>>>>             Model.of(12, "D", "GB")
>>>>>>     ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>>>>>>             .forceNonParallel();
>>>>>>
>>>>>>     Pattern<Model, Model> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
>>>>>>             .where(new IterativeCondition<Model>() {
>>>>>>                 @Override
>>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>>                     return value.getText().equalsIgnoreCase("A");
>>>>>>                 }
>>>>>>             }).followedBy("second")
>>>>>>             .where(new IterativeCondition<Model>() {
>>>>>>                 @Override
>>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>>                     return value.getText().equalsIgnoreCase("B");
>>>>>>                 }
>>>>>>             }).followedBy("third")
>>>>>>             .where(new IterativeCondition<Model>() {
>>>>>>                 @Override
>>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>>                     return value.getText().equalsIgnoreCase("C");
>>>>>>                 }
>>>>>>             }).followedBy("fourth")
>>>>>>             .where(new IterativeCondition<Model>() {
>>>>>>                 @Override
>>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>>                     var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false)
>>>>>>                             .collect(Collectors.toList());
>>>>>>                     var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>>>>>>                     return value.getText().equalsIgnoreCase("D") && val;
>>>>>>                 }
>>>>>>             });
>>>>>>
>>>>>>
>>>>>>     PatternStream<Model> marketOpenPatternStream = CEP.<Model>pattern(inputStream, pattern);
>>>>>>
>>>>>>     SingleOutputStreamOperator<List<Model>> marketOpenOutput = marketOpenPatternStream
>>>>>>             .process(new PatternProcessFunction<Model, List<Model>>() {
>>>>>>                 @Override
>>>>>>                 public void processMatch(Map<String, List<Model>> match, Context ctx,
>>>>>>                         Collector<List<Model>> out) throws Exception {
>>>>>>                     System.out.println(match);
>>>>>>                     out.collect(new ArrayList(match.values()));
>>>>>>                 }
>>>>>>             });
>>>>>>
>>>>>> What I am trying to achieve is to match only sequences whose events
>>>>>> have the same symbol. If I use SimpleCondition and check only the text
>>>>>> of the Model (A, B, C, ...), without the symbol check in the last
>>>>>> pattern, the pattern sequence is fulfilled and I get the following output:
>>>>>>
>>>>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2,
>>>>>> text='B', symbol='US'}], third=[Model{id=3, text='C', symbol='US'}],
>>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>>>
>>>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, 
>>>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], 
>>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>>>
>>>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, 
>>>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], 
>>>>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>>>>
>>>>>> However, I want to avoid matching the elements with id = 1 (A), 2 (B),
>>>>>> 3 (C) with the element with id = 8 (D). For this reason I added a check
>>>>>> in the last condition that compares the symbol with that of the event
>>>>>> matched in the previous pattern, so that they do not match when their
>>>>>> symbols differ. But after applying the condition I no longer get any
>>>>>> output: none of the elements match the pattern. What am I missing?
>>>>>> Could someone help?
>>>>>>
>>>>>>
