Hi Seth,

Thank you for your answer.
In this case you are right, and it would solve my problem. However, my
actual case is a bit more complex; it was my mistake, as I wanted to
provide a simple example.

The actual case is as follows.

I have a DataStream<ServerAwareMessage> inputStream as the source;
Message is just an interface. The events can be of the following subtypes:

1) class OrderRequest implements ServerAwareMessage {
       String symbol;
       String orderType;
}

2) class OrderActivated implements ServerAwareMessage {
       String symbol;
       String orderType;
       long orderId;
}

3) class DealPerformed implements ServerAwareMessage {
       String symbol;
       String orderType;
}

4) class OrderFilled implements ServerAwareMessage {
       String symbol;
       String orderType;
       long orderId;
}

And here is the pattern sequence.

Pattern.<ServerAwareMessage>begin(OPEN, AfterMatchSkipStrategy.skipToNext())
                .where(new SimpleCondition<ServerAwareMessage>() {
                    @Override
                    public boolean filter(ServerAwareMessage value) throws Exception {
                        return value instanceof OrderRequest;
                    }
                })
                .followedBy("OrderActivated")
                .where(new IterativeCondition<ServerAwareMessage>() {
                    @Override
                    public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
                        if (value.getMessage() instanceof OrderActivated) {
                            var msg = (OrderActivated) value.getMessage();
                            // Events already matched by the OPEN step.
                            var list = StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false)
                                .filter(i -> i.getMessage() instanceof OrderRequest)
                                .collect(Collectors.toList());
                            // Accept only if symbol and order type agree with them.
                            return list.stream().allMatch(i ->
                                ((OrderRequest) i.getMessage()).getSymbol().equals(msg.getSymbol()) &&
                                ((OrderRequest) i.getMessage()).getOrderType().equals(msg.getOrderType()));
                        }
                        return false;
                    }
                })
                .followedBy("DealPerformed")
                .where(new IterativeCondition<ServerAwareMessage>() {
                    @Override
                    public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
                        if (value.getMessage() instanceof DealPerformed) {
                            var order = (DealPerformed) value.getMessage();
                            // Events already matched by the "OrderActivated" step.
                            var list = StreamSupport.stream(ctx.getEventsForPattern("OrderActivated").spliterator(), false)
                                .filter(i -> i.getMessage() instanceof OrderActivated)
                                .collect(Collectors.toList());

                            return list.stream().allMatch(i ->
                                ((OrderActivated) i.getMessage()).getSymbol().equals(order.getSymbol()) &&
                                ((OrderActivated) i.getMessage()).getOrderType().equals(order.getOrderType()));
                        }
                        return false;
                    }
                })
                .followedBy("OrderFilled")
                .where(new IterativeCondition<ServerAwareMessage>() {
                    @Override
                    public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
                        if (value.getMessage() instanceof OrderFilled) {
                            var order = (OrderFilled) value.getMessage();
                            // Events already matched by the "DealPerformed" step.
                            var list = StreamSupport.stream(ctx.getEventsForPattern("DealPerformed").spliterator(), false)
                                .filter(i -> i.getMessage() instanceof DealPerformed)
                                .collect(Collectors.toList());

                            return list.stream().allMatch(i ->
                                ((DealPerformed) i.getMessage()).getSymbol().equals(order.getSymbol()) &&
                                ((DealPerformed) i.getMessage()).getOrderType().equals(order.getOrderType()));
                        }
                        return false;
                    }
                });
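
Stripped of the Flink types, the check that each IterativeCondition performs
boils down to the predicate below. This is only a plain-Java sketch: Msg is a
hypothetical stand-in for the symbol/orderType part of my messages, and the
symbol and order type values are made up. It also shows one property worth
keeping in mind: allMatch returns true on an empty list, so if the instanceof
filter leaves no previous events, the condition passes without comparing
anything.

```java
import java.util.List;

public class SameOrderCheck {
    // Hypothetical stand-in for the symbol/orderType fields shared by the messages.
    public static final class Msg {
        final String symbol;
        final String orderType;
        public Msg(String symbol, String orderType) {
            this.symbol = symbol;
            this.orderType = orderType;
        }
    }

    // True when every previously matched event has the same symbol and
    // order type as the current event. An empty list yields true as well.
    public static boolean sameOrder(List<Msg> previous, Msg current) {
        return previous.stream().allMatch(p ->
                p.symbol.equals(current.symbol) && p.orderType.equals(current.orderType));
    }

    public static void main(String[] args) {
        Msg request = new Msg("EURUSD", "BUY"); // made-up values
        System.out.println(sameOrder(List.of(request), new Msg("EURUSD", "BUY"))); // true
        System.out.println(sameOrder(List.of(request), new Msg("GBPUSD", "BUY"))); // false
        System.out.println(sameOrder(List.of(), new Msg("GBPUSD", "BUY")));        // true (vacuous)
    }
}
```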

Unfortunately, I cannot group by a key in this case, so I may receive a packet
such as { OrderRequest(1), OrderActivated(1), OrderRequest(2),
DealPerformed(1), OrderActivated(2), OrderRequest(3), DealPerformed(2),
OrderFilled(1), OrderFilled(2), OrderActivated(3) }, and so on.
It is crucial for me to match every event sequence, (1), (2), etc. There
are also cases where a sequence of messages is incomplete, meaning that
one of its events is never fed into the pattern.
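
To make the expected result concrete, here is a minimal plain-Java sketch
(no Flink involved). It treats the number in parentheses as a hypothetical
correlation label, used only for illustration, and the class and method names
are made up for this example. For the packet above, sequences (1) and (2)
should be reported as complete, while (3) stays incomplete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpectedSequences {
    // Stage order that every sequence has to follow.
    static final List<String> STAGES =
            List.of("OrderRequest", "OrderActivated", "DealPerformed", "OrderFilled");

    // Hypothetical event: a type name plus the illustrative label (1), (2), ...
    public static final class Event {
        final String type;
        final int label;
        public Event(String type, int label) {
            this.type = type;
            this.label = label;
        }
    }

    // Returns the labels whose four stages all arrived in order.
    public static List<Integer> completed(List<Event> packet) {
        Map<Integer, Integer> nextStage = new HashMap<>(); // label -> index of the stage expected next
        List<Integer> done = new ArrayList<>();
        for (Event e : packet) {
            int expected = nextStage.getOrDefault(e.label, 0);
            if (expected < STAGES.size() && STAGES.get(expected).equals(e.type)) {
                nextStage.put(e.label, expected + 1);
                if (expected + 1 == STAGES.size()) {
                    done.add(e.label);
                }
            }
        }
        return done;
    }

    // The interleaved packet from the example above.
    public static List<Event> samplePacket() {
        return List.of(
                new Event("OrderRequest", 1), new Event("OrderActivated", 1),
                new Event("OrderRequest", 2), new Event("DealPerformed", 1),
                new Event("OrderActivated", 2), new Event("OrderRequest", 3),
                new Event("DealPerformed", 2), new Event("OrderFilled", 1),
                new Event("OrderFilled", 2), new Event("OrderActivated", 3));
    }

    public static void main(String[] args) {
        System.out.println(completed(samplePacket())); // prints [1, 2]
    }
}
```
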
The above pattern sequence unfortunately does not work properly. Any
suggestions?

BR,
Isidoros

On Thu, Nov 4, 2021 at 11:27 PM Isidoros Ioannou <
akis3...@gmail.com> wrote:

> I am using the processing-time characteristic.
>
> DataStream<Model> inputStream = env.fromElements(
>             Model.of(1, "A", "US"),
>             Model.of(2, "B", "US"),
>             Model.of(3, "C", "US"),
>             Model.of(4, "A", "AU"),
>             Model.of(5, "B", "AU"),
>             Model.of(6, "C", "AU"),
>           //Model.of(7, "D", "US"),
>             Model.of(8, "D", "AU"),
>             Model.of(9, "A", "GB"),
>             Model.of(10, "B", "GB"),
>             Model.of(13, "D", "GB"),
>             Model.of(11, "C", "GB"),
>             Model.of(12, "D", "GB"));
>
> In the inputStream above, Model.of(7, "D", "US") is not supplied to the
> pattern sequence. There is nothing special about it; I only wanted to
> simulate the case where an event that would complete the pattern sequence
> is missing, which happens in my case.
>
> BR,
> Isidoros
>
>
>
>
> On Thu, Nov 4, 2021 at 11:01 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Thanks for the update, the requirements make sense.
>>
>> Some follow up questions:
>> * What time characteristic are you using? Processing or Event?
>> * Can you describe a bit more what you mean by "input like the one I have
>> commented bellow"? What is special about the one you have commented?
>>
>> Best,
>> Austin
>>
>> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com>
>> wrote:
>>
>>>
>>>
>>> ---------- Forwarded message ---------
>>> From: Isidoros Ioannou <akis3...@gmail.com>
>>> Date: Thu, Nov 4, 2021 at 10:01 PM
>>> Subject: Re: IterativeCondition instead of SimpleCondition not matching
>>> pattern
>>> To: Austin Cawley-Edwards <austin.caw...@gmail.com>
>>>
>>>
>>> Hi Austin,
>>> Thank you for your answer; I really appreciate your willingness to
>>> help.
>>>
>>> The desired output is the following:
>>>
>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5,
>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>
>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10,
>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>
>>> I would like to generate only sequences of Models that have the same
>>> symbol. I noticed that if an event is missing from the input, like the
>>> one I have commented out below, it breaks the whole pattern match and
>>> the desired output is never produced.
>>>
>>> DataStream<Model> inputStream = env.fromElements(
>>>             Model.of(1, "A", "US"),
>>>             Model.of(2, "B", "US"),
>>>             Model.of(3, "C", "US"),
>>>             Model.of(4, "A", "AU"),
>>>             Model.of(5, "B", "AU"),
>>>             Model.of(6, "C", "AU"),
>>>           //Model.of(7, "D", "US"),
>>>             Model.of(8, "D", "AU"),
>>>             Model.of(9, "A", "GB"),
>>>             Model.of(10, "B", "GB"),
>>>             Model.of(13, "D", "GB"),
>>>             Model.of(11, "C", "GB"),
>>>             Model.of(12, "D", "GB"));
>>>
>>> Kind Regards,
>>> Isidoros
>>>
>>>
>>> On Thu, Nov 4, 2021 at 8:40 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hi Isidoros,
>>>>
>>>> Thanks for reaching out to the mailing list. I haven't worked with the
>>>> CEP library in a long time but can try to help. I'm having a little trouble
>>>> understanding the desired output + rules. Can you mock up the desired
>>>> output like you have for the fulfilled pattern sequence?
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> I am facing an issue when trying to match some elements in a Pattern
>>>>> sequence, using Flink 1.11.1. Here is my case:
>>>>>
>>>>> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
>>>>> DataStream<Model> inputStream = env.fromElements(
>>>>>             Model.of(1, "A", "US"),
>>>>>             Model.of(2, "B", "US"),
>>>>>             Model.of(3, "C", "US"),
>>>>>             Model.of(4, "A", "AU"),
>>>>>             Model.of(5, "B", "AU"),
>>>>>             Model.of(6, "C", "AU"),
>>>>>           //Model.of(7, "D"),
>>>>>             Model.of(8, "D", "AU"),
>>>>>             Model.of(9, "A", "GB"),
>>>>>             Model.of(10, "B", "GB"),
>>>>>             Model.of(13, "D", "GB"),
>>>>>             Model.of(11, "C", "GB"),
>>>>>             Model.of(12, "D", "GB")
>>>>>     ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>>>>>             .forceNonParallel();
>>>>>
>>>>>     Pattern<Model, Model> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
>>>>>             .where(new IterativeCondition<Model>() {
>>>>>                 @Override
>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>                     return value.getText().equalsIgnoreCase("A");
>>>>>                 }
>>>>>             }).followedBy("second")
>>>>>             .where(new IterativeCondition<Model>() {
>>>>>                 @Override
>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>                     return value.getText().equalsIgnoreCase("B");
>>>>>                 }
>>>>>             }).followedBy("third")
>>>>>             .where(new IterativeCondition<Model>() {
>>>>>                 @Override
>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>                     return value.getText().equalsIgnoreCase("C");
>>>>>                 }
>>>>>             }).followedBy("fourth")
>>>>>             .where(new IterativeCondition<Model>() {
>>>>>                 @Override
>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>                     // Compare the symbol with the event matched by "third".
>>>>>                     var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false)
>>>>>                             .collect(Collectors.toList());
>>>>>                     var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>>>>>                     return value.getText().equalsIgnoreCase("D") && val;
>>>>>                 }
>>>>>             });
>>>>>
>>>>>     PatternStream<Model> marketOpenPatternStream = CEP.<Model>pattern(inputStream, pattern);
>>>>>
>>>>>     SingleOutputStreamOperator<List<Model>> marketOpenOutput =
>>>>>             marketOpenPatternStream
>>>>>                     .process(new PatternProcessFunction<Model, List<Model>>() {
>>>>>                         @Override
>>>>>                         public void processMatch(Map<String, List<Model>> match, Context ctx, Collector<List<Model>> out) throws Exception {
>>>>>                             System.out.println(match);
>>>>>                             out.collect(new ArrayList(match.values()));
>>>>>                         }
>>>>>                     });
>>>>>
>>>>> What I am trying to achieve is to match only sequences whose events
>>>>> have the same symbol. If I use a SimpleCondition that checks only the
>>>>> text of the Model (A, B, C, ...), without the symbol check in the last
>>>>> pattern, the pattern sequence is fulfilled and I get the following output:
>>>>>
>>>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
>>>>> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], 
>>>>> fourth=[Model{id=8,text='D',symbol='AU'}]}
>>>>>
>>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, 
>>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], 
>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>>
>>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, 
>>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], 
>>>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>>>
>>>>> However, I want to avoid matching the elements with id = 1 (A), 2 (B),
>>>>> 3 (C) with the element with id = 8 (D). For this reason I added, in the
>>>>> last condition, a symbol check against the event matched by the previous
>>>>> pattern, so that they do not match when the symbols differ. But after
>>>>> applying the condition I no longer get any output; none of the elements
>>>>> match the pattern. What am I missing? Could someone help?
>>>>>
>>>>>
