Hi Norman,

which version of Flink are you using? We recently fixed some issues in the CEP library that look similar to your error message. The problem occurred when the CEP library was used with processing time. Switching to event time or ingestion time solves the problem.
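As a rough sketch of what switching away from processing time could look like in the job setup (assuming the Flink 1.0.x DataStream API; the class name and the placeholder input are made up for illustration and are not taken from your program):

```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical job class; only the setStreamTimeCharacteristic call matters here.
public class IngestionTimeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ingestion time instead of the default processing time.
        // With TimeCharacteristic.EventTime you would additionally have to
        // assign timestamps and watermarks to the input stream.
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Placeholder input; in the real job this would be the stream
        // that is handed to CEP.pattern(...).
        DataStream<String> input = env.fromElements("A", "B");
        input.print();

        env.execute("Ingestion time sketch");
    }
}
```

Ingestion time is probably the smaller change, since the timestamps are assigned automatically at the sources and the rest of the program can stay as it is.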
The fixes that make the CEP library work with processing time as well are included in the latest snapshot version, 1.1-SNAPSHOT, and will be part of the upcoming 1.0.1 bugfix release, which is scheduled to be released today. If the problem persists with the latest version, it would be good to see your complete Flink program.

Cheers,
Till

On Wed, Apr 6, 2016 at 11:04 AM, norman sp <wir12...@studserv.uni-leipzig.de> wrote:

> Hi,
> I'm trying out the new CEP library but have some problems with event detection.
> In my case Flink detects the event pattern: A followed by B within 10 seconds.
> But a short time after the event detection, when the event pattern isn't matched anymore, the program crashes with the error message:
>
> 04/06/2016 11:04:47 Job execution switched to status FAILING.
> java.lang.NullPointerException
>     at org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
>     at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
>     at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
>     at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
>     at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> After that, the job execution is restarted and proceeds well until the next AbstractCEPPatternOperator fails.
>
> That's my code:
>
> Pattern<Tuple5<String, String, Double, Double, Double>, ?> FlowPattern =
>     Pattern.<Tuple5<String, String, Double, Double, Double>>begin("start")
>         .followedBy("FlowOver10")
>         .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>()
>             {//some Filter}})
>         .followedBy("PressureOver10")
>         .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>()
>             {//some Filter}})
>         .within(Time.seconds(10));
>
> PatternStream<Tuple5<String, String, Double, Double, Double>> FlowFirstPatternStream =
>     CEP.pattern(windowedData, FlowFirstPattern);
> DataStream<String> warning = FlowFirstPatternStream.select(new FlowPatternWarning());
> warning.print();
>
> private static class FlowPatternWarning implements
>         PatternSelectFunction<Tuple5<String, String, Double, Double, Double>, String> {
>     @Override
>     public String select(Map<String, Tuple5<String, String, Double, Double, Double>> pat)
>             throws Exception {
>         Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
>         Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");
>
>         return " ####### Warning! FlowPattern ####### "
>             + pressure.toString() + " - " + flow.toString();
>     }
> }
>
> How can I solve that?
> Hope somebody could help me.
>
> greetz Norman
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.