Hi Daiqing,

I think Stefan is right and this will be fixed in the upcoming release. Could you open a JIRA for it with the Exception that you posted here?

Thanks,
Kostas

> On Aug 12, 2017, at 10:05 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
> Hi,
>
> from a quick look, I would say this is likely a problem with the
> NFASerializer: this class seems to be stateful, but its 'duplicate()' method
> simply returns 'this'. This means that code which relies on duplication
> of serializers to shield against concurrent accesses can break, because
> multiple threads can work on the same internal serializer state and corrupt
> it. I will take a deeper look on Monday.
>
> Best,
> Stefan
>
>> On 11.08.2017 at 20:55, Daiqing Li <lidaiqing1...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am running Flink 1.3.1 on EMR, but I am getting this exception after
>> running for a while.
>>
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Could not copy NFA.
>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>     at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>     ... 7 more
>> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>>     at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>>     at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>>     ... 17 more
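
For illustration, the duplication problem Stefan describes could be sketched roughly as below. This is only a minimal sketch and not the actual NFASerializer code: the SketchSerializer interface, both serializer classes, and the scratch buffer are hypothetical names made up for this example, and the interface is a simplified stand-in rather than Flink's real TypeSerializer API. The point is just that a stateful serializer whose duplicate() returns 'this' hands every caller the same mutable state, whereas returning a fresh instance gives each caller its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class SerializerDuplicationSketch {

    /** Hypothetical, simplified serializer contract (not Flink's TypeSerializer). */
    interface SketchSerializer {
        byte[] serialize(String value);

        /** Callers use this to get an instance they can safely use on their own thread. */
        SketchSerializer duplicate();
    }

    /** Stateful serializer that reuses one scratch buffer across calls. */
    static class StatefulSerializer implements SketchSerializer {
        // Mutable internal state: two threads writing here at once can corrupt each other's data.
        protected final byte[] scratch = new byte[256];

        @Override
        public byte[] serialize(String value) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            // Assumption for the sketch: the encoded value fits into the scratch buffer.
            System.arraycopy(bytes, 0, scratch, 0, bytes.length);
            return Arrays.copyOf(scratch, bytes.length);
        }

        // Problematic pattern: the "duplicate" is the very same object, scratch buffer included.
        @Override
        public SketchSerializer duplicate() {
            return this;
        }
    }

    /** Same serializer, but each duplicate gets its own scratch buffer. */
    static class SafelyDuplicatedSerializer extends StatefulSerializer {
        @Override
        public SketchSerializer duplicate() {
            return new SafelyDuplicatedSerializer();
        }
    }

    public static void main(String[] args) {
        SketchSerializer shared = new StatefulSerializer();
        SketchSerializer isolated = new SafelyDuplicatedSerializer();

        // Identity checks show whether a caller of duplicate() ends up sharing state.
        System.out.println("shared serializer reused:   " + (shared.duplicate() == shared));
        System.out.println("isolated serializer reused: " + (isolated.duplicate() == isolated));
    }
}
```

A stateless serializer can return 'this' from duplicate() without harm; it is only the combination of internal mutable state and a shared instance that opens the door to the kind of corruption seen in the trace above.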