[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2022-07-20 Thread David Maddison (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568967#comment-17568967
 ] 

David Maddison edited comment on FLINK-23886 at 7/20/22 10:17 AM:
--

I realise this doesn't move the conversation any further toward a solution, but 
here is another data point showing that this happens in different environments. 
The job had been running for 37 days, again on Flink 1.13.5, commit 
0ff28a7 @ 2021-12-14T23:26:04+01:00:
{code:java}
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
element.
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:284)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:118)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:82)
at 
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:71)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:93)
at 
org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory.create(RocksDBPriorityQueueSetFactory.java:113)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:476)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.createTimerPriorityQueue(InternalTimeServiceManagerImpl.java:174)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.registerOrGetTimerService(InternalTimeServiceManagerImpl.java:158)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.getInternalTimerService(InternalTimeServiceManagerImpl.java:136)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:620)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.EOFException
at 
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
at org.apache.flink.types.StringValue.readString(StringValue.java:786)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386)
... 25 more
 {code}
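For context on the failure mode seen in the stack trace above: an EOFException inside a serializer's read path generally means the byte buffer holding the element is shorter than the serializer expects, i.e. the stored timer bytes are truncated or laid out differently than assumed. A minimal, Flink-free sketch of that mechanism using plain java.io (the record layout and names here are hypothetical, not Flink's actual DataInputDeserializer):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class TruncatedReadDemo {
    // Serialize a timer-like record (timestamp + key), truncate the bytes,
    // and try to read it back. Returns true if the read dies with EOFException.
    static boolean failsOnTruncation() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeLong(1_630_000_000_000L); // timestamp
            out.writeUTF("user-42");           // key
        }
        byte[] full = bos.toByteArray();
        // Drop the last 3 bytes, as if the stored element were corrupt/short.
        byte[] truncated = Arrays.copyOf(full, full.length - 3);

        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(truncated))) {
            in.readLong(); // the fixed-width prefix still reads fine
            in.readUTF();  // the string read runs past the end of the buffer
            return false;
        } catch (EOFException e) {
            return true;   // same terminal symptom as the stack trace above
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("EOFException on truncated bytes: " + failsOnTruncation());
    }
}
```

This does not explain how the stored bytes became short in the first place; it only shows why a short buffer surfaces as EOFException in StringSerializer rather than as a more descriptive error.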


was (Author: maddisondavid):
I realise this doesn't move the conversation any further toward a solution, but 
just to add another data point to show that this happens in different 
environments.  Job has been running for 37 days, again Flink 13.5, commit 
0ff28a7 @ 2021-12-14T23:26:04+01:00

[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2022-07-20 Thread David Maddison (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568967#comment-17568967
 ] 

David Maddison edited comment on FLINK-23886 at 7/20/22 10:14 AM:
--

I realise this doesn't move the conversation any further toward a solution, but 
here is another data point showing that this happens in different environments. 
The job had been running for 37 days, again on Flink 1.13.5, commit 
0ff28a7 @ 2021-12-14T23:26:04+01:00:
{code:java}
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
element.
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:284)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:118)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:82)
at 
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:71)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:93)
at 
org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory.create(RocksDBPriorityQueueSetFactory.java:113)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:476)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.createTimerPriorityQueue(InternalTimeServiceManagerImpl.java:174)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.registerOrGetTimerService(InternalTimeServiceManagerImpl.java:158)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.getInternalTimerService(InternalTimeServiceManagerImpl.java:136)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:620)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.EOFException
at 
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
at org.apache.flink.types.StringValue.readString(StringValue.java:786)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386)
... 25 more
 {code}


was (Author: maddisondavid):
I realise this doesn't move the conversation any further to a solution, but 
just to add another data point to show that this happens in different 
environments.  Job has been running for 37 days, again Flink 13.5, commit 
0ff28a7 @ 2021-12-14T23:26:04+01:00

[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-09-22 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418400#comment-17418400
 ] 

Steven Zhen Wu edited comment on FLINK-23886 at 9/22/21, 3:58 PM:
--

Just to add another data point: we observed the same issue with Flink 1.13.2 in 
production. We don't know how to reproduce this kind of tricky state-corruption 
problem.
{code:java}
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
element.
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:285)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:118)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:82)
at 
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:71)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:93)
at 
org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory.create(RocksDBPriorityQueueSetFactory.java:113)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:476)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.createTimerPriorityQueue(InternalTimeServiceManagerImpl.java:174)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.registerOrGetTimerService(InternalTimeServiceManagerImpl.java:160)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.getInternalTimerService(InternalTimeServiceManagerImpl.java:136)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:620)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
at 
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
at org.apache.flink.types.StringValue.readString(StringValue.java:781)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386)
... 25 more
{code}


was (Author: stevenz3wu):
Just to add another data point. We observed the same issue with Flink 1.13.2 in 
production. We don't know how to reproduce this type of tricky state corruption 
problem

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.0, 1.11.3, 1.13.2
>Reporter: JING ZHANG
>

[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-31 Thread JING ZHANG (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407803#comment-17407803
 ] 

JING ZHANG edited comment on FLINK-23886 at 9/1/21, 4:26 AM:
-

[~yunta] Sorry for the late response.

We use Flink 1.10.

> 1. reproduce it on a supported Flink version (Flink-1.13 is better)

Most of our online jobs are currently running Flink 1.10; we are planning to 
upgrade to 1.13 in the next step. We will ping you once we have reproduced the 
problem on 1.13.

> 2. share the code or provide a minimalistic example that reproduces this 
> problem

We could not provide a minimal example that reproduces this problem. It occurs 
only very occasionally; even in the same job it is difficult to trigger.

The job pattern is very simple: the source is Kafka with a protobuf schema. The 
job keys by an int field, applies a tumbling time window, and then a simple 
`ProcessWindowFunction`.
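For readers unfamiliar with this shape of pipeline, the pattern described above can be sketched without Flink at all. This is not the user's code; the names, the 60-second window, and the counting function are all made up for illustration, and plain Java collections stand in for Flink's keyed tumbling window so the sketch stays self-contained:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowSketch {
    static final long WINDOW_MS = 60_000L; // hypothetical tumbling window size

    record Event(int key, long timestampMs) {}

    // Assign each event to its tumbling window, bucketed by (key, windowStart),
    // then apply a trivial "process window function" (here: a per-window count).
    static Map<String, Integer> countPerWindow(List<Event> events) {
        Map<String, Integer> counts = new HashMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % WINDOW_MS);
            counts.merge(e.key() + "@" + windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two events fall in window [0, 60s), one in [60s, 120s).
        List<Event> events = List.of(
            new Event(7, 0L), new Event(7, 59_999L), new Event(7, 60_000L));
        System.out.println(countPerWindow(events));
    }
}
```

In the real job, each such window registers a cleanup timer in the RocksDB-backed timer state, which is the state that fails to deserialize on restore.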


was (Author: qingru zhang):
[~yunta] Sorry for late response.

We use Flink 1.10, 

> 1. reproduce it on a supported Flink version (Flink-1.13 is better)

There is a little difficulty here, because most of online jobs are using 
Flink-1.10 curently, we are planning to upgrade to 1.13 in the next step. 
Besides, even in Flink-1.10, this is a very occasional problem, even in the 
same job, it is difficult to appear.

> 2. share the code or provide a minimalistic example that reproduces this 
> problem

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: JING ZHANG
>Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png
>
>
> A user reported the bug on the [mailing 
> list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]. 
> I paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at 
> 

[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-25 Thread JING ZHANG (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404501#comment-17404501
 ] 

JING ZHANG edited comment on FLINK-23886 at 8/25/21, 2:18 PM:
--

[~arvid] [~yunta] Thanks a lot for the information.

However, there is one weird point that we cannot explain. We expect the value 
of this state to always be empty, because TimerSerializer only writes the key 
to state; it never writes the value. But with Yun Tang's help we recovered the 
keyed state that stores the RocksDB queue set, and we found that the value of 
some timer state is not empty, which is unexpected. Moreover, we could 
deserialize the erroneous timer state with the key and value type serializers 
of the window state, which is another state in the same operator.

Could this be caused by a concurrency problem?


was (Author: qingru zhang):
[~arvid] [~yunta] Thanks a lot for information.

However, there is one weird point that could not explain. We expect the value 
of this state always be empty because TimerSerializer only writes the key to 
state, it never writes the value to state. But with Yun Tang's help, we recover 
the keyed state which stores rocksdb queue set, we find value of some timer 
state is not empty, which is unexpected. Besides we could deserialize the error 
timer state by the key type serializer and value type serializer of window 
state which is another state in the same Operator.

Could this caused by concurrent problem? 

!image-2021-08-25-22-10-32-309.png|width=556,height=440!


[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-25 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404373#comment-17404373
 ] 

Arvid Heise edited comment on FLINK-23886 at 8/25/21, 11:49 AM:


{quote}
Flink actually synchronizes invocations of onTimer() and processElement() [see 
the timers 
description|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#timers]
 via the [mailbox thread 
model|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1665-L1671]
 in StreamTask. As far as I can see, there is no concurrency problem here.

Maybe [~arvid] could share more insights here.
{quote}
This is only true since Flink 1.11. Before that, a few things were done 
concurrently, so I could imagine that there are bugs lingering. [~pnowojski] 
probably knows more.

However, this probably also means that none of the currently supported versions 
would exhibit the same issue. So even if we manage to find the bug and fix it, 
there won't be a bugfix release; you'd have to apply a patch on your own.
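The single-threaded mailbox model referenced in the quote above can be illustrated with a toy sketch. This is not Flink's actual StreamTask implementation, only the idea: every action, element processing and timer callbacks alike, is enqueued into one queue and drained by a single loop, so the bodies of processElement() and onTimer() can never interleave:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class MailboxSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    void enqueue(Runnable action) {
        mailbox.add(action);
    }

    // One loop, one thread: actions execute strictly one after another,
    // so no two enqueued actions ever run concurrently.
    void runMailboxLoop() {
        Runnable next;
        while ((next = mailbox.poll()) != null) {
            next.run();
        }
    }

    public static void main(String[] args) {
        MailboxSketch task = new MailboxSketch();
        task.enqueue(() -> task.log.add("processElement(a)"));
        task.enqueue(() -> task.log.add("onTimer(t1)"));
        task.enqueue(() -> task.log.add("processElement(b)"));
        task.runMailboxLoop();
        System.out.println(task.log);
    }
}
```

Under this model a write torn between a timer write and an element write should be impossible, which is why the pre-1.11 concurrent execution paths remain the main suspect in this thread.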


was (Author: arvid):
{quote}
Flink actually synchronizes invocations of onTimer() and processElement() [see 
timers 
description|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#timers]
 via [mail box thread 
modle|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1665-L1671]
 in StreamTask. As far as I can see, I cannot see concurrent problem here.

Mabe [~arvid] could share more insights here.
{quote}
This is only true since Flink 1.11. Before that a few things were done 
concurrently, so I could imagine that there are bugs lingering. [~pnowojski] 
probably knows more.

However, this probably also means that none of the currently supported version 
would exhibit the same issue. So even we manage to find the bug and fix it, 
there won't be a bugfix release; you'd have to apply a patch on your own.

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: JING ZHANG
>Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png
>
>
> A user report the bug in the [mailist. 
> |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I
>  paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> 

[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-25 Thread xiangqiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404312#comment-17404312
 ] 

xiangqiao edited comment on FLINK-23886 at 8/25/21, 9:37 AM:
-

Hello [~yunta], we downloaded the RocksDB data files of the failed subtask 
locally and traversed all records in the 
*"_timer_state/processing_window-timers"* column family.

We found the following points:

1. The value length of the problem records is not 0 (for example, in the 
screenshot, "count:5" is a normal record and "count:6" is a problem record).

2. The key length of a problem record is 8 bytes less than that of a normal 
record.

3. The key format of ListState is (keygroup, key, namespace), while the key 
format of a Timer is (keygroup, timestamp, key, namespace). Their lengths 
differ by 8 bytes.

4. We can successfully deserialize the key of a problem record using the 
ListState key format.

*5. Therefore, it can be concluded that data from the "window-contents" state 
is being written into the "_timer_state/processing_window-timers" state.*

 
{code:java}
@Test
public void testTimerRestore() throws Exception {
    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1 + 3);
    columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
    columnFamilyDescriptors.add(new ColumnFamilyDescriptor("window-contents".getBytes()));
    columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/event_window-timers".getBytes()));
    columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/processing_window-timers".getBytes()));

    List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1 + 3);

    DataInputDeserializer inputView = new DataInputDeserializer();
    int numPrefixBytes = 2; // length of the key-group prefix
    TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
    TypeSerializer<TimeWindow> namespaceSerializer = new TimeWindow.Serializer();

    TimerSerializer<Integer, TimeWindow> timerSerializer =
            new TimerSerializer<>(keySerializer, namespaceSerializer);

    try (RocksDB db = RocksDB.open(PredefinedOptions.DEFAULT.createDBOptions(),
            "/Users/xiangqiao/timer_restore/51_500_db",
            columnFamilyDescriptors, columnFamilyHandles)) {
        RocksIteratorWrapper rocksIterator =
                RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandles.get(3));
        rocksIterator.seekToFirst();

        long count = 0;
        while (rocksIterator.isValid()) {
            System.out.println("-count:" + count++ + "-");
            byte[] keyBytes = rocksIterator.key();
            byte[] valueBytes = rocksIterator.value();
            System.out.println("key len:" + keyBytes.length + "," + Arrays.toString(keyBytes));
            System.out.println("value len:" + valueBytes.length + "," + Arrays.toString(valueBytes));

            // First try to deserialize the key with the timer key format
            // (keygroup, timestamp, key, namespace).
            inputView.setBuffer(keyBytes, numPrefixBytes, keyBytes.length - numPrefixBytes);
            try {
                System.out.println(timerSerializer.deserialize(inputView));
            } catch (Exception e) {
                e.printStackTrace();
                // On failure, retry with the ListState key format
                // (keygroup, key, namespace), i.e. without the timestamp.
                inputView.setBuffer(keyBytes, numPrefixBytes, keyBytes.length - numPrefixBytes);
                System.out.println("exception retry,key:" + keySerializer.deserialize(inputView)
                        + ",namespace:" + namespaceSerializer.deserialize(inputView));
            }
            rocksIterator.next(); // advance only after reading the current entry
        }
    }
}
{code}
!image-2021-08-25-17-07-38-327.png!
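To make point 3 above concrete, the 8-byte difference between the two key layouts can be reproduced with a small standalone sketch. This is illustrative only, not Flink's actual serializers; the byte widths (2-byte key-group prefix, 8-byte long timestamp) are assumptions based on the analysis above:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch of the two RocksDB key layouts compared above.
// The timer key carries an extra 8-byte timestamp after the 2-byte
// key-group prefix, matching the observed 8-byte length difference.
public class KeyLayoutSketch {

    // ListState key layout: (keygroup, key, namespace)
    static byte[] listStateKey(int keyGroup, byte[] key, byte[] ns) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeShort(keyGroup); // 2-byte key-group prefix
        out.write(key);
        out.write(ns);
        return bos.toByteArray();
    }

    // Timer key layout: (keygroup, timestamp, key, namespace)
    static byte[] timerKey(int keyGroup, long timestamp, byte[] key, byte[] ns) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeShort(keyGroup);
        out.writeLong(timestamp); // the extra 8 bytes
        out.write(key);
        out.write(ns);
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] key = {1, 2, 3, 4}; // e.g. a serialized 4-byte int key
        byte[] ns = {9, 9};        // some serialized namespace bytes
        int diff = timerKey(0, 123L, key, ns).length - listStateKey(0, key, ns).length;
        System.out.println("length difference = " + diff); // prints: length difference = 8
    }
}
```

This is why a ListState entry misplaced into the timer column family deserializes cleanly under the ListState layout but throws EOFException under the timer layout: the timer deserializer consumes 8 bytes for the timestamp and then runs out of input.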


was (Author: xiangqiao):
Hello [~yunta], we downloaded the RocksDB data files of the failed subtask 
locally and traversed all records in the 
*"_timer_state/processing_window-timers"* column family.

We found the following points:

1. The value length of the problem records is not 0 (for example, in the 
screenshot, "count:5" is a normal record and "count:6" is a problem record).

2. The key length of a problem record is 8 bytes less than that of a normal 
record.

3. The key format of ListState is (keygroup, key, namespace), while the key 
format of a Timer is (keygroup, timestamp, key, namespace). Their lengths 
differ by 8 bytes.

4. We can successfully deserialize the key of a problem record using the 
ListState key format.

*5. Therefore, it can be concluded that data from the "window-contents" state 
is being written into the "_timer_state/processing_window-timers" state.*

 
{code:java}
@Test
public void testTimerRestore() throws Exception {
   List columnFamilyDescriptors = new ArrayList<>(1 + 
3);
   columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
   columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("window-contents".getBytes()));
   columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("_timer_state/event_window-timers".getBytes()));
   columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("_timer_state/processing_window-timers".getBytes()));

   List columnFamilyHandles = new ArrayList<>(1 + 3);

   DataInputDeserializer inputView = new DataInputDeserializer();
   int numPrefixBytes = 2;
   TimerSerializer timerSerializer = new 
TimerSerializer(IntSerializer.INSTANCE, new 

[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-25 Thread xiangqiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404312#comment-17404312
 ] 

xiangqiao edited comment on FLINK-23886 at 8/25/21, 9:34 AM:
-

Hello [~yunta], we downloaded the RocksDB data files of the failed subtask 
locally and traversed all records in the 
*"_timer_state/processing_window-timers"* column family.

We found the following points:

1. The value length of the problem records is not 0 (for example, in the 
screenshot, "count:5" is a normal record and "count:6" is a problem record).

2. The key length of a problem record is 8 bytes less than that of a normal 
record.

3. The key format of ListState is (keygroup, key, namespace), while the key 
format of a Timer is (keygroup, timestamp, key, namespace). Their lengths 
differ by 8 bytes.

4. We can successfully deserialize the key of a problem record using the 
ListState key format.

*5. Therefore, it can be concluded that data from the "window-contents" state 
is being written into the "_timer_state/processing_window-timers" state.*

 
{code:java}
@Test
public void testTimerRestore() throws Exception {
    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1 + 3);
    columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
    columnFamilyDescriptors.add(new ColumnFamilyDescriptor("window-contents".getBytes()));
    columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/event_window-timers".getBytes()));
    columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/processing_window-timers".getBytes()));

    List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1 + 3);

    DataInputDeserializer inputView = new DataInputDeserializer();
    int numPrefixBytes = 2; // length of the key-group prefix
    TimerSerializer<Integer, TimeWindow> timerSerializer =
            new TimerSerializer<>(IntSerializer.INSTANCE, new TimeWindow.Serializer());

    try (RocksDB db = RocksDB.open(PredefinedOptions.DEFAULT.createDBOptions(),
            "/Users/xiangqiao/timer_restore/51_500_db",
            columnFamilyDescriptors, columnFamilyHandles)) {
        RocksIteratorWrapper rocksIterator =
                RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandles.get(3));
        rocksIterator.seekToFirst();

        long count = 0;
        while (rocksIterator.isValid()) {
            System.out.println("-count:" + count++ + "-");
            byte[] keyBytes = rocksIterator.key();
            byte[] valueBytes = rocksIterator.value();
            System.out.println("key len:" + keyBytes.length + "," + Arrays.toString(keyBytes));
            System.out.println("value len:" + valueBytes.length + "," + Arrays.toString(valueBytes));

            inputView.setBuffer(keyBytes, numPrefixBytes, keyBytes.length - numPrefixBytes);
            try {
                System.out.println(timerSerializer.deserialize(inputView));
            } catch (Exception e) {
                e.printStackTrace();
            }
            rocksIterator.next(); // advance only after reading the current entry
        }
    }
}
{code}
!image-2021-08-25-17-07-38-327.png!


was (Author: xiangqiao):
We downloaded the RocksDB data files of the failed subtask locally and 
traversed all records in the *"_timer_state/processing_window-timers"* column 
family.

We found the following points:

1. The value length of the problem records is not 0 (for example, in the 
screenshot, "count:5" is a normal record and "count:6" is a problem record).

2. The key length of a problem record is 8 bytes less than that of a normal 
record.

3. The key format of ListState is (keygroup, key, namespace), while the key 
format of a Timer is (keygroup, timestamp, key, namespace). Their lengths 
differ by 8 bytes.

4. We can successfully deserialize the key of a problem record using the 
ListState key format.

*5. Therefore, it can be concluded that data from the "window-contents" state 
is being written into the "_timer_state/processing_window-timers" state.*

 
{code:java}
@Test
public void testTimerRestore() throws Exception {
   List columnFamilyDescriptors = new ArrayList<>(1 + 
3);
   columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
   columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("window-contents".getBytes()));
   columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("_timer_state/event_window-timers".getBytes()));
   columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("_timer_state/processing_window-timers".getBytes()));

   List columnFamilyHandles = new ArrayList<>(1 + 3);

   DataInputDeserializer inputView = new DataInputDeserializer();
   int numPrefixBytes = 2;
   TimerSerializer timerSerializer = new 
TimerSerializer(IntSerializer.INSTANCE, new TimeWindow.Serializer());


   try (RocksDB db = RocksDB.open(PredefinedOptions.DEFAULT.createDBOptions(), 
"/Users/xiangqiao/timer_restore/51_500_db", columnFamilyDescriptors, 
columnFamilyHandles)) {
  RocksIteratorWrapper rocksIterator = 
RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandles.get(3));
  rocksIterator.seekToFirst();

  long count = 0;
  while 

[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-24 Thread xiangqiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403682#comment-17403682
 ] 

xiangqiao edited comment on FLINK-23886 at 8/24/21, 9:29 AM:
-

 Thank you, [~yunta].
 Adding the common characteristics of the problematic jobs, hoping this helps 
locate the problem:
 1. Occasionally, some checkpoints have problems, not all
 2. DataStream job
 3. Uses a processing-time timer
 4. Uses com.twitter.chill.protobuf.ProtobufSerializer
 5. Uses RocksDB incremental checkpoints
 


was (Author: xiangqiao):
 thank you [~yunta] 
 Add the common characteristics of problematic jobs, hoping to help locate the 
problem:
 1.Occasionally, some checkpoints have problems,not all
 2.use processing time timer
 3.use com.twitter.chill.protobuf.ProtobufSerializer
 4.use RocksDB Incremental Checkpoint

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: JING ZHANG
>Priority: Major
>
> A user reported the bug on the [mailing 
> list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]. 
> I paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at 
> 

[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-24 Thread xiangqiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403682#comment-17403682
 ] 

xiangqiao edited comment on FLINK-23886 at 8/24/21, 9:02 AM:
-

 Thank you, [~yunta].
 Adding the common characteristics of the problematic jobs, hoping this helps 
locate the problem:
 1. Occasionally, some checkpoints have problems, not all
 2. Uses a processing-time timer
 3. Uses com.twitter.chill.protobuf.ProtobufSerializer
 4. Uses RocksDB incremental checkpoints


was (Author: xiangqiao):
 thank you [~yunta] 
Add the common characteristics of problematic tasks, hoping to help locate the 
problem:
1.Occasionally, some checkpoints have problems,not all
2.use processing time timer
3.use com.twitter.chill.protobuf.ProtobufSerializer
4.use RocksDB Incremental Checkpoint
