Re: Unbalanced distribution of keyed stream to downstream parallel operators
Hello Arvid, thank you for your reply. Actually, using a window to aggregate the events for a time period is not applicable in my case, since I need the records to be processed immediately. Even if I could, I still cannot understand how I would forward the aggregated events to, let's say, 2 parallel operators, since the slot assignment of the key group is done by Flink. Do you mean keying again by a different property, so that the previously aggregated events get reassigned to other operators? I apologize if my question is naive, but I got a little confused.

On Mon, Apr 4, 2022 at 10:38 AM, Arvid Heise wrote:

> You should create a histogram over the keys of the records. If you see a
> skew, one way to go about it is to refine the key or split aggregations.
>
> For example, consider you want to count events per user, and 2 users are
> actually bots spamming lots of events, accounting for 50% of all events.
> Then you will always collect all events of each bot on one machine, which
> limits scalability. You can, however, first aggregate all events per user
> per day (or subdivide in any other way). Then the same bot can be
> processed in parallel, and you can do an overall aggregation afterwards.
>
> If that's not possible, then your problem itself limits the scalability,
> and you can only try to keep both bot users off the same machine (which
> can happen in case 2). Then you can simply try to shift the key by adding
> constants to it and check whether the distribution looks better. Have a look
> at KeyGroupRangeAssignment [1] to test that out without running Flink itself.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
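Arvid's pointer to KeyGroupRangeAssignment can be exercised without a running cluster. The sketch below mirrors my understanding of that class (keyGroup = murmurHash(key.hashCode()) % maxParallelism, operatorIndex = keyGroup * parallelism / maxParallelism); the murmurHash finalizer is re-implemented from memory for illustration only, so treat the linked source [1] as authoritative. It lets you try shifted keys (e.g. accountId plus a constant) offline and inspect the resulting distribution:

```java
import java.util.Arrays;

// Offline sketch of Flink's key-to-subtask assignment; the hash below
// approximates MathUtils.murmurHash and may differ from the real one.
public class KeyGroupSketch {

    // MurmurHash3-style finalization of a 32-bit hash code.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4; // length in bytes of one int
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) return code;
        if (code != Integer.MIN_VALUE) return -code;
        return 0;
    }

    static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    static int operatorIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // Settings from the thread: parallelism 14, maxParallelism = 14 * 14.
        int parallelism = 14, maxParallelism = 14 * 14;
        int[] counts = new int[parallelism];
        for (long accountId = 0; accountId < 10_000; accountId++) {
            counts[operatorIndex(accountId, maxParallelism, parallelism)]++;
        }
        // Records per subtask for 10k synthetic accountIds.
        System.out.println(Arrays.toString(counts));
    }
}
```

Running the same loop with real accountIds (optionally shifted by a constant) shows whether the skew comes from the key-group math or from hot keys.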
Re: Unbalanced distribution of keyed stream to downstream parallel operators
Hello Qingsheng,

thank you a lot for your answer. I will try to modify the key as you mentioned in your first assumption. In case the second assumption also holds, what would you propose to remedy the situation? Experimenting with different values of max parallelism?

On Sat, Apr 2, 2022 at 6:55 AM, Qingsheng Ren wrote:

> Hi Isidoros,
>
> Two assumptions in my mind:
>
> 1. Records are not evenly distributed across different keys, e.g. some
> accountId just has more events than others. If the record distribution is
> predictable, you can try to combine other fields or include more information
> in the key field to help balance the distribution.
>
> 2. Keys themselves are not distributed evenly. In short, the subtask ID
> that a key belongs to is calculated as murmurHash(key.hashCode()) %
> maxParallelism, so if the distribution of keys is quite skewed, it's
> possible that most keys land on the same subtask with the algorithm
> above. AFAIK there isn't such a metric for monitoring the number of keys
> in a subtask, but I think you can simply investigate it with a map function
> after keyBy.
>
> Hope this is helpful!
>
> Qingsheng
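Qingsheng's first suggestion — putting more information into the key — could look like the following sketch, where one hot accountId is subdivided by day so that its pre-aggregations can land on different subtasks. The field names and the day granularity are my assumptions for illustration, not something from the thread:

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical composite key: accountId refined with a time bucket.
public class CompositeKeySketch {

    // Key for a first-level (pre-)aggregation: accountId plus UTC day.
    static String compositeKey(long accountId, long epochMillis) {
        var day = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
        return accountId + "|" + day;
    }

    public static void main(String[] args) {
        // The same account on different days yields different keys, and
        // therefore potentially different subtasks.
        System.out.println(compositeKey(42L, 1648771200000L));
        System.out.println(compositeKey(42L, 1648771200000L + 86_400_000L));
    }
}
```

A second keyBy on the plain accountId would then merge the per-day partials into the overall aggregate, as Arvid describes elsewhere in the thread.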
Unbalanced distribution of keyed stream to downstream parallel operators
Hello,

we run a Flink application (version 1.13.2) that consists of a Kafka source with, so far, a single partition. We then filter the data based on some conditions, map to POJOs, and transform to a KeyedStream keyed by an accountId long property of the POJO. The downstream operators are 10 CEP operators that run with a parallelism of 14, and maxParallelism is set to (operatorParallelism * operatorParallelism). As you can see in the attached image, the events are distributed unevenly, so some subtasks are busy and others are idle. Is there any way to distribute the load evenly across the subtasks? Thank you in advance.

[image: Capture.PNG]
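As suggested elsewhere in this thread, a histogram over the key values is the quickest way to see whether a few accountIds dominate the traffic before blaming the key-group assignment. A minimal offline sketch, with made-up sample data:

```java
import java.util.HashMap;
import java.util.Map;

// Count records per key and print the heaviest keys; sample data is invented.
public class KeyHistogram {

    static Map<Long, Long> histogram(long[] accountIds) {
        Map<Long, Long> counts = new HashMap<>();
        for (long id : accountIds) counts.merge(id, 1L, Long::sum);
        return counts;
    }

    public static void main(String[] args) {
        long[] sample = {1, 1, 1, 1, 2, 3, 1, 2, 1};
        histogram(sample).entrySet().stream()
                .sorted(Map.Entry.<Long, Long>comparingByValue().reversed())
                .limit(3)
                .forEach(e -> System.out.println("accountId " + e.getKey() + ": " + e.getValue()));
    }
}
```

If the top keys account for a large share of the records, refining the key is the remedy; if the counts are even, the skew comes from how the keys hash onto key groups.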
Re: Flink kafka consumer disconnection, application processing stays behind
Hi Qingsheng,

thank you a lot for your response. The message I see from the consumer right before the exception I provided previously is this:

"locationInformation": "org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:778)",
"logger": "org.apache.kafka.clients.NetworkClient",
"message": "[Consumer clientId=consumer-realtime-analytics-eu-production-node2-2, groupId=realtime-analytics-eu-production-node2] Disconnecting from node 3 due to request timeout."

I saw it in debug mode, and that is the reason I increased request.timeout.ms. I will follow your advice and investigate the broker logs once the event occurs again.

Regarding the backpressure: the 10 CEP operators we have use some iterative conditions that add some burden, and in periods of high load the operators turn red in the Flink UI, so these add to the backpressure. However, under moderate load the operators perform fine, except when we have disconnections. It seems that after a disconnection the watermarks are not emitted quickly, causing the operators not to release their data to the sinks. I don't know if that helps, but is there any chance this is a problem with how we have configured the watermarks?

On Thu, Mar 24, 2022 at 10:27 AM, Qingsheng Ren wrote:

> Hi Isidoros,
>
> I'm not sure in which way the timeout and the high back pressure
> are related, but I think we can try to resolve the request timeout issue
> first. You can take a look at the request log on the Kafka broker and see if
> the request was received by the broker, and how long it took the broker to
> handle it. By default the request log is on WARN level, and you may want to
> increase it to DEBUG or TRACE to reveal more information.
>
> Another thought in my mind is about the content of the records, since you
> mentioned extremely high back pressure after the disconnection issue. If
> some messages are quite large or complex, they might block the network or
> require more resources for serde, and even burden some operator in the
> pipeline and finally lead to back pressure. Once back pressure happens
> in the pipeline, you can try to locate the operator causing it and
> analyze why the throughput drops, or dump the records to see if there
> is something special in them.
>
> Hope these could be helpful!
>
> Best regards,
>
> Qingsheng
Flink kafka consumer disconnection, application processing stays behind
Hi,

we are running Flink 1.13.2 on Kinesis Analytics. Our source is a Kafka topic with one partition so far, and we are using the FlinkKafkaConsumer (kafka-connector-1.13.2). Sometimes we get errors from the consumer like the one below:

"locationInformation": "org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
"logger": "org.apache.kafka.clients.FetchSessionHandler",
"message": "[Consumer clientId=consumer-realtime-analytics-eu-production-node2-2, groupId=realtime-analytics-eu-production-node2] Error sending fetch request (sessionId=1343463307, epoch=172059) to node 3: org.apache.kafka.common.errors.DisconnectException.",
"threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map -> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",

With debug logging it appeared that this happens due to a request timeout, so I increased request.timeout.ms to 6 , however it did not resolve the issue. Even when I get the disconnection, I can see that after 1s the consumer sends a successful fetch request.

The problem we have noticed is that after the disconnection the application stays behind in processing: the backpressure on the source reaches 100%, and the app consumes events at a lower rate even when we do not have much traffic to cope with.

We use event time, and the watermarks are not generated in the consumer since we have one partition. The source is the following:

DataStream stream = env.addSource(consumerBase)
    .name("Kafka").uid("Kafka")
    .filter(f -> !f.getServerId().equals("Demo150"))
    .keyBy(ServerAwareJournal::getServerId);

and then we assign the following watermark strategy:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
    .withTimestampAssigner((element, recordTimestamp) ->
        element.getMessage().getDateTime().atZone(journalTimezone).toInstant().toEpochMilli())
    .withIdleness(Duration.ofMinutes(1));

The downstream operators are 10 CEP operators with a parallelism of 15, and then there is a union of the data emitted from the CEP operators, which is added to a Firehose sink. Another thing is that we run two parallel instances of the same application, i.e. two Kinesis Analytics nodes (one for debug purposes); the debug node has checkpointing disabled. Could you please give me some advice on where to look for a solution to this issue? Thanks in advance.
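Since CEP only releases matches once the watermark passes them, it can help to reason about what forBoundedOutOfOrderness actually emits. To the best of my understanding of BoundedOutOfOrdernessWatermarks, it tracks the maximum seen event timestamp and emits that minus the bound minus 1 ms; a standalone sketch of that arithmetic:

```java
// Sketch of the bounded-out-of-orderness watermark formula; my reading of
// Flink's BoundedOutOfOrdernessWatermarks, not an excerpt from it.
public class WatermarkSketch {

    // Watermark after seeing the given timestamps with the given bound.
    static long watermark(long[] seenTimestamps, long outOfOrdernessMillis) {
        long max = Long.MIN_VALUE;
        for (long t : seenTimestamps) max = Math.max(max, t);
        return max - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        long[] ts = {10_000, 13_000, 12_000};
        // With the 3 s bound from the thread, the watermark trails the
        // maximum timestamp (13 000) by 3 001 ms.
        System.out.println(watermark(ts, 3_000)); // prints 9999
    }
}
```

If the source stops advancing its max timestamp during a disconnection, the watermark stalls with it, which would match the observed "operators not releasing data to sinks" until withIdleness kicks in.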
Low Watermark
Hello, could someone please explain what the Low Watermark shown in the Flink UI indicates (see the attached image)? I have event time enabled, with a bounded-out-of-orderness strategy of 3s for the incoming events, and I use CEP with a within window of 5 minutes.
Re: IterativeCondition instead of SimpleCondition not matching pattern
Hi Seth, thank you for your answer. In that case you are right and it would solve my problem, but my actual case is a bit more complex; my mistake, I wanted to provide a simple example. In the actual case I have a DataStream<ServerAwareMessage> inputStream as a source, where ServerAwareMessage is just an interface. The events can be of certain subtypes:

1) class OrderRequest implements ServerAwareMessage { String symbol; String orderType; }
2) class OrderActivated implements ServerAwareMessage { String symbol; String orderType; long orderId; }
3) class DealPerformed implements ServerAwareMessage { String symbol; String orderType; }
4) class OrderFilled implements ServerAwareMessage { String symbol; String orderType; long orderId; }

And here is the pattern sequence:

Pattern.<ServerAwareMessage>begin(OPEN, AfterMatchSkipStrategy.skipToNext())
    .where(new SimpleCondition<ServerAwareMessage>() {
        @Override
        public boolean filter(ServerAwareMessage value) throws Exception {
            return value instanceof OrderRequest;
        }
    })
    .followedBy("OrderActivated")
    .where(new IterativeCondition<ServerAwareMessage>() {
        @Override
        public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
            if (value instanceof OrderActivated) {
                var msg = (OrderActivated) value;
                var list = StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false)
                        .filter(i -> i instanceof OrderRequest)
                        .collect(Collectors.toList());
                return list.stream().allMatch(i -> ((OrderRequest) i).getSymbol().equals(msg.getSymbol())
                        && ((OrderRequest) i).getOrderType().equals(msg.getOrderType()));
            }
            return false;
        }
    })
    .followedBy("DealPerformed")
    .where(new IterativeCondition<ServerAwareMessage>() {
        @Override
        public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
            if (value instanceof DealPerformed) {
                var deal = (DealPerformed) value;
                var list = StreamSupport.stream(ctx.getEventsForPattern("OrderActivated").spliterator(), false)
                        .filter(i -> i instanceof OrderActivated)
                        .collect(Collectors.toList());
                return list.stream().allMatch(i -> ((OrderActivated) i).getSymbol().equals(deal.getSymbol())
                        && ((OrderActivated) i).getOrderType().equals(deal.getOrderType()));
            }
            return false;
        }
    })
    .followedBy("OrderFilled")
    .where(new IterativeCondition<ServerAwareMessage>() {
        @Override
        public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
            if (value instanceof OrderFilled) {
                var order = (OrderFilled) value;
                var list = StreamSupport.stream(ctx.getEventsForPattern("DealPerformed").spliterator(), false)
                        .filter(i -> i instanceof DealPerformed)
                        .collect(Collectors.toList());
                return list.stream().allMatch(i -> ((DealPerformed) i).getSymbol().equals(order.getSymbol())
                        && ((DealPerformed) i).getOrderType().equals(order.getOrderType()));
            }
            return false;
        }
    });

In this case I unfortunately cannot group the stream. So I may receive a packet like {OrderRequest(1), OrderActivated(1), OrderRequest(2), DealPerformed(1), OrderActivated(2), OrderRequest(3), DealPerformed(2), OrderFilled(1), OrderFilled(2), OrderActivated(3)} and so on. It is crucial for me to match each complete event sequence (1), (2), etc., and there are cases where a sequence of messages is incomplete, i.e. one of its events never gets fed into the pattern. The above pattern sequence unfortunately does not work properly. Any suggestions?

BR, Isidoros

On Thu, Nov 4, 2021 at 11:27 PM, Isidoros Ioannou <akis3...@gmail.com> wrote:

> I am using the processing time characteristic.
Re: IterativeCondition instead of SimpleCondition not matching pattern
I am using the processing time characteristic.

DataStream inputStream = env.fromElements(
    Model.of(1, "A", "US"),
    Model.of(2, "B", "US"),
    Model.of(3, "C", "US"),
    Model.of(4, "A", "AU"),
    Model.of(5, "B", "AU"),
    Model.of(6, "C", "AU"),
    //Model.of(7, "D", "US"),
    Model.of(8, "D", "AU"),
    Model.of(9, "A", "GB"),
    Model.of(10, "B", "GB"),
    Model.of(13, "D", "GB"),
    Model.of(11, "C", "GB"),
    Model.of(12, "D", "GB"));

In the above inputStream, Model.of(7, "D", "US") is not supplied to the pattern sequence. There is nothing special about it, but I wanted to simulate the case where an event that fulfills the pattern sequence is missing, which happens in my case.

BR, Isidoros

On Thu, Nov 4, 2021 at 11:01 PM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Thanks for the update, the requirements make sense.
>
> Some follow-up questions:
> * What time characteristic are you using? Processing or Event?
> * Can you describe a bit more what you mean by "input like the one I have
> commented below"? What is special about the one you have commented?
>
> Best,
> Austin
Fwd: IterativeCondition instead of SimpleCondition not matching pattern
---------- Forwarded message ----------
From: Isidoros Ioannou
Date: Thu, Nov 4, 2021 at 10:01 PM
Subject: Re: IterativeCondition instead of SimpleCondition not matching pattern
To: Austin Cawley-Edwards

Hi Austin,

thank you for your answer, and I really appreciate your willingness to help.

Actually, the desired output is the one below:

{start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
{start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', symbol='GB'}]}

I would like to generate only sequences of Models that have the same symbol. I noticed that if an event does not arrive in the input, like the one I have commented out below, it breaks the whole pattern match and the desired output is never produced.

DataStream inputStream = env.fromElements(
    Model.of(1, "A", "US"),
    Model.of(2, "B", "US"),
    Model.of(3, "C", "US"),
    Model.of(4, "A", "AU"),
    Model.of(5, "B", "AU"),
    Model.of(6, "C", "AU"),
    //Model.of(7, "D", "US"),
    Model.of(8, "D", "AU"),
    Model.of(9, "A", "GB"),
    Model.of(10, "B", "GB"),
    Model.of(13, "D", "GB"),
    Model.of(11, "C", "GB"),
    Model.of(12, "D", "GB"));

Kind Regards,
Isidoros

On Thu, Nov 4, 2021 at 8:40 PM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Hi Isidoros,
>
> Thanks for reaching out to the mailing list. I haven't worked with the
> CEP library in a long time but can try to help. I'm having a little trouble
> understanding the desired output + rules. Can you mock up the desired
> output like you have for the fulfilled pattern sequence?
>
> Best,
> Austin
Fwd: IterativeCondition instead of SimpleCondition not matching pattern
I face an issue when trying to match some elements in a Pattern sequence, on Flink version 1.11.1. Here is my case:

final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
DataStream<Model> inputStream = env.fromElements(
        Model.of(1, "A", "US"),
        Model.of(2, "B", "US"),
        Model.of(3, "C", "US"),
        Model.of(4, "A", "AU"),
        Model.of(5, "B", "AU"),
        Model.of(6, "C", "AU"),
        //Model.of(7, "D"),
        Model.of(8, "D", "AU"),
        Model.of(9, "A", "GB"),
        Model.of(10, "B", "GB"),
        Model.of(13, "D", "GB"),
        Model.of(11, "C", "GB"),
        Model.of(12, "D", "GB"))
    .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
    .forceNonParallel();

Pattern<Model, ?> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
    .where(new IterativeCondition<Model>() {
        @Override
        public boolean filter(Model value, Context<Model> ctx) throws Exception {
            return value.getText().equalsIgnoreCase("A");
        }
    })
    .followedBy("second")
    .where(new IterativeCondition<Model>() {
        @Override
        public boolean filter(Model value, Context<Model> ctx) throws Exception {
            return value.getText().equalsIgnoreCase("B");
        }
    })
    .followedBy("third")
    .where(new IterativeCondition<Model>() {
        @Override
        public boolean filter(Model value, Context<Model> ctx) throws Exception {
            return value.getText().equalsIgnoreCase("C");
        }
    })
    .followedBy("fourth")
    .where(new IterativeCondition<Model>() {
        @Override
        public boolean filter(Model value, Context<Model> ctx) throws Exception {
            var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false)
                    .collect(Collectors.toList());
            var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
            return value.getText().equalsIgnoreCase("D") && val;
        }
    });

PatternStream<Model> marketOpenPatternStream = CEP.pattern(inputStream, pattern);
SingleOutputStreamOperator<List<Model>> marketOpenOutput = marketOpenPatternStream
    .process(new PatternProcessFunction<Model, List<Model>>() {
        @Override
        public void processMatch(Map<String, List<Model>> match, Context ctx, Collector<List<Model>> out) throws Exception {
            System.out.println(match);
            out.collect(new ArrayList(match.values()));
        }
    });

What I am trying to achieve is to match only patterns that have the same symbol. If I use a SimpleCondition that checks only the text of the Model ("A", "B", "C", ...), without the symbol check in the last pattern, the pattern sequence is fulfilled and I get the following output:

{start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B', symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
{start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
{start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', symbol='GB'}]}

However, I want to avoid matching the elements with id = 1 (A), 2 (B), 3 (C) with the element with id = 8 (D). For this reason I put the symbol check against the event matched in the previous pattern into the last condition, so that I do not get a match when they do not have the same symbol. But after applying the condition I do not get any output at all; none of the elements match the pattern. What am I missing? Could someone help?
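To see which per-symbol sequences should complete on the sample input, here is an offline sanity check that groups the records by symbol and tests whether A -> B -> C -> D occurs in order within each group. (Keying the stream by symbol before applying the pattern would express the same grouping in Flink itself, though whether that fits the real use case is a separate question.)

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Offline sanity check of the intended per-symbol matches on the sample data.
public class PerSymbolSequenceCheck {

    record Model(int id, String text, String symbol) {}

    // True if the wanted texts appear in order (not necessarily adjacent).
    static boolean hasSequence(List<Model> group, List<String> wanted) {
        int next = 0;
        for (Model m : group) {
            if (next < wanted.size() && m.text().equals(wanted.get(next))) next++;
        }
        return next == wanted.size();
    }

    public static void main(String[] args) {
        List<Model> input = List.of(
            new Model(1, "A", "US"), new Model(2, "B", "US"), new Model(3, "C", "US"),
            new Model(4, "A", "AU"), new Model(5, "B", "AU"), new Model(6, "C", "AU"),
            // Model 7 ("D", "US") intentionally missing, as in the question.
            new Model(8, "D", "AU"),
            new Model(9, "A", "GB"), new Model(10, "B", "GB"), new Model(13, "D", "GB"),
            new Model(11, "C", "GB"), new Model(12, "D", "GB"));

        Map<String, List<Model>> bySymbol = new LinkedHashMap<>();
        for (Model m : input) bySymbol.computeIfAbsent(m.symbol(), k -> new ArrayList<>()).add(m);

        List<String> wanted = List.of("A", "B", "C", "D");
        bySymbol.forEach((symbol, group) ->
            System.out.println(symbol + " completes A-B-C-D: " + hasSequence(group, wanted)));
    }
}
```

On this input, AU and GB complete the sequence while US does not, which matches the two desired outputs described in the thread.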