I am using Processing time characteristic.

DataStream<Model> inputStream = env.fromElements(
            Model.of(1, "A", "US"),
            Model.of(2, "B", "US"),
            Model.of(3, "C", "US"),
            Model.of(4, "A", "AU"),
            Model.of(5, "B", "AU"),
            Model.of(6, "C", "AU"),
          //Model.of(7, "D", "US"),
            Model.of(8, "D", "AU"),
            Model.of(9, "A", "GB"),
            Model.of(10, "B", "GB"),
            Model.of(13, "D", "GB"),
            Model.of(11, "C", "GB"),
            Model.of(12, "D", "GB")

in the above inputStream the Model.of(7, "D", "US") is not supplied to
the pattern sequence. Nothing special at all
with this but I wanted to simulate the case that an event that
fulfills the pattern sequence might be missing
which it happens in my case.

BR,
Isidoros




Στις Πέμ, 4 Νοε 2021 στις 11:01 μ.μ., ο/η Austin Cawley-Edwards <
austin.caw...@gmail.com> έγραψε:

> Thanks for the update, the requirements make sense.
>
> Some follow up questions:
> * What time characteristic are you using? Processing or Event?
> * Can you describe a bit more what you mean by "input like the one I have
> commented bellow"? What is special about the one you have commented?
>
> Best,
> Austin
>
> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com>
> wrote:
>
>>
>>
>> ---------- Forwarded message ---------
>> Από: Isidoros Ioannou <akis3...@gmail.com>
>> Date: Πέμ, 4 Νοε 2021 στις 10:01 μ.μ.
>> Subject: Re: IterativeCondition instead of SimpleCondition not matching
>> pattern
>> To: Austin Cawley-Edwards <austin.caw...@gmail.com>
>>
>>
>> Hi Austin,
>> thank you for your answer and I really appreciate your willingness to
>> help.
>>
>> Actually the desired output is the one below
>>
>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5,
>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
>> fourth=[Model{id=8, text='D', symbol='AU'}]} {start=[Model{id=9, text='A',
>> symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}],
>> third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D',
>> symbol='GB'}]}
>> I would like only to generate sequences of Models that have the same
>> symbol. I noticed that if an event does not come as input
>> like the one I have commented bellow, it breaks all the pattern match and
>> the desired output is never produced
>>
>> DataStream<Model> inputStream = env.fromElements(
>>             Model.of(1, "A", "US"),
>>             Model.of(2, "B", "US"),
>>             Model.of(3, "C", "US"),
>>             Model.of(4, "A", "AU"),
>>             Model.of(5, "B", "AU"),
>>             Model.of(6, "C", "AU"),
>>           //Model.of(7, "D", "US"),
>>             Model.of(8, "D", "AU"),
>>             Model.of(9, "A", "GB"),
>>             Model.of(10, "B", "GB"),
>>             Model.of(13, "D", "GB"),
>>             Model.of(11, "C", "GB"),
>>             Model.of(12, "D", "GB")
>>
>> Kind Regards,
>> Isidoros
>>
>>
>> Στις Πέμ, 4 Νοε 2021 στις 8:40 μ.μ., ο/η Austin Cawley-Edwards <
>> austin.caw...@gmail.com> έγραψε:
>>
>>> Hi Isidoros,
>>>
>>> Thanks for reaching out to the mailing list. I haven't worked with the
>>> CEP library in a long time but can try to help. I'm having a little trouble
>>> understanding the desired output + rules. Can you mock up the desired
>>> output like you have for the fulfilled pattern sequence?
>>>
>>> Best,
>>> Austin
>>>
>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> I face an issue when try to match some elements in a Pattern sequence.
>>>> Flink 1.11.1 version. Here is my case:
>>>>
>>>> final StreamExecutionEnvironment env = 
>>>> EnvironmentProvider.getEnvironment();
>>>> DataStream<Model> inputStream = env.fromElements(
>>>>             Model.of(1, "A", "US"),
>>>>             Model.of(2, "B", "US"),
>>>>             Model.of(3, "C", "US"),
>>>>             Model.of(4, "A", "AU"),
>>>>             Model.of(5, "B", "AU"),
>>>>             Model.of(6, "C", "AU"),
>>>>           //Model.of(7, "D"),
>>>>             Model.of(8, "D", "AU"),
>>>>             Model.of(9, "A", "GB"),
>>>>             Model.of(10, "B", "GB"),
>>>>             Model.of(13, "D", "GB"),
>>>>             Model.of(11, "C", "GB"),
>>>>             Model.of(12, "D", "GB")
>>>>
>>>>
>>>>     
>>>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>>>>             .forceNonParallel();
>>>>
>>>>     Pattern<Model, Model> pattern = Pattern.<Model>begin("start", 
>>>> AfterMatchSkipStrategy.skipToNext())
>>>>             .where(new IterativeCondition<Model>() {
>>>>                 @Override
>>>>                 public boolean filter(Model value, Context<Model> ctx) 
>>>> throws Exception {
>>>>                     return value.getText().equalsIgnoreCase("A");
>>>>                 }
>>>>             }).followedBy("second")
>>>>             .where(new IterativeCondition<Model>() {
>>>>                 @Override
>>>>                 public boolean filter(Model value, Context<Model> ctx) 
>>>> throws Exception {
>>>>
>>>>                     return value.getText().equalsIgnoreCase("B");
>>>>                 }
>>>>             }).followedBy("third")
>>>>             .where(new IterativeCondition<Model>() {
>>>>                 @Override
>>>>                 public boolean filter(Model value, Context<Model> ctx) 
>>>> throws Exception {
>>>>
>>>>                     return value.getText().equalsIgnoreCase("C");
>>>>                 }
>>>>             }).followedBy("fourth")
>>>>             .where(new IterativeCondition<Model>() {
>>>>                 @Override
>>>>                 public boolean filter(Model value, Context<Model> ctx) 
>>>> throws Exception {
>>>>                     var  list = 
>>>> StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), 
>>>> false).collect(Collectors.toList());
>>>>                     var val = 
>>>> list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>>>>                     return value.getText().equalsIgnoreCase("D") && val;
>>>>                 }
>>>>             });
>>>>
>>>>
>>>>             PatternStream<Model> marketOpenPatternStream = 
>>>> CEP.<Model>pattern(inputStream, pattern);
>>>>
>>>>              SingleOutputStreamOperator<List<Model>> marketOpenOutput =
>>>>             marketOpenPatternStream
>>>>                     .process(new PatternProcessFunction<Model, 
>>>> List<Model>>() {
>>>>                         @Override
>>>>                         public void processMatch(Map<String, List<Model>> 
>>>> match, Context ctx, Collector<List<Model>> out) throws Exception {
>>>>                             System.out.println(match);
>>>>                             out.collect(new ArrayList(match.values()));
>>>>                         }
>>>>                     })
>>>>
>>>> What I am trying to succeed is to match only patterns that have the same 
>>>> symbol. If I use SimpleCondition with checks only about the text of the 
>>>> Model(A, B,C..) without the symbol check in the last pattern, the pattern 
>>>> sequence is fulfilled and I get the following output:
>>>>
>>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
>>>> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], 
>>>> fourth=[Model{id=8,text='D',symbol='AU'}]}
>>>>
>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', 
>>>> symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], 
>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>
>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, 
>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], 
>>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>>
>>>> However I want to avoid the match of elements with id= 1(A),2(B),3(C)
>>>> with the element with id = 8(D). For this reason I put the symbol check
>>>> with the event matched in the previous pattern in the last condition so I
>>>> dont get match since they do not have the same symbol. But after applying
>>>> the condition, now I do not get any output. none of the elements match the
>>>> pattern. What I am missing? Could someone help?
>>>>
>>>>

Reply via email to