[
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603137#comment-17603137
]
Tommy Schnabel commented on FLINK-23886:
----------------------------------------
Hi there, reporting in from Twilio Segment that we've seen this in Flink 1.14.4:
{code:java}
Caused by: java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
at org.apache.flink.types.StringValue.readString(StringValue.java:781)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386)
... 26 more
{code}
We remediated by forking and patching Flink to drop the corrupted state. We
then ran data through our system to recreate any timers that _could_ have been
lost. All in all we dropped 84 corrupted timers.
Our patch included:
* Catching and rethrowing a new custom exception when we see an EOFException
in TimerSerializer#deserialize
* Modifying RocksDBCachingPriorityQueueSet to catch our new exception and skip
to the next element
* Some null handling since we're explicitly returning null in cases where we
detect corrupted state
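For readers who want a feel for the shape of those three steps without opening the diff, here is a minimal, self-contained sketch. It is not the code from our fork and uses plain java.io instead of Flink's classes: CorruptedTimerSkipSketch, CorruptedTimerException, Timer, ScanResult and the simplified wire format (a long timestamp followed by a UTF string key) are all hypothetical stand-ins chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: every type here is a hypothetical stand-in, not a Flink class.
class CorruptedTimerSkipSketch {

    /** Hypothetical custom exception marking a truncated timer entry. */
    static class CorruptedTimerException extends RuntimeException {
        CorruptedTimerException(Throwable cause) {
            super("corrupted timer state", cause);
        }
    }

    /** Simplified timer: a timestamp plus a string key. */
    static class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    /** Result of a scan: the readable timers and the number of dropped entries. */
    static class ScanResult {
        final List<Timer> timers = new ArrayList<>();
        int dropped = 0;
    }

    /** Writes a timer in the assumed format: long timestamp, then UTF string key. */
    static byte[] serialize(Timer timer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(timer.timestamp);
            out.writeUTF(timer.key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Step 1: catch EOFException and rethrow it as the custom exception.
    static Timer deserialize(byte[] raw) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            long timestamp = in.readLong();
            String key = in.readUTF();
            return new Timer(timestamp, key);
        } catch (EOFException e) {
            throw new CorruptedTimerException(e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 3: return null for corrupted entries so callers handle them explicitly.
    static Timer deserializeOrNull(byte[] raw) {
        try {
            return deserialize(raw);
        } catch (CorruptedTimerException e) {
            return null;
        }
    }

    // Step 2: skip to the next element when an entry is corrupted, counting the drops.
    static ScanResult readAll(List<byte[]> entries) {
        ScanResult result = new ScanResult();
        for (byte[] raw : entries) {
            Timer timer = deserializeOrNull(raw);
            if (timer == null) {
                result.dropped++;
                continue;
            }
            result.timers.add(timer);
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] good = serialize(new Timer(1000L, "key-a"));
        byte[] full = serialize(new Timer(2000L, "key-b"));
        byte[] truncated = Arrays.copyOf(full, full.length - 2); // simulate corruption
        ScanResult result = readAll(Arrays.asList(good, truncated, good));
        System.out.println("kept=" + result.timers.size() + " dropped=" + result.dropped);
    }
}
```

Counting the dropped entries, as the sketch does, is what lets you report afterwards how many timers were lost and decide whether a replay is needed to recreate them.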
I've attached the diff from the commit in our fork in case it is helpful for
anyone else. We're not planning to actively run that code, but instead keep it
around as a remediation tool if we were to ever run into this again.
We're happy to help and provide more details should they be useful!
[^segment-drop-corrupted-timer-state.diff]
> An exception is thrown out when recover job timers from checkpoint file
> -----------------------------------------------------------------------
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.10.0, 1.11.3, 1.13.2
> Reporter: Jing Zhang
> Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png,
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png,
> image-2021-08-25-17-07-38-327.png, segment-drop-corrupted-timer-state.diff
>
>
> A user reported the bug on the [mailing list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E].
> I paste the content here.
> Setup Specifics:
> Version: 1.6.2
> RocksDB Map State
> Timers stored in rocksdb
>
> When this job has been running for a long time (more than 30 days) and then
> restarts for some reason, we encounter "Error while deserializing the
> element". Is this a known issue fixed in later versions? I see some changes
> to code for FLINK-10175, but we don't use any queryable state.
>
> Below is the stack trace
>
> org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
> ... 20 more