[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568967#comment-17568967 ] David Maddison edited comment on FLINK-23886 at 7/20/22 10:17 AM: -- I realise this doesn't move the conversation any further toward a solution, but just to add another data point to show that this happens in different environments. Job has been running for 37 days, again Flink 1.13.5, commit 0ff28a7 @ 2021-12-14T23:26:04+01:00 {code:java} org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element. at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:284) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:118) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:82) at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:71) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:93) at org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory.create(RocksDBPriorityQueueSetFactory.java:113) at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:476) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.createTimerPriorityQueue(InternalTimeServiceManagerImpl.java:174) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.registerOrGetTimerService(InternalTimeServiceManagerImpl.java:158) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.getInternalTimerService(InternalTimeServiceManagerImpl.java:136) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:620) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329) at org.apache.flink.types.StringValue.readString(StringValue.java:786) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31) at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161) at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386) ... 25 more {code} was (Author: maddisondavid): I realise this doesn't move the conversation any further toward a solution, but just to add another data point to show that this happens in different environments. Job has been running for 37 days, again Flink 1.13.5, commit 0ff28a7 @ 2021-12-14T23:26:04+01:00 {code:java} org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element. at
[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568967#comment-17568967 ] David Maddison edited comment on FLINK-23886 at 7/20/22 10:14 AM: -- I realise this doesn't move the conversation any further toward a solution, but just to add another data point to show that this happens in different environments. Job has been running for 37 days, again Flink 13.5, commit 0ff28a7 @ 2021-12-14T23:26:04+01:00 {code:java} org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element. at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:284) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:118) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:82) at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:71) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:93) at org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory.create(RocksDBPriorityQueueSetFactory.java:113) at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:476) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.createTimerPriorityQueue(InternalTimeServiceManagerImpl.java:174) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.registerOrGetTimerService(InternalTimeServiceManagerImpl.java:158) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.getInternalTimerService(InternalTimeServiceManagerImpl.java:136) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:620) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329) at org.apache.flink.types.StringValue.readString(StringValue.java:786) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31) at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161) at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386) ... 25 more {code} was (Author: maddisondavid): I realise this doesn't move the conversation any further toward a solution, but just to add another data point to show that this happens in different environments. Job has been running for 37 days, again Flink 1.13.5, commit 0ff28a7 @ 2021-12-14T23:26:04+01:00 {code:java} org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element. at
[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418400#comment-17418400 ] Steven Zhen Wu edited comment on FLINK-23886 at 9/22/21, 3:58 PM: -- Just to add another data point. We observed the same issue with Flink 1.13.2 in production. We don't know how to reproduce this type of tricky state corruption problem ``` org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element. at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:285) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:118) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:82) at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:71) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:93) at org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory.create(RocksDBPriorityQueueSetFactory.java:113) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:476) at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.createTimerPriorityQueue(InternalTimeServiceManagerImpl.java:174) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.registerOrGetTimerService(InternalTimeServiceManagerImpl.java:160) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.getInternalTimerService(InternalTimeServiceManagerImpl.java:136) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:620) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329) at org.apache.flink.types.StringValue.readString(StringValue.java:781) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31) at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161) at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386) ... 25 more ``` was (Author: stevenz3wu): Just to add another data point. We observed the same issue with Flink 1.13.2 in production. We don't know how to reproduce this type of tricky state corruption problem > An exception is thrown out when recover job timers from checkpoint file > --- > > Key: FLINK-23886 > URL: https://issues.apache.org/jira/browse/FLINK-23886 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.10.0, 1.11.3, 1.13.2 >Reporter: JING ZHANG >
[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418400#comment-17418400 ] Steven Zhen Wu edited comment on FLINK-23886 at 9/22/21, 3:58 PM: -- Just to add another data point. We observed the same issue with Flink 1.13.2 in production. We don't know how to reproduce this type of tricky state corruption problem {code} org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element. at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:285) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:118) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:82) at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:71) at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:93) at org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory.create(RocksDBPriorityQueueSetFactory.java:113) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:476) at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.createTimerPriorityQueue(InternalTimeServiceManagerImpl.java:174) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.registerOrGetTimerService(InternalTimeServiceManagerImpl.java:160) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.getInternalTimerService(InternalTimeServiceManagerImpl.java:136) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:620) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329) at org.apache.flink.types.StringValue.readString(StringValue.java:781) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31) at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161) at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386) ... 25 more {code} was (Author: stevenz3wu): Just to add another data point. We observed the same issue with Flink 1.13.2 in production. We don't know how to reproduce this type of tricky state corruption problem ``` org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element. at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145) at
[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407803#comment-17407803 ] JING ZHANG edited comment on FLINK-23886 at 9/1/21, 4:26 AM: - [~yunta] Sorry for the late response. We use Flink 1.10, > 1. reproduce it on a supported Flink version (Flink-1.13 is better) Most of our online jobs are using Flink-1.10 currently; we are planning to upgrade to 1.13 in the next step. We will ping you after reproducing the problem in 1.13. > 2. share the code or provide a minimalistic example that reproduces this > problem We could not provide a minimalistic example that reproduces this problem. This is a very occasional problem; even in the same job, it is difficult to reproduce. The job pattern is very simple: Source is a kafka with a protobuf schema. KeyBy an int field, do a tumbling time window, then apply a simple `ProcessWindowFunction`. was (Author: qingru zhang): [~yunta] Sorry for the late response. We use Flink 1.10, > 1. reproduce it on a supported Flink version (Flink-1.13 is better) There is a little difficulty here, because most of our online jobs are using Flink-1.10 currently; we are planning to upgrade to 1.13 in the next step. Besides, even in Flink-1.10, this is a very occasional problem; even in the same job, it is difficult to reproduce. > 2. share the code or provide a minimalistic example that reproduces this > problem > An exception is thrown out when recover job timers from checkpoint file > --- > > Key: FLINK-23886 > URL: https://issues.apache.org/jira/browse/FLINK-23886 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: JING ZHANG >Priority: Major > Attachments: image-2021-08-25-16-38-04-023.png, > image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, > image-2021-08-25-17-07-38-327.png > > > A user reported the bug on the [mailing list. 
> |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I > paste the content here. > Setup Specifics: > Version: 1.6.2 > RocksDB Map State > Timers stored in rocksdb > > When we have this job running for long periods of time like > 30 days, if > for some reason the job restarts, we encounter "Error while deserializing the > element". Is this a known issue fixed in later versions? I see some changes > to code for FLINK-10175, but we don't use any queryable state > > Below is the stack trace > > org.apache.flink.util.FlinkRuntimeException: Error while deserializing the > element. > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85) > at > org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89) > at > 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764) > at >
[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404501#comment-17404501 ] JING ZHANG edited comment on FLINK-23886 at 8/25/21, 2:18 PM: -- [~arvid] [~yunta] Thanks a lot for information. However, there is one weird point that could not explain. We expect the value of this state always be empty because TimerSerializer only writes the key to state, it never writes the value to state. But with Yun Tang's help, we recover the keyed state which stores rocksdb queue set, we find value of some timer state is not empty, which is unexpected. Besides we could deserialize the error timer state by the key type serializer and value type serializer of window state which is another state in the same Operator. Could this caused by concurrent problem? was (Author: qingru zhang): [~arvid] [~yunta] Thanks a lot for information. However, there is one weird point that could not explain. We expect the value of this state always be empty because TimerSerializer only writes the key to state, it never writes the value to state. But with Yun Tang's help, we recover the keyed state which stores rocksdb queue set, we find value of some timer state is not empty, which is unexpected. Besides we could deserialize the error timer state by the key type serializer and value type serializer of window state which is another state in the same Operator. Could this caused by concurrent problem? !image-2021-08-25-22-10-32-309.png|width=556,height=440! > An exception is thrown out when recover job timers from checkpoint file > --- > > Key: FLINK-23886 > URL: https://issues.apache.org/jira/browse/FLINK-23886 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: JING ZHANG >Priority: Major > Attachments: image-2021-08-25-16-38-04-023.png, > image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, > image-2021-08-25-17-07-38-327.png > > > A user report the bug in the [mailist. 
> |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I > paste the content here. > Setup Specifics: > Version: 1.6.2 > RocksDB Map State > Timers stored in rocksdb > > When we have this job running for long periods of time like > 30 days, if > for some reason the job restarts, we encounter "Error while deserializing the > element". Is this a known issue fixed in later versions? I see some changes > to code for FLINK-10175, but we don't use any queryable state > > Below is the stack trace > > org.apache.flink.util.FlinkRuntimeException: Error while deserializing the > element. > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85) > at > org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89) > at > 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87) > at >
[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404373#comment-17404373 ] Arvid Heise edited comment on FLINK-23886 at 8/25/21, 11:49 AM: {quote} Flink actually synchronizes invocations of onTimer() and processElement() [see timers description|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#timers] via [mail box thread modle|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1665-L1671] in StreamTask. As far as I can see, I cannot see concurrent problem here. Mabe [~arvid] could share more insights here. {quote} This is only true since Flink 1.11. Before that a few things were done concurrently, so I could imagine that there are bugs lingering. [~pnowojski] probably knows more. However, this probably also means that none of the currently supported version would exhibit the same issue. So even if we manage to find the bug and fix it, there won't be a bugfix release; you'd have to apply a patch on your own. was (Author: arvid): {quote} Flink actually synchronizes invocations of onTimer() and processElement() [see timers description|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#timers] via [mail box thread modle|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1665-L1671] in StreamTask. As far as I can see, I cannot see concurrent problem here. Mabe [~arvid] could share more insights here. {quote} This is only true since Flink 1.11. Before that a few things were done concurrently, so I could imagine that there are bugs lingering. [~pnowojski] probably knows more. However, this probably also means that none of the currently supported version would exhibit the same issue. 
So even if we manage to find the bug and fix it, there won't be a bugfix release; you'd have to apply a patch on your own. > An exception is thrown out when recover job timers from checkpoint file > --- > > Key: FLINK-23886 > URL: https://issues.apache.org/jira/browse/FLINK-23886 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: JING ZHANG >Priority: Major > Attachments: image-2021-08-25-16-38-04-023.png, > image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, > image-2021-08-25-17-07-38-327.png > > > A user reported the bug on the [mailing list. > |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I > paste the content here. > Setup Specifics: > Version: 1.6.2 > RocksDB Map State > Timers stored in rocksdb > > When we have this job running for long periods of time like > 30 days, if > for some reason the job restarts, we encounter "Error while deserializing the > element". Is this a known issue fixed in later versions? I see some changes > to code for FLINK-10175, but we don't use any queryable state > > Below is the stack trace > > org.apache.flink.util.FlinkRuntimeException: Error while deserializing the > element. 
> at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85) > at > org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792) > at >
[jira] [Comment Edited] (FLINK-23886) An exception is thrown when recovering job timers from a checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404312#comment-17404312 ] xiangqiao edited comment on FLINK-23886 at 8/25/21, 9:37 AM: - Hello [~yunta] ,we download the rocksdb data file of the failed subtask task locally, and traversed all record in *"_timer_state/processing_window-timers"* column family. We found the following points: 1.the value length of problem records is not 0. (for example,In the screenshot,"count:5" is normal record,"count:6" is problem record). 2.the key length of the problem record is 8 bytes less than that of the normal record. 3.The key formate of ListState is (keygroup,key,namespace),and the key formate of Timer is (keygroup, timestamp, key, namespace).Their lengths differ by 8 bytes. 4.And we can use key formate of ListState to deserialize the key of the problem record key *5.Therefore, it can be concluded that the data in the "window-contents" state is written to "_timer_state/processing_window-timers" state.* {code:java} @Test public void testTimerRestore() throws Exception { List columnFamilyDescriptors = new ArrayList<>(1 + 3); columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("window-contents".getBytes())); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/event_window-timers".getBytes())); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/processing_window-timers".getBytes())); List columnFamilyHandles = new ArrayList<>(1 + 3); DataInputDeserializer inputView = new DataInputDeserializer(); int numPrefixBytes = 2; TypeSerializer keySerializer = IntSerializer.INSTANCE; TypeSerializer namespaceSerializer = new TimeWindow.Serializer(); TimerSerializer timerSerializer = new TimerSerializer(keySerializer, namespaceSerializer); try (RocksDB db = RocksDB.open(PredefinedOptions.DEFAULT.createDBOptions(), 
"/Users/xiangqiao/timer_restore/51_500_db", columnFamilyDescriptors, columnFamilyHandles)) { RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandles.get(3)); rocksIterator.seekToFirst(); long count = 0; while (rocksIterator.isValid()) { System.out.println("-count:" + count++ + "-"); rocksIterator.next(); byte[] keyBytes = rocksIterator.key(); byte[] valueBytes = rocksIterator.value(); System.out.println("key len:" + keyBytes.length + "," + keyBytes); System.out.println("value len:" + valueBytes.length + "," + valueBytes); inputView.setBuffer(keyBytes, numPrefixBytes, keyBytes.length - numPrefixBytes); try { System.out.println(timerSerializer.deserialize(inputView)); } catch (Exception e) { e.printStackTrace(); inputView.setBuffer(keyBytes, numPrefixBytes, keyBytes.length - numPrefixBytes); System.out.println("exception retry,key:"+ keySerializer.deserialize(inputView) + ",namespace:" + namespaceSerializer.deserialize(inputView).toString()); } } } } {code} !image-2021-08-25-17-07-38-327.png! was (Author: xiangqiao): Hello [~yunta] ,we download the rocksdb data file of the failed subtask task locally, and traversed all record in *"_timer_state/processing_window-timers"* column family. We found the following points: 1.the value length of problem records is not 0. (for example,In the screenshot,"count:5" is normal record,"count:6" is problem record). 2.the key length of the problem record is 8 bytes less than that of the normal record. 3.The key formate of ListState is (keygroup,key,namespace),and the key formate of Timer is (keygroup, timestamp, key, namespace).Their lengths differ by 8 bytes. 
4.And we can use key formate of ListState to deserialize the key of the problem record key *5.Therefore, it can be concluded that the data in the "window-contents" state is written to "_timer_state/processing_window-timers" state.* {code:java} @Test public void testTimerRestore() throws Exception { List columnFamilyDescriptors = new ArrayList<>(1 + 3); columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("window-contents".getBytes())); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/event_window-timers".getBytes())); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/processing_window-timers".getBytes())); List columnFamilyHandles = new ArrayList<>(1 + 3); DataInputDeserializer inputView = new DataInputDeserializer(); int numPrefixBytes = 2; TimerSerializer timerSerializer = new TimerSerializer(IntSerializer.INSTANCE, new
[jira] [Comment Edited] (FLINK-23886) An exception is thrown when recovering job timers from a checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404312#comment-17404312 ] xiangqiao edited comment on FLINK-23886 at 8/25/21, 9:34 AM: - Hello [~yunta] ,we download the rocksdb data file of the failed subtask task locally, and traversed all record in *"_timer_state/processing_window-timers"* column family. We found the following points: 1.the value length of problem records is not 0. (for example,In the screenshot,"count:5" is normal record,"count:6" is problem record). 2.the key length of the problem record is 8 bytes less than that of the normal record. 3.The key formate of ListState is (keygroup,key,namespace),and the key formate of Timer is (keygroup, timestamp, key, namespace).Their lengths differ by 8 bytes. 4.And we can use key formate of ListState to deserialize the key of the problem record key *5.Therefore, it can be concluded that the data in the "window-contents" state is written to "_timer_state/processing_window-timers" state.* {code:java} @Test public void testTimerRestore() throws Exception { List columnFamilyDescriptors = new ArrayList<>(1 + 3); columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("window-contents".getBytes())); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/event_window-timers".getBytes())); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/processing_window-timers".getBytes())); List columnFamilyHandles = new ArrayList<>(1 + 3); DataInputDeserializer inputView = new DataInputDeserializer(); int numPrefixBytes = 2; TimerSerializer timerSerializer = new TimerSerializer(IntSerializer.INSTANCE, new TimeWindow.Serializer()); try (RocksDB db = RocksDB.open(PredefinedOptions.DEFAULT.createDBOptions(), "/Users/xiangqiao/timer_restore/51_500_db", columnFamilyDescriptors, columnFamilyHandles)) { RocksIteratorWrapper rocksIterator = 
RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandles.get(3)); rocksIterator.seekToFirst(); long count = 0; while (rocksIterator.isValid()) { System.out.println("-count:" + count++ + "-"); rocksIterator.next(); byte[] keyBytes = rocksIterator.key(); byte[] valueBytes = rocksIterator.value(); System.out.println("key len:" + keyBytes.length + "," + keyBytes); System.out.println("value len:" + valueBytes.length + "," + valueBytes); inputView.setBuffer(keyBytes, numPrefixBytes, keyBytes.length - numPrefixBytes); try { System.out.println(timerSerializer.deserialize(inputView)); } catch (Exception e) { e.printStackTrace(); } } } } {code} !image-2021-08-25-17-07-38-327.png! was (Author: xiangqiao): We download the rocksdb data file of the failed subtask task locally, and traversed all record in *"_timer_state/processing_window-timers"* column family. We found the following points: 1.the value length of problem records is not 0. (for example,In the screenshot,"count:5" is normal record,"count:6" is problem record). 2.the key length of the problem record is 8 bytes less than that of the normal record. 3.The key formate of ListState is (keygroup,key,namespace),and the key formate of Timer is (keygroup, timestamp, key, namespace).Their lengths differ by 8 bytes. 
4.And we can use key formate of ListState to deserialize the key of the problem record key *5.Therefore, it can be concluded that the data in the "window-contents" state is written to "_timer_state/processing_window-timers" state.* {code:java} @Test public void testTimerRestore() throws Exception { List columnFamilyDescriptors = new ArrayList<>(1 + 3); columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("window-contents".getBytes())); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/event_window-timers".getBytes())); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("_timer_state/processing_window-timers".getBytes())); List columnFamilyHandles = new ArrayList<>(1 + 3); DataInputDeserializer inputView = new DataInputDeserializer(); int numPrefixBytes = 2; TimerSerializer timerSerializer = new TimerSerializer(IntSerializer.INSTANCE, new TimeWindow.Serializer()); try (RocksDB db = RocksDB.open(PredefinedOptions.DEFAULT.createDBOptions(), "/Users/xiangqiao/timer_restore/51_500_db", columnFamilyDescriptors, columnFamilyHandles)) { RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandles.get(3)); rocksIterator.seekToFirst(); long count = 0; while
[jira] [Comment Edited] (FLINK-23886) An exception is thrown when recovering job timers from a checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403682#comment-17403682 ] xiangqiao edited comment on FLINK-23886 at 8/24/21, 9:29 AM: - thank you [~yunta] Add the common characteristics of problematic jobs, hoping to help locate the problem: 1.Occasionally, some checkpoints have problems,not all 2.DataStream job 3.Use processing time timer 4.Use com.twitter.chill.protobuf.ProtobufSerializer 5.Use RocksDB Incremental Checkpoint was (Author: xiangqiao): thank you [~yunta] Add the common characteristics of problematic jobs, hoping to help locate the problem: 1.Occasionally, some checkpoints have problems,not all 2.use processing time timer 3.use com.twitter.chill.protobuf.ProtobufSerializer 4.use RocksDB Incremental Checkpoint > An exception is thrown out when recover job timers from checkpoint file > --- > > Key: FLINK-23886 > URL: https://issues.apache.org/jira/browse/FLINK-23886 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: JING ZHANG >Priority: Major > > A user report the bug in the [mailist. > |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I > paste the content here. > Setup Specifics: > Version: 1.6.2 > RocksDB Map State > Timers stored in rocksdb > > When we have this job running for long periods of time like > 30 days, if > for some reason the job restarts, we encounter "Error while deserializing the > element". Is this a known issue fixed in later versions? I see some changes > to code for FLINK-10175, but we don't use any queryable state > > Below is the stack trace > > org.apache.flink.util.FlinkRuntimeException: Error while deserializing the > element. 
> at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85) > at > org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.EOFException > at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) > at org.apache.flink.types.StringValue.readString(StringValue.java:769) > at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69) > at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28) > at >
[jira] [Comment Edited] (FLINK-23886) An exception is thrown when recovering job timers from a checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403682#comment-17403682 ] xiangqiao edited comment on FLINK-23886 at 8/24/21, 9:02 AM: - thank you [~yunta] Add the common characteristics of problematic jobs, hoping to help locate the problem: 1.Occasionally, some checkpoints have problems,not all 2.use processing time timer 3.use com.twitter.chill.protobuf.ProtobufSerializer 4.use RocksDB Incremental Checkpoint was (Author: xiangqiao): thank you [~yunta] Add the common characteristics of problematic tasks, hoping to help locate the problem: 1.Occasionally, some checkpoints have problems,not all 2.use processing time timer 3.use com.twitter.chill.protobuf.ProtobufSerializer 4.use RocksDB Incremental Checkpoint > An exception is thrown out when recover job timers from checkpoint file > --- > > Key: FLINK-23886 > URL: https://issues.apache.org/jira/browse/FLINK-23886 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: JING ZHANG >Priority: Major > > A user report the bug in the [mailist. > |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I > paste the content here. > Setup Specifics: > Version: 1.6.2 > RocksDB Map State > Timers stored in rocksdb > > When we have this job running for long periods of time like > 30 days, if > for some reason the job restarts, we encounter "Error while deserializing the > element". Is this a known issue fixed in later versions? I see some changes > to code for FLINK-10175, but we don't use any queryable state > > Below is the stack trace > > org.apache.flink.util.FlinkRuntimeException: Error while deserializing the > element. 
> at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85) > at > org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.EOFException > at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) > at org.apache.flink.types.StringValue.readString(StringValue.java:769) > at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69) > at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28) > at >