Re: Unbalanced distribution of keyed stream to downstream parallel operators

2022-04-07 Thread Isidoros Ioannou
Hello Arvid,
thank you for your reply.

Actually, using a window to aggregate the events for a time period is not
applicable to my case, since I need the records to be processed immediately.
Even if I could, I still cannot understand how I could forward
the aggregated events to, let's say, 2 parallel operators. The slot assignment
of the key groups is done by Flink. Do you mean to key by again on a different
property, so that the previously aggregated events get reassigned to
operators? I apologize if my question is naive, but I got a little confused.
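For reference, a minimal, hypothetical sketch of the two-phase ("key by again") idea
discussed in this thread, assuming simple (accountId, eventTimeMillis) tuples rather than
the actual POJOs used in the job; the windowed first phase is exactly the part that does
not fit the immediate-processing requirement mentioned above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoPhaseAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical events: (accountId, eventTimeMillis); account 1 is the hot key.
        DataStream<Tuple2<Long, Long>> events = env
                .fromElements(
                        Tuple2.of(1L, 1_000L), Tuple2.of(1L, 2_000L),
                        Tuple2.of(2L, 3_000L), Tuple2.of(1L, 61_000L))
                .returns(Types.TUPLE(Types.LONG, Types.LONG))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Long, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((e, ts) -> e.f1));

        events
                // Phase 1: refine the key with a time bucket so one hot accountId is
                // spread over several keys, and pre-aggregate a count per bucket.
                .map(e -> Tuple2.of(e.f0 + "#" + (e.f1 / 60_000L), 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sum(1)
                // Phase 2: "key by again" on the plain accountId, so the partial counts
                // are redistributed across subtasks and merged into a running total.
                .map(t -> Tuple2.of(Long.parseLong(t.f0.split("#")[0]), t.f1))
                .returns(Types.TUPLE(Types.LONG, Types.LONG))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();

        env.execute("two-phase aggregation sketch");
    }
}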




On Mon, Apr 4, 2022 at 10:38 AM, Arvid Heise wrote:

> You should create a histogram over the keys of the records. If you see a
> skew, one way to go about it is to refine the key or split aggregations.
>
> For example, consider you want to count events per user and 2 users are
> actually bots spamming lots of events, accounting for 50% of all events.
> Then, you will always collect all events of each bot on one machine which
> limits scalability. You can, however, first aggregate all events per user
> per day (or any other way to subdivide). Then, the same bot can be
> processed in parallel and you then do an overall aggregation.
>
> If that's not possible, then your problem itself limits the scalability
> and you can only try to not get both bot users on the same machine (which
> can happen in case 2 below). Then you can simply try to shift the key by adding
> constants to it and check if the distribution looks better. Have a look at
> KeyGroupRangeAssignment [1] to test that out without running Flink itself.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
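As an aside, checking that assignment offline can be as simple as the sketch below
(plain Java against flink-runtime; the parallelism and maxParallelism values are the
ones from the original question in this thread, and the accountId range is only a
made-up sample):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.util.HashMap;
import java.util.Map;

public class KeyDistributionCheck {
    public static void main(String[] args) {
        int parallelism = 14;                            // operator parallelism from the question
        int maxParallelism = parallelism * parallelism;  // 196, as described in the question
        Map<Integer, Integer> keysPerSubtask = new HashMap<>();
        // Hypothetical accountId range; replace with a sample of real keys.
        // The key object must have the same type/hashCode as the one used in keyBy (Long here).
        for (long accountId = 1; accountId <= 10_000; accountId++) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    accountId, maxParallelism, parallelism);
            keysPerSubtask.merge(subtask, 1, Integer::sum);
        }
        keysPerSubtask.forEach((subtask, count) ->
                System.out.println("subtask " + subtask + " -> " + count + " keys"));
    }
}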
>
> On Mon, Apr 4, 2022 at 9:25 AM Isidoros Ioannou 
> wrote:
>
>> Hello Qingsheng,
>>
>> thank you very much for your answer.
>>
>> I will try to modify the key as you mentioned in your first assumption.
>> In case the second assumption also holds, what would you propose to
>> remedy the situation? Should I try to experiment with different values of
>> max parallelism?
>>
>>
>> On Sat, Apr 2, 2022 at 6:55 AM, Qingsheng Ren wrote:
>>
>>> Hi Isidoros,
>>>
>>> Two assumptions in my mind:
>>>
>>> 1. Records are not evenly distributed across different keys, e.g. some
>>> accountId just has more events than others. If the record distribution is
>>> predictable, you can try to combine other fields or include more information
>>> into the key field to help balance the distribution.
>>>
>>> 2. Keys themselves are not distributed evenly. In short the subtask ID
>>> that a key belongs to is calculated by murmurHash(key.hashCode()) %
>>> maxParallelism, so if the distribution of keys is quite strange, it’s
>>> possible that most keys drop into the same subtask with the algorithm
>>> above. AFAIK there isn't such a metric for monitoring the number of keys
>>> in a subtask, but I think you can simply investigate it with a map function
>>> after keyBy.
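A sketch of such a map function placed right after keyBy (hypothetical: MyEvent is a
stand-in for the actual POJO, keyed by a Long accountId; it just logs how many distinct
keys each subtask has seen):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Pass-through map: counts distinct keys and records per parallel subtask.
public class KeyDistributionLogger extends RichMapFunction<MyEvent, MyEvent> {

    private transient Set<Long> seenKeys;
    private transient long records;

    @Override
    public void open(Configuration parameters) {
        seenKeys = new HashSet<>();
    }

    @Override
    public MyEvent map(MyEvent value) {
        seenKeys.add(value.getAccountId());   // the same field used in keyBy
        if (++records % 10_000 == 0) {
            System.out.println("subtask " + getRuntimeContext().getIndexOfThisSubtask()
                    + ": " + seenKeys.size() + " distinct keys, " + records + " records");
        }
        return value;
    }
}

// Usage (sketch): stream.keyBy(MyEvent::getAccountId).map(new KeyDistributionLogger())...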
>>>
>>> Hope this would be helpful!
>>>
>>> Qingsheng
>>>
>>> > On Apr 1, 2022, at 17:37, Isidoros Ioannou  wrote:
>>> >
>>> > Hello,
>>> >
>>> > we run a Flink application (version 1.13.2) that consists of a Kafka
>>> source with one partition so far;
>>> > we then filter the data based on some conditions, map to POJOs, and
>>> transform to a KeyedStream keyed by an accountId (long) property of the
>>> POJO. The downstream operators are 10 CEP operators that run with a
>>> parallelism of 14, and the maxParallelism is set to (operatorParallelism
>>> * operatorParallelism).
>>> > As you can see in the attached image, the events are distributed unevenly,
>>> so some subtasks are busy and others are idle.
>>> > Is there any way to distribute the load evenly across the subtasks? Thank
>>> you in advance.
>>> > 
>>> >
>>>
>>>


Re: Unbalanced distribution of keyed stream to downstream parallel operators

2022-04-04 Thread Isidoros Ioannou
Hello Qingsheng,

thank you very much for your answer.

I will try to modify the key as you mentioned in your first assumption. In
case the second assumption also holds, what would you propose to remedy
the situation? Should I try to experiment with different values of max parallelism?


On Sat, Apr 2, 2022 at 6:55 AM, Qingsheng Ren wrote:

> Hi Isidoros,
>
> Two assumptions in my mind:
>
> 1. Records are not evenly distributed across different keys, e.g. some
> accountId just has more events than others. If the record distribution is
> predictable, you can try to combine other fields or include more information
> into the key field to help balance the distribution.
>
> 2. Keys themselves are not distributed evenly. In short the subtask ID
> that a key belongs to is calculated by murmurHash(key.hashCode()) %
> maxParallelism, so if the distribution of keys is quite strange, it’s
> possible that most keys drop into the same subtask with the algorithm
> above. AFAIK there isn't such a metric for monitoring the number of keys
> in a subtask, but I think you can simply investigate it with a map function
> after keyBy.
>
> Hope this would be helpful!
>
> Qingsheng
>
> > On Apr 1, 2022, at 17:37, Isidoros Ioannou  wrote:
> >
> > Hello,
> >
> > we run a Flink application (version 1.13.2) that consists of a Kafka
> source with one partition so far;
> > we then filter the data based on some conditions, map to POJOs, and we
> transform to a KeyedStream keyed by an accountId (long) property of the
> POJO. The downstream operators are 10 CEP operators that run with a
> parallelism of 14, and the maxParallelism is set to (operatorParallelism
> * operatorParallelism).
> > As you can see in the attached image, the events are distributed unevenly,
> so some subtasks are busy and others are idle.
> > Is there any way to distribute the load evenly across the subtasks? Thank
> you in advance.
> > 
> >
>
>


Unbalanced distribution of keyed stream to downstream parallel operators

2022-04-01 Thread Isidoros Ioannou
Hello,

we run a Flink application (version 1.13.2) that consists of a Kafka source
with one partition so far; we then filter the data based on some conditions,
map to POJOs, and transform to a KeyedStream keyed by an accountId (long)
property of the POJO. The downstream operators are 10 CEP operators that run
with a parallelism of 14, and the maxParallelism is set to
(operatorParallelism * operatorParallelism).
As you can see in the attached image, the events are distributed unevenly,
so some subtasks are busy and others are idle.
Is there any way to distribute the load evenly across the subtasks? Thank you
in advance.
[image: Capture.PNG]


Re: Flink kafka consumer disconnection, application processing stays behind

2022-03-24 Thread Isidoros Ioannou
Hi Qingsheng,

thank you very much for your response.
The message I see from the consumer before the exception log I provided
previously is this:
"locationInformation":
"org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:778)",
"logger": "org.apache.kafka.clients.NetworkClient",
"message": "[Consumer
clientId=consumer-realtime-analytics-eu-production-node2-2,
groupId=realtime-analytics-eu-production-node2] Disconnecting from node 3
due to request timeout."

I saw it in debug mode, and that's the reason I increased the
"request.timeout.ms" setting.
I will follow your advice and investigate the broker logs once the event
occurs again.

Regarding the backpressure: the 10 CEP operators we have use some
iterative conditions that add some burden, and in periods of high load the
operators turn red in the Flink UI, so these add to the backpressure.
However, under moderate load the operators perform fine, except when we
have disconnections. It seems that after a disconnection the watermarks
are not emitted quickly, causing the operators not to release the
data to the sinks. I don't know if this actually helps, but is there any
chance that it is a problem with how we have configured the watermarks?

On Thu, Mar 24, 2022 at 10:27 AM, Qingsheng Ren wrote:

> Hi Isidoros,
>
> I’m not sure in which kind of way the timeout and the high back pressure
> are related, but I think we can try to resolve the request timeout issue
> first. You can take a look at the request log on Kafka broker and see if
> the request was received by broker, and how long it takes for broker to
> handle it. By default the request log is on WARN level, and you may want to
> increase it to DEBUG or TRACE to reveal more information.
>
> Another thought in my mind is about the content of the record, since you
> mentioned extremely high back pressure after the disconnection issue. If
> some messages are quite large or complex, it might block the network or
> require more resources for serde, or even burden some operator in the
> pipeline and finally lead to back pressure. Once the back pressure happens
> in the pipeline, you can try to locate the operator causing the back
> pressure and make some analysis to see why the throughput drops, or dump
> the record to see if there’s something special in it.
>
> Hope these could be helpful!
>
> Best regards,
>
> Qingsheng
>
> > On Mar 23, 2022, at 19:19, Isidoros Ioannou  wrote:
> >
> > Hi, we are running Flink version 1.13.2 on Kinesis Analytics. Our source
> > is a Kafka topic with one partition so far, and we are using the
> > FlinkKafkaConsumer (kafka-connector-1.13.2).
> > Sometimes we get errors from the consumer like the one below:
> >
> >
> "locationInformation":"org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
> > "logger": "org.apache.kafka.clients.FetchSessionHandler",
> > "message": "[Consumer
> clientId=consumer-realtime-analytics-eu-production-node2-2,
> groupId=realtime-analytics-eu-production-node2] Error sending fetch request
> (sessionId=1343463307, epoch=172059) to node 3:
> org.apache.kafka.common.errors.DisconnectException.",
> > "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter ->
> Map -> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",
> >
> > With debug logging it appeared that this happens due to a request
> > timeout, so I increased request.timeout.ms to 6 , however it
> > did not resolve the issue. Even when I get the disconnection, I can see that
> > after 1s the consumer sends a successful fetch request.
> >
> > The problem we have noticed is that after the disconnection the
> > application falls behind in processing: the backpressure on the source
> > reaches 100% and the app consumes events at a lower rate, even though we do
> > not have much traffic to cope with.
> >
> > We use event time, and the watermarks are not generated in the consumer
> > since we have one partition. The source is the following:
> >
> > DataStream stream =
> >
>  env.addSource(consumerBase).name("Kafka").uid("Kafka").filter(f ->
> !f.getServerId().equals("Demo150")).keyBy(ServerAwareJournal::getServerId);
> >
> > and then we assign the following watermark:
> >
> > WatermarkStrategy. >forBoundedOutOfOrderness(Duration.ofSeconds(3))
> > .withTimestampAssigner((element, recordTimestamp) ->
> element.getMessage().getDateTime().atZone(journalTimezone).toInstant()
> > .toEpochMilli()).withIdleness(Duration.ofMinutes

Flink kafka consumer disconnection, application processing stays behind

2022-03-23 Thread Isidoros Ioannou
Hi, we are running Flink version 1.13.2 on Kinesis Analytics. Our source is
a Kafka topic with one partition so far, and we are using the
FlinkKafkaConsumer (kafka-connector-1.13.2).
Sometimes we get errors from the consumer like the one below:

"locationInformation":"org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
"logger": "org.apache.kafka.clients.FetchSessionHandler",
"message": "[Consumer
clientId=consumer-realtime-analytics-eu-production-node2-2,
groupId=realtime-analytics-eu-production-node2] Error sending fetch request
(sessionId=1343463307, epoch=172059) to node 3:
org.apache.kafka.common.errors.DisconnectException.",
"threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map
-> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",

With debug logging it appeared that this happens due to a request timeout,
so I increased request.timeout.ms to 6 , however it did not
resolve the issue. Even when I get the disconnection, I can see that after 1s
the consumer sends a successful fetch request.
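For reference, a sketch of where that property is set on the consumer (the actual
timeout value is elided in the text above, so the 60000 here is only a placeholder;
the broker address, topic name, and SimpleStringSchema are hypothetical stand-ins
for the real deserializer, using FlinkKafkaConsumer from the flink Kafka connector):

// Sketch: passing request.timeout.ms through the Kafka consumer properties.
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "broker:9092");               // hypothetical
consumerProps.setProperty("group.id", "realtime-analytics-eu-production-node2");
consumerProps.setProperty("request.timeout.ms", "60000");                    // placeholder value

FlinkKafkaConsumer<String> consumerBase = new FlinkKafkaConsumer<>(
        "journal-topic", new SimpleStringSchema(), consumerProps);           // hypothetical topic/schema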

The problem we have noticed is that after the disconnection the application
falls behind in processing: the backpressure on the source reaches 100% and
the app consumes events at a lower rate, even though we do not have much
traffic to cope with.

We use event time, and the watermarks are not generated in the consumer since
we have one partition. The source is the following:

DataStream<ServerAwareJournal> stream = env.addSource(consumerBase)
        .name("Kafka").uid("Kafka")
        .filter(f -> !f.getServerId().equals("Demo150"))
        .keyBy(ServerAwareJournal::getServerId);

and then we assign the following watermark:

WatermarkStrategy.<ServerAwareJournal>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        .withTimestampAssigner((element, recordTimestamp) ->
                element.getMessage().getDateTime().atZone(journalTimezone).toInstant().toEpochMilli())
        .withIdleness(Duration.ofMinutes(1));

The downstream operators are 10 CEP operators with a parallelism of 15; the
data emitted from the CEP operators is then unioned and added to a Firehose
sink.
Another thing is that we run two parallel instances of the same application,
i.e. two Kinesis Analytics nodes (one for debugging purposes); the debug node
has checkpointing disabled.

Could you please give me some advice on where to look to find a solution to
this issue?
Thanks in advance


Low Watermark

2022-02-25 Thread Isidoros Ioannou
Hello, could someone please explain what the Low Watermark shown in the
Flink UI indicates (see the attached image)?
I have event time enabled with a bounded-out-of-orderness strategy of 3s for
the incoming events, and I use CEP with a within() window of 5 minutes.


Re: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Isidoros Ioannou
Hi Seth,

thank you for your answer.
In this case you are right and it would solve my problem, but actually my
case is a bit more complex; my mistake, I wanted to provide a simple
example.

The actual case is:

I have DataStream<ServerAwareMessage> inputStream as a source;
ServerAwareMessage is just an interface. The events can be of the following subtypes:

1) class OrderRequest implements ServerAwareMessage {
       String symbol;
       String orderType;
   }

2) class OrderActivated implements ServerAwareMessage {
       String symbol;
       String orderType;
       long orderId;
   }

3) class DealPerformed implements ServerAwareMessage {
       String symbol;
       String orderType;
   }

4) class OrderFilled implements ServerAwareMessage {
       String symbol;
       String orderType;
       long orderId;
   }

And here is the pattern sequence.

Pattern.<ServerAwareMessage>begin(OPEN, AfterMatchSkipStrategy.skipToNext())
        .where(new SimpleCondition<ServerAwareMessage>() {
            @Override
            public boolean filter(ServerAwareMessage value) throws Exception {
                return value instanceof OrderRequest;
            }
        })
        .followedBy("OrderActivated")
        .where(new IterativeCondition<ServerAwareMessage>() {
            @Override
            public boolean filter(ServerAwareMessage value, Context ctx) throws Exception {
                if (value.getMessage() instanceof OrderActivated) {
                    var msg = (OrderActivated) value.getMessage();
                    var list = StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false)
                            .filter(i -> i.getMessage() instanceof OrderRequest)
                            .collect(Collectors.toList());
                    return list.stream().allMatch(i ->
                            ((OrderRequest) i.getMessage()).getSymbol().equals(msg.getSymbol())
                            && ((OrderRequest) i.getMessage()).getOrderType().equals(msg.getOrderType()));
                }
                return false;
            }
        })
        .followedBy("DealPerformed")
        .where(new IterativeCondition<ServerAwareMessage>() {
            @Override
            public boolean filter(ServerAwareMessage value, Context ctx) throws Exception {
                if (value.getMessage() instanceof DealPerformed) {
                    var order = (DealPerformed) value.getMessage();
                    var list = StreamSupport.stream(ctx.getEventsForPattern("OrderActivated").spliterator(), false)
                            .filter(i -> i.getMessage() instanceof OrderPerformed)
                            .collect(Collectors.toList());
                    return list.stream().allMatch(i ->
                            ((OrderActivated) i.getMessage()).getSymbol().equals(order.getSymbol())
                            && ((OrderActivated) i.getMessage()).getOrderType().equals(order.getOrderType()));
                }
                return false;
            }
        })
        .followedBy("OrderFilled")
        .where(new IterativeCondition<ServerAwareMessage>() {
            @Override
            public boolean filter(ServerAwareMessage value, Context ctx) throws Exception {
                if (value.getMessage() instanceof OrderFilled) {
                    var order = (OrderFilled) value.getMessage();
                    var list = StreamSupport.stream(ctx.getEventsForPattern("DealPerformed").spliterator(), false)
                            .filter(i -> i.getMessage() instanceof OrderActivationRequestNewDue)
                            .collect(Collectors.toList());
                    return list.stream().allMatch(i ->
                            ((DealPerformed) i.getMessage()).getSymbol().equals(order.getSymbol())
                            && ((DealPerformed) i.getMessage()).getOrderType().equals(order.getOrderType()));
                }
                return false;
            }
        })

In this case I cannot group by, unfortunately, so I may receive a packet
like { OrderRequest(1), OrderActivated(1), OrderRequest(2), DealPerformed(1),
OrderActivated(2), OrderRequest(3), DealPerformed(2), OrderFilled(1),
OrderFilled(2), OrderActivated(3) }, and so on.
For me it is crucial to match each event sequence (1), (2), etc., and there
are cases where the sequence of messages is incomplete, meaning that an
event never gets fed into the pattern.
The above pattern sequence unfortunately does not work properly. Any
suggestions?

BR,
Isidoros

On Thu, Nov 4, 2021 at 11:27 PM, Isidoros Ioannou <akis3...@gmail.com> wrote:

> I am using Processing time characteristic.

Re: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Isidoros Ioannou
I am using Processing time characteristic.

DataStream inputStream = env.fromElements(
Model.of(1, "A", "US"),
Model.of(2, "B", "US"),
Model.of(3, "C", "US"),
Model.of(4, "A", "AU"),
Model.of(5, "B", "AU"),
Model.of(6, "C", "AU"),
  //Model.of(7, "D", "US"),
Model.of(8, "D", "AU"),
Model.of(9, "A", "GB"),
Model.of(10, "B", "GB"),
Model.of(13, "D", "GB"),
Model.of(11, "C", "GB"),
Model.of(12, "D", "GB")

In the above inputStream, Model.of(7, "D", "US") is not supplied to
the pattern sequence. Nothing special at all
about this, but I wanted to simulate the case where an event that
fulfills the pattern sequence might be missing,
which happens in my case.

BR,
Isidoros




On Thu, Nov 4, 2021 at 11:01 PM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Thanks for the update, the requirements make sense.
>
> Some follow up questions:
> * What time characteristic are you using? Processing or Event?
> * Can you describe a bit more what you mean by "input like the one I have
> commented below"? What is special about the one you have commented?
>
> Best,
> Austin
>
> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou 
> wrote:
>
>>
>>
>> -- Forwarded message -
>> From: Isidoros Ioannou
>> Date: Thu, Nov 4, 2021 at 10:01 PM
>> Subject: Re: IterativeCondition instead of SimpleCondition not matching
>> pattern
>> To: Austin Cawley-Edwards 
>>
>>
>> Hi Austin,
>> thank you for your answer and I really appreciate your willingness to
>> help.
>>
>> Actually the desired output is the one below:
>>
>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', symbol='AU'}],
>>  third=[Model{id=6, text='C', symbol='AU'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}],
>>  third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', symbol='GB'}]}
>>
>> I would like to generate only sequences of Models that have the same
>> symbol. I noticed that if an event does not arrive as input,
>> like the one I have commented out below, it breaks the whole pattern match and
>> the desired output is never produced:
>>
>> DataStream inputStream = env.fromElements(
>> Model.of(1, "A", "US"),
>> Model.of(2, "B", "US"),
>> Model.of(3, "C", "US"),
>> Model.of(4, "A", "AU"),
>> Model.of(5, "B", "AU"),
>> Model.of(6, "C", "AU"),
>>   //Model.of(7, "D", "US"),
>> Model.of(8, "D", "AU"),
>>     Model.of(9, "A", "GB"),
>> Model.of(10, "B", "GB"),
>> Model.of(13, "D", "GB"),
>> Model.of(11, "C", "GB"),
>> Model.of(12, "D", "GB")
>>
>> Kind Regards,
>> Isidoros
>>
>>
>> On Thu, Nov 4, 2021 at 8:40 PM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>
>>> Hi Isidoros,
>>>
>>> Thanks for reaching out to the mailing list. I haven't worked with the
>>> CEP library in a long time but can try to help. I'm having a little trouble
>>> understanding the desired output + rules. Can you mock up the desired
>>> output like you have for the fulfilled pattern sequence?
>>>
>>> Best,
>>> Austin
>>>
>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou 
>>> wrote:
>>>
>>>>
>>>> I face an issue when trying to match some elements in a pattern sequence
>>>> (Flink version 1.11.1). Here is my case:
>>>>
>>>> final StreamExecutionEnvironment env = 
>>>> EnvironmentProvider.getEnvironment();
>>>> DataStream inputStream = env.fromElements(
>>>> Model.of(1, "A", "US"),
>>>> Model.of(2, "B", "US"),
>>>> Model.of(3, "C&

Fwd: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Isidoros Ioannou
-- Forwarded message -
From: Isidoros Ioannou
Date: Thu, Nov 4, 2021 at 10:01 PM
Subject: Re: IterativeCondition instead of SimpleCondition not matching
pattern
To: Austin Cawley-Edwards 


Hi Austin,
thank you for your answer and I really appreciate your willingness to help.

Actually the desired output is the one below:

{start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', symbol='AU'}],
 third=[Model{id=6, text='C', symbol='AU'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
{start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}],
 third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', symbol='GB'}]}

I would like to generate only sequences of Models that have the same
symbol. I noticed that if an event does not arrive as input,
like the one I have commented out below, it breaks the whole pattern match and
the desired output is never produced:

DataStream inputStream = env.fromElements(
Model.of(1, "A", "US"),
Model.of(2, "B", "US"),
Model.of(3, "C", "US"),
Model.of(4, "A", "AU"),
Model.of(5, "B", "AU"),
Model.of(6, "C", "AU"),
  //Model.of(7, "D", "US"),
Model.of(8, "D", "AU"),
Model.of(9, "A", "GB"),
Model.of(10, "B", "GB"),
Model.of(13, "D", "GB"),
Model.of(11, "C", "GB"),
Model.of(12, "D", "GB")

Kind Regards,
Isidoros


On Thu, Nov 4, 2021 at 8:40 PM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Hi Isidoros,
>
> Thanks for reaching out to the mailing list. I haven't worked with the CEP
> library in a long time but can try to help. I'm having a little trouble
> understanding the desired output + rules. Can you mock up the desired
> output like you have for the fulfilled pattern sequence?
>
> Best,
> Austin
>
> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou 
> wrote:
>
>>
>> I face an issue when trying to match some elements in a pattern sequence
>> (Flink version 1.11.1). Here is my case:
>>
>> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
>> DataStream inputStream = env.fromElements(
>> Model.of(1, "A", "US"),
>> Model.of(2, "B", "US"),
>> Model.of(3, "C", "US"),
>> Model.of(4, "A", "AU"),
>> Model.of(5, "B", "AU"),
>> Model.of(6, "C", "AU"),
>>   //Model.of(7, "D"),
>> Model.of(8, "D", "AU"),
>> Model.of(9, "A", "GB"),
>> Model.of(10, "B", "GB"),
>> Model.of(13, "D", "GB"),
>> Model.of(11, "C", "GB"),
>> Model.of(12, "D", "GB")
>>
>>
>> 
>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>> .forceNonParallel();
>>
>> Pattern pattern = Pattern.begin("start", 
>> AfterMatchSkipStrategy.skipToNext())
>> .where(new IterativeCondition() {
>> @Override
>> public boolean filter(Model value, Context ctx) 
>> throws Exception {
>> return value.getText().equalsIgnoreCase("A");
>> }
>> }).followedBy("second")
>> .where(new IterativeCondition() {
>> @Override
>> public boolean filter(Model value, Context ctx) 
>> throws Exception {
>>
>> return value.getText().equalsIgnoreCase("B");
>> }
>> }).followedBy("third")
>> .where(new IterativeCondition() {
>> @Override
>> public boolean filter(Model value, Context ctx) 
>> throws Exception {
>>
>> return value.getText().equalsIgnoreCase("C");
>> }
>> }).followedBy("fourth")
>> .where(new IterativeCondition() {
>> @Override
>> public boolean filter(Model value, Context ctx) 
>> throws Exception {
>> var  list = 
>

Fwd: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Isidoros Ioannou
I face an issue when trying to match some elements in a pattern sequence
(Flink version 1.11.1). Here is my case:

final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
DataStream<Model> inputStream = env.fromElements(
        Model.of(1, "A", "US"),
        Model.of(2, "B", "US"),
        Model.of(3, "C", "US"),
        Model.of(4, "A", "AU"),
        Model.of(5, "B", "AU"),
        Model.of(6, "C", "AU"),
        //Model.of(7, "D"),
        Model.of(8, "D", "AU"),
        Model.of(9, "A", "GB"),
        Model.of(10, "B", "GB"),
        Model.of(13, "D", "GB"),
        Model.of(11, "C", "GB"),
        Model.of(12, "D", "GB")
).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
        .forceNonParallel();

Pattern<Model, Model> pattern = Pattern.<Model>begin("start",
        AfterMatchSkipStrategy.skipToNext())
        .where(new IterativeCondition<Model>() {
            @Override
            public boolean filter(Model value, Context ctx) throws Exception {
                return value.getText().equalsIgnoreCase("A");
            }
        }).followedBy("second")
        .where(new IterativeCondition<Model>() {
            @Override
            public boolean filter(Model value, Context ctx) throws Exception {
                return value.getText().equalsIgnoreCase("B");
            }
        }).followedBy("third")
        .where(new IterativeCondition<Model>() {
            @Override
            public boolean filter(Model value, Context ctx) throws Exception {
                return value.getText().equalsIgnoreCase("C");
            }
        }).followedBy("fourth")
        .where(new IterativeCondition<Model>() {
            @Override
            public boolean filter(Model value, Context ctx) throws Exception {
                var list = StreamSupport.stream(
                        ctx.getEventsForPattern("third").spliterator(), false)
                        .collect(Collectors.toList());
                var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
                return value.getText().equalsIgnoreCase("D") && val;
            }
        });


PatternStream<Model> marketOpenPatternStream = CEP.pattern(inputStream, pattern);

SingleOutputStreamOperator<List<Model>> marketOpenOutput = marketOpenPatternStream
        .process(new PatternProcessFunction<Model, List<Model>>() {
            @Override
            public void processMatch(Map<String, List<Model>> match, Context ctx,
                                     Collector<List<Model>> out) throws Exception {
                System.out.println(match);
                out.collect(new ArrayList(match.values()));
            }
        });

What I am trying to achieve is to match only patterns that have the
same symbol. If I use a SimpleCondition that checks only the text of
the Model (A, B, C, ...) without the symbol check in the last pattern,
the pattern sequence is fulfilled and I get the following output:

{start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
symbol='US'}], third=[Model{id=3, text='C', symbol='US'}],
fourth=[Model{id=8, text='D', symbol='AU'}]}

{start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5,
text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
fourth=[Model{id=8, text='D', symbol='AU'}]}

{start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10,
text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
fourth=[Model{id=12, text='D', symbol='GB'}]}

However, I want to avoid matching the elements with id = 1 (A), 2 (B), 3 (C)
against the element with id = 8 (D). For this reason I put the symbol check
against the event matched in the previous pattern into the last condition, so
they do not match, since they do not have the same symbol. But after applying
the condition, I now do not get any output; none of the elements match the
pattern. What am I missing? Could someone help?