Hi Daiqing,

I think Stefan is right and this will be fixed in the upcoming release.
Could you open a JIRA for it with the Exception that you posted here?
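
For anyone following along, the duplicate() issue Stefan describes can be sketched roughly as below. This is a minimal, self-contained Java illustration, not Flink's actual NFASerializer: SketchSerializer, SharedStateSerializer and IsolatedStateSerializer are hypothetical names made up for this example, and the scratch buffer only stands in for whatever internal state a stateful serializer might hold.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializer contract; this is NOT Flink's TypeSerializer.
interface SketchSerializer {
    SketchSerializer duplicate();
    byte[] serialize(String value);
}

// Stateful serializer: every call goes through the same scratch buffer.
class SharedStateSerializer implements SketchSerializer {
    // Assumed internal state; values larger than 1024 bytes would overflow it.
    private final ByteBuffer scratch = ByteBuffer.allocate(1024);

    @Override
    public SketchSerializer duplicate() {
        // Unsafe for a stateful class: every "duplicate" shares the scratch buffer,
        // so two threads serializing at the same time can corrupt each other's bytes.
        return this;
    }

    @Override
    public byte[] serialize(String value) {
        scratch.clear();
        scratch.put(value.getBytes(StandardCharsets.UTF_8));
        scratch.flip();
        byte[] out = new byte[scratch.remaining()];
        scratch.get(out);
        return out;
    }
}

// Same serialization logic, but duplicate() hands out a fresh instance
// with its own scratch buffer, so each thread can own one safely.
class IsolatedStateSerializer extends SharedStateSerializer {
    @Override
    public SketchSerializer duplicate() {
        return new IsolatedStateSerializer();
    }
}

final class DuplicateSketch {
    public static void main(String[] args) {
        SketchSerializer shared = new SharedStateSerializer();
        SketchSerializer isolated = new IsolatedStateSerializer();
        System.out.println(shared.duplicate() == shared);     // true: state is shared
        System.out.println(isolated.duplicate() == isolated); // false: state is isolated
    }
}
```

The sketch deliberately avoids reproducing the race itself, since that would be nondeterministic; it only shows why returning 'this' from duplicate() gives callers no isolation.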

Thanks,
Kostas

> On Aug 12, 2017, at 10:05 AM, Stefan Richter <s.rich...@data-artisans.com> 
> wrote:
> 
> Hi,
> 
> From a quick look, I would say this is likely a problem with the 
> NFASerializer: this class seems to be stateful, but its 'duplicate()' method 
> simply returns 'this'. This means that code which relies on duplicating 
> serializers to shield against concurrent access can break, because 
> multiple threads can work on the same internal serializer state and corrupt 
> it. I will take a deeper look on Monday.
> 
> Best,
> Stefan
> 
>> On Aug 11, 2017, at 8:55 PM, Daiqing Li <lidaiqing1...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I am running Flink 1.3.1 on EMR, but I get this exception after the job has 
>> been running for a while.
>> 
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
>>      at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>      at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>      at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>      at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Could not copy NFA.
>>      at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>>      at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>>      at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>      at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>      at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>>      at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>>      at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>>      at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>      at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>      at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>      ... 7 more
>> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>>      at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>>      at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>>      at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>>      at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>>      at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>>      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>>      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>      at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>>      at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>>      at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>>      at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>>      ... 17 more
> 
