Hi Norman,

which version of Flink are you using? We recently fixed some issues with
the CEP library which looked similar to your error message. The problem
occurred when using the CEP library with processing time. Switching to
event or ingestion time, solve the problem.

The fixes to make it also work with processing time are included in the
latest snapshot version 1.1-SNAPSHOT and will be part of the upcoming 1.0.1
bugfix release. The bugfix release will actually be released today.

If the problem should still remain with the latest version, it would be
good to see your complete Flink program.

Cheers,
Till

On Wed, Apr 6, 2016 at 11:04 AM, norman sp <wir12...@studserv.uni-leipzig.de
> wrote:

> Hi,
> I'm trying out the new CEP library but have some problems with event
> detection.
> In my case Flink detects the event pattern: A followed by B within 10
> seconds.
> But short time after event detection when the event pattern isn't matched
> anymore, the program crashes with the error message:
>
> 04/06/2016 11:04:47     Job execution switched to status FAILING.
> java.lang.NullPointerException
>         at
>
> org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
>         at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
>         at
>
> org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
>         at
>
> org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
>         at
>
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> After that, the job execution is restarted and proceeds well until the next
> AbstractCEPPatternOperator failes.
>
> That's my code:
> Pattern<Tuple5&lt;String, String, Double, Double, Double>, ?> FlowPattern =
> Pattern.<Tuple5&lt;String, String, Double, Double, Double>>begin("start")
> .followedBy("FlowOver10")
> .where(new FilterFunction<Tuple5&lt;String,String,Double, Double,
> Double>>()
> {//some Filter}})
> .followedBy("PressureOver10")
> .where(new FilterFunction<Tuple5&lt;String,String,Double, Double,
> Double>>()
> {//some Filter}})
> .within(Time.seconds(10));
>
> PatternStream<Tuple5&lt;String, String, Double, Double, Double>>
> FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
> DataStream<String> warning = FlowFirstPatternStream.select(new
> FlowPatternWarning());
> warning.print();
>
> private static class FlowPatternWarning implements
> PatternSelectFunction<Tuple5&lt;String, String, Double, Double, Double>,
> String> {
>                 @Override
>                 public String select(Map<String, Tuple5&lt;String, String,
> Double, Double,
> Double>> pat) throws Exception {
>                       Tuple5<String, String, Double, Double, Double>
> pressure =
> pat.get("PressureOver10");
>                       Tuple5<String, String, Double, Double, Double> flow =
> pat.get("FlowOver10");
>
>                         return "  #######   Warning! FlowPattern   #######
> " +
> pressure.toString() + " - " + flow.toString();
>                 }
>         }
>
>
> How can I solve that?
> Hope somebody could help me.
>
> greetz Norman
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Reply via email to