Hi again Varun,

I am investigating the problem you mentioned and I found a bug in the
SharedBuffer, but I am not sure if it is the only bug that affects you.

Could you please try this branch https://github.com/kl0u/flink/tree/cep-inv
and let me know if the problem is still there?

In addition, are you using Scala with case classes or Java?

Thanks for helping fix the problem,
Kostas

> On Jan 24, 2018, at 5:54 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> 
> Hi Varun,
> 
> Thanks for taking the time to look into it. Could you give a sample input
> and your pattern to reproduce the problem?
> That would help a lot in figuring out the cause of the problem.
> 
> Thanks,
> Kostas
> 
>> On Jan 23, 2018, at 5:40 PM, Varun Dhore <varundhor...@gmail.com> wrote:
>> 
>> Hi Kostas,
>> 
>> I was able to reproduce the error with 1.4.0. After upgrading the cluster to
>> the 1.5 snapshot and running through the same data, I am still experiencing
>> the same exception. The CEP patterns that I am running use followed-by
>> patterns, e.g. AfBfC. In my experience, I was never able to get stable
>> execution when checkpoints are enabled. When I disable checkpoints, the CEP
>> jobs run fine. Aside from this particular error, I also notice that the
>> majority of checkpoints expire, as they do not complete within the configured
>> 5 min timeout period. Any suggestions on further debugging runtime
>> checkpoints would be very helpful.
>> Thanks in advance for your assistance.
>> 
>> Regards,
>> Varun 
>> 
>> On Jan 18, 2018, at 8:11 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> 
>>> Thanks a lot Varun!
>>> 
>>> Kostas
>>> 
>>>> On Jan 17, 2018, at 9:59 PM, Varun Dhore <varundhor...@gmail.com> wrote:
>>>> 
>>>> Thank you Kostas. Since this error is not easily reproducible on my end 
>>>> I’ll continue testing this and confirm the resolution once I am able to do 
>>>> so.
>>>> 
>>>> Thanks,
>>>> Varun 
>>>> 
>>>> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>> 
>>>>> Hi Varun,
>>>>> 
>>>>> This may be related to this issue:
>>>>> https://issues.apache.org/jira/browse/FLINK-8226
>>>>> which is currently fixed on the master.
>>>>> 
>>>>> Could you please try the current master to see if the error persists?
>>>>> 
>>>>> Thanks,
>>>>> Kostas
>>>>> 
>>>>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore <varundhor...@gmail.com> wrote:
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> Hello Flink community,
>>>>>>>  
>>>>>>> I have encountered the following exception while testing the 1.4.0
>>>>>>> release. This error occurs intermittently, and my CEP job keeps
>>>>>>> restarting after this exception. I am running the job with event-time
>>>>>>> semantics and checkpoints enabled.
>>>>>>>  
>>>>>>>  
>>>>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
>>>>>>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>>>>>>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248)
>>>>>>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>>>>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>>>>>     ... 7 more
>>>>>>> Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: ”e1”, timestamp: 1515593398897), 1515593398897, 0), [SharedBufferEdge(null, 1)], 2)
>>>>>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>>>>>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:972)
>>>>>>>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:839)
>>>>>>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:919)
>>>>>>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:839)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
>>>>>>>     ... 13 more
>>>>>>>  
>>>>>>>  
>>>>>>> Thanks,
>>>>>>> Varun
>>>>> 
>>> 
> 
