Running multiple CEP pattern rules
Hi,

We are running into errors when running multiple CEP patterns. Here's our use case: we are planning to build a rule-based engine on top of Flink with a huge number of rules and are doing a POC for that. For the POC we have around 1000 pattern-based rules, which we translate into CEP patterns and run on a keyed stream of event data to detect matches. We partition the stream by orgId, and each rule needs to run for each org.

Here's the code we've written to implement that (generics restored; they were stripped by the archive):

    DataStream<Event> eventStream = null;
    DataStream<Event> partitionedInput =
        eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid);
    List<Rule> ruleList = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
      ruleList.add(new Rule("rule" + (i + 500), "process4", "process5", "process6"));
    }
    for (Rule rule : ruleList) {
      String st = rule.getStart();
      String mi = rule.getMid();
      String en = rule.getEnd();
      String nm = rule.getName();
      Pattern<Event, ?> pattern =
          Pattern.begin(
              Pattern.<Event>begin("start")
                  .where(
                      new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event value) throws Exception {
                          return value.getProcess().equals(st);
                        }
                      })
                  .followedBy("middle")
                  .where(
                      new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) {
                          return !event.getProcess().equals(mi);
                        }
                      })
                  .optional()
                  .followedBy("end")
                  .where(
                      new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) {
                          return event.getProcess().equals(en);
                        }
                      }));
      PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
      DataStream<String> alerts =
          patternStream.process(
              new PatternProcessFunction<Event, String>() {
                @Override
                public void processMatch(
                    Map<String, List<Event>> map, Context context, Collector<String> collector)
                    throws Exception {
                  Event start = map.containsKey("start") ? map.get("start").get(0) : null;
                  Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
                  Event end = map.containsKey("end") ? map.get("end").get(0) : null;
                  StringJoiner joiner = new StringJoiner(",");
                  joiner
                      .add("Rule : " + nm + " ")
                      .add(start == null ? "" : start.getId())
                      .add(middle == null ? "" : middle.getId())
                      .add(end == null ? "" : end.getId());
                  collector.collect(joiner.toString());
                }
              });
      alerts.print();
    }

We tried to run this code on a Flink cluster with 1 task manager with 4 task slots, and the task manager crashed with this error:

    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
        at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
        at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRp
Re: Running multiple CEP pattern rules
Hi Tejas,

It will not work that way. Bear in mind that every application of CEP.pattern creates a new operator in the job graph. The exceptions you are seeing most probably result from calculating that huge graph and sending it over: you are hitting the timeout on submitting it. You can have many different patterns in a single job, but the number of vertices in your graph is not unlimited.

In your scenario I'd try to combine the rules in a single operator. You could try to use a ProcessFunction for that.

Best,
Dawid

On 28/05/2021 01:53, Tejas wrote:
> Hi,
> We are running into errors when running multiple CEP patterns.
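Dawid's suggestion of evaluating all rules inside one operator could be sketched roughly as below. This is a hypothetical, Flink-free sketch of the per-key matching logic only (simplified to start/end rules, ignoring the optional "middle" step); it is not from the thread. In a real job, `MultiRuleMatcher.pendingStart` would be keyed state inside a single `KeyedProcessFunction`, and `onEvent` would be its `processElement`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: evaluate many rules in ONE operator instead of one
// CEP.pattern() call (and hence one job-graph vertex) per rule.
class MultiRuleMatcher {
    // Simplified rule: fires when startProcess is later followed by endProcess.
    static class Rule {
        final String name, startProcess, endProcess;
        Rule(String name, String startProcess, String endProcess) {
            this.name = name;
            this.startProcess = startProcess;
            this.endProcess = endProcess;
        }
    }

    private final List<Rule> rules;
    // rule name -> id of the pending "start" event for the current key;
    // in Flink this would be a keyed MapState<String, String>
    private final Map<String, String> pendingStart = new HashMap<>();

    MultiRuleMatcher(List<Rule> rules) { this.rules = rules; }

    /** Feed one event (id + process name); returns any alerts it triggers. */
    List<String> onEvent(String eventId, String process) {
        List<String> alerts = new ArrayList<>();
        for (Rule r : rules) { // one pass over all rules per event
            if (process.equals(r.startProcess)) {
                pendingStart.put(r.name, eventId); // a partial match begins
            } else if (process.equals(r.endProcess)
                    && pendingStart.containsKey(r.name)) {
                alerts.add("Rule : " + r.name + " ,"
                        + pendingStart.remove(r.name) + "," + eventId);
            }
        }
        return alerts;
    }
}
```

Compared to a thousand CEP.pattern() calls, this keeps the job graph at a single vertex regardless of rule count, and rules could even be shipped at runtime (e.g. via a broadcast stream) at the cost of re-implementing the matching semantics yourself.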
Re: Running multiple CEP pattern rules
Hi Dawid,

Do you have any plans to bring this functionality to Flink CEP in the future?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Running multiple CEP pattern rules
I am afraid there is not much active development going on in the CEP library. I would not expect new features there in the near future.

On 28/05/2021 22:00, Tejas wrote:
> Hi Dawid,
> Do you have any plans to bring this functionality to Flink CEP in the future?