[ https://issues.apache.org/jira/browse/FLINK-19741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai updated FLINK-19741:
----------------------------------------
Description:
h2. *Diagnosis*

Currently, when restoring an {{InternalTimeServiceManager}}, we always attempt to read from the provided raw keyed state streams (using {{InternalTimerServiceSerializationProxy}}):
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117

This is incorrect, since we don't write with the {{InternalTimerServiceSerializationProxy}} if the timers do not require legacy synchronous snapshots:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
(we currently only require that when users use the RocksDB backend + heap timers).

Therefore, the {{InternalTimeServiceManager}} can fail to be created on restore due to corrupt reads in the case where:
* a checkpoint was taken where {{useLegacySynchronousSnapshots}} is false (hence nothing was written, and the time service manager does not use the raw keyed stream)
* the raw keyed stream is used elsewhere (e.g. in the Flink application's user code)
* on restore from the checkpoint, {{InternalTimeServiceManagerImpl.create()}} attempts to read from the raw keyed stream with the {{InternalTimerServiceSerializationProxy}}.

Full error stack trace (with Flink 1.11.1):
{code}
2020-10-21 13:16:51
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readFully(DataInputStream.java:197)
	at java.io.DataInputStream.readUTF(DataInputStream.java:609)
	at java.io.DataInputStream.readUTF(DataInputStream.java:564)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:110)
	at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:76)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:217)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:234)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
	... 9 more
{code}

h2. *Reproducing*

- Have an application with any operator that uses and writes to raw keyed state streams
- Use the heap backend + any timer factory, or the RocksDB backend + RocksDB timers
- Take a savepoint or wait for a checkpoint, and trigger a restore

h2. *Proposed Fix*

The fix would be to also respect the {{useLegacySynchronousSnapshots}} flag in:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L231

was:
h2. *Diagnosis*

Currently, when restoring an {{InternalTimeServiceManager}}, we always attempt to read from the provided raw keyed state streams (using {{InternalTimerServiceSerializationProxy}}):
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117

This is incorrect, since we don't write with the {{InternalTimerServiceSerializationProxy}} if the timers do not require legacy synchronous snapshots:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
(we currently only require that when users use the RocksDB backend + heap timers).

Therefore, the {{InternalTimeServiceManager}} can fail to be created on restore due to corrupt reads in the case where:
* a checkpoint was taken where {{useLegacySynchronousSnapshots}} is false (hence nothing was written, and the time service manager does not use the raw keyed stream)
* the raw keyed stream is used elsewhere (e.g. in the Flink application's user code)
* on restore from the checkpoint, {{InternalTimeServiceManagerImpl.create()}} attempts to read from the raw keyed stream with the {{InternalTimerServiceSerializationProxy}}.

Full error stack trace (with Flink 1.11.1):
{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
	at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
	... 9 more
Caused by: java.io.IOException: position out of bounds
	at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
	... 10 more
{code}

h2. *Reproducing*

- Have an application with any operator that uses and writes to raw keyed state streams
- Use the heap backend + any timer factory, or the RocksDB backend + RocksDB timers
- Take a savepoint or wait for a checkpoint, and trigger a restore

h2. *Proposed Fix*

The fix would be to also respect the {{useLegacySynchronousSnapshots}} flag in:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L231


> InternalTimeServiceManager fails to restore due to corrupt reads if there are
> other users of raw keyed state streams
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19741
>                 URL: https://issues.apache.org/jira/browse/FLINK-19741
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.3, 1.10.2, 1.11.2
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.12.0, 1.11.3
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
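A minimal, hypothetical sketch of the diagnosis and the proposed fix, modelling a single key group whose raw keyed state stream is shared by the timer service and user code. Everything in it is an illustrative assumption: the class and method names are not Flink APIs, the {{writeUTF("window-timers")}} call only stands in for whatever {{InternalTimerServiceSerializationProxy}} really writes, and the layout of timer and user data is simplified. It only shows why an unconditional read breaks ({{readUTF}} consumes bytes that belong to another user of the stream and hits an {{EOFException}}, as in the trace above) and how gating the read on {{useLegacySynchronousSnapshots}} keeps snapshot and restore symmetric.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

/**
 * Hypothetical, heavily simplified model of one key group's raw keyed state stream.
 * None of these classes or methods exist in Flink; they only illustrate the issue.
 */
public class RawKeyedStreamSketch {

    /** Snapshot: timer metadata is only written when legacy synchronous snapshots are used. */
    public static byte[] snapshot(boolean useLegacySynchronousSnapshots, int userValue)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        if (useLegacySynchronousSnapshots) {
            // Assumed stand-in for the timer service serialization proxy's output.
            out.writeUTF("window-timers");
        }
        // Another user of the raw keyed stream, e.g. operator user code.
        out.writeInt(userValue);
        out.flush();
        return bytes.toByteArray();
    }

    /**
     * Restore. With respectFlag == false this mirrors the reported bug: the timer
     * metadata is read unconditionally, even if it was never written.
     */
    public static int restore(byte[] state, boolean useLegacySynchronousSnapshots, boolean respectFlag)
            throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(state));
        if (useLegacySynchronousSnapshots || !respectFlag) {
            // readUTF first reads a 2-byte length, then readFully()s that many bytes;
            // on foreign data the length is garbage and the read can run past the end.
            in.readUTF();
        }
        // User code reads back its own value.
        return in.readInt();
    }

    public static void main(String[] args) throws IOException {
        // Heap-style case: no timer metadata written; user writes -1 (bytes FF FF FF FF).
        byte[] state = snapshot(false, -1);

        try {
            restore(state, false, false);
            System.out.println("buggy restore: no exception");
        } catch (EOFException e) {
            // 0xFFFF is taken as the UTF length, but only 2 bytes remain.
            System.out.println("buggy restore: EOFException");
        }

        System.out.println("fixed restore: " + restore(state, false, true));

        // Legacy case: metadata is written and read, user data still round-trips.
        System.out.println("legacy restore: " + restore(snapshot(true, 7), true, true));
    }
}
```

Running {{main}} prints {{buggy restore: EOFException}}, {{fixed restore: -1}} and {{legacy restore: 7}}. The sketch assumes the flag has the same value at snapshot and restore time; how the real restore path learns which mode was used for a given checkpoint is outside its scope.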