Hi,

From a quick look, I would say this is likely a problem with the NFASerializer: 
this class seems to be stateful, but its 'duplicate()' method simply 
returns 'this'. This means that code which relies on duplicating 
serializers to shield against concurrent access can break, because multiple 
threads can work on the same internal serializer state and corrupt it. I will 
take a deeper look on Monday.
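
To illustrate the general pattern, here is a minimal sketch in plain Java. It does not use Flink and is not the actual NFASerializer code: the names DuplicateSketch, StatefulSerializer, duplicateUnsafe and the 'scratch' field are hypothetical and only stand in for "a serializer with internal mutable state". It shows the difference between a duplicate() that returns 'this' and one that returns a fresh instance.

```java
import java.nio.charset.StandardCharsets;

public class DuplicateSketch {

    /** Hypothetical stand-in for a stateful serializer; not Flink's NFASerializer. */
    static class StatefulSerializer {
        // Internal scratch state reused across calls: this makes the class stateful.
        private byte[] scratch = new byte[0];

        /** Unsafe variant: returns the same instance, so all callers share 'scratch'. */
        StatefulSerializer duplicateUnsafe() {
            return this;
        }

        /** Safe variant: each caller gets its own instance with its own 'scratch'. */
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }

        /** Copies a value by round-tripping it through the internal scratch buffer. */
        String copy(String value) {
            scratch = value.getBytes(StandardCharsets.UTF_8);
            return new String(scratch, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        StatefulSerializer original = new StatefulSerializer();
        // Same object: two threads using these would write to one shared buffer.
        System.out.println(original.duplicateUnsafe() == original); // prints "true"
        // Different object: each thread can work on its own internal state.
        System.out.println(original.duplicate() == original);       // prints "false"
    }
}
```

With the unsafe variant, two threads that each believe they hold a private copy would in fact write to the same buffer, which is the kind of corruption I suspect here. Whether a fresh instance per duplicate() call is the right fix for the NFASerializer itself is an assumption until I have looked at the code in more detail.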

Best,
Stefan

> On 11.08.2017 at 20:55, Daiqing Li <lidaiqing1...@gmail.com> wrote:
> 
> Hi,
> 
> I am running Flink 1.3.1 on EMR, but I am getting this exception after 
> running for a while.
> 
> 
> java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>       at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>       at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>       at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>       at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>       at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>       at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>       at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>       at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>       at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>       ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>       at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>       at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>       at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>       at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>       at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>       at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>       at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>       at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>       ... 17 more