Any guidance would be most appreciated.
Thanks,
Steve
===========================================
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
    at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
    at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
    at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
    ... 14 more
Caused by: java.lang.NullPointerException
    at java.lang.String.contains(String.java:2133)
    at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
    at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
    at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
    at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
    at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
    ... 19 more
==================================================
The code
// Source: a Kinesis consumer for the stream named by the required "stream"
// parameter, decoding records with EventSchema.
FlinkKinesisConsumer<Event> kinesisSource = new FlinkKinesisConsumer<>(
        pt.getRequired("stream"),
        new EventSchema(),
        kinesisConsumerConfig);
DataStream<Event> dataStream = env
        .addSource(kinesisSource)
        .name("Kinesis Stream Consumer");
// Debug aid: uncomment to print the raw records.
//dataStream.print();

// Attach timestamps/watermarks first, then cast every record to IoTEvent.
DataStream<Event> timestamped =
        dataStream.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator());
DataStream<Event> kinesisStream = timestamped.map(event -> (IoTEvent) event);
// Debug aid: uncomment to print the mapped records.
//kinesisStream.print();
Pattern<Event, ?> pattern = Pattern
.<Event> begin("first event").subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>()
{
//private static final long serialVersionUID =
-6301755149429716724L;
@Override
public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
}
})
.next("second")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
//private static final long serialVersionUID =
2392863109523984059L;
@Override
public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
}
})
.next("third")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
private static final long serialVersionUID =
2392863109523984059L;
@Override
public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
}
})
.next("fourth")
.subtype(IoTEvent.class)
.where(new IterativeCondition<IoTEvent>() {
private static final long serialVersionUID =
2392863109523984059L;
@Override
public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
}
})
.within(Time.seconds(10));
// Apply the pattern to the timestamped input stream.
PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);

// Turn every complete match into an Alert, printing a marker line per match.
DataStream<Alert> alerts = patternStream
        .select(new PatternSelectFunction<Event, Alert>() {
            /**
             * Builds the alert for one complete match.
             *
             * @param matches the matched events, keyed by pattern stage name
             * @return an Alert constructed from the match map
             */
            @Override
            public Alert select(Map<String, List<Event>> matches) throws Exception {
                final Alert result = new Alert(matches);
                System.out.print("AUDIO ALERT\n");
                return result;
            }
        })
        .name("Audio Alert Sink");