[ https://issues.apache.org/jira/browse/FLINK-19741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-19741:
----------------------------------------
    Description: 
**Diagnosis**

Currently, when restoring an {{InternalTimeServiceManager}}, we always attempt to read timers from the provided raw keyed state streams (using the {{InternalTimerServiceSerializationProxy}}):
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117

This is incorrect, since timers are only written to the raw keyed state streams with the {{InternalTimerServiceSerializationProxy}} if they require legacy synchronous snapshots:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
(Currently, this is only the case when using the RocksDB backend with heap timers.)

Therefore, creating the {{InternalTimeServiceManager}} on restore can fail due to corrupt reads when all of the following hold:
* a checkpoint was taken with {{useLegacySynchronousSnapshots}} set to false (hence no timers were written, and the time service manager does not use the raw keyed stream)
* the raw keyed stream is used elsewhere (e.g. by user code in the Flink application)
* on restore from that checkpoint, {{InternalTimeServiceManagerImpl.create()}} attempts to read from the raw keyed stream with the {{InternalTimerServiceSerializationProxy}}.
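The read/write mismatch described above can be illustrated with a simplified, stdlib-only model of a single raw keyed state stream. Everything here is hypothetical: the class name, the byte layout (an int count followed by long timestamps) and the user payload are assumptions for illustration, not the real format written by {{InternalTimerServiceSerializationProxy}}.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one raw keyed state stream. The byte
// layout (int count + long timestamps) is an illustrative assumption and NOT
// the format of InternalTimerServiceSerializationProxy.
final class CorruptReadModel {
    private CorruptReadModel() {}

    // Snapshot: timers are written only when legacy synchronous snapshots are
    // used; user code then writes its own raw keyed state to the same stream.
    static byte[] snapshot(boolean useLegacySynchronousSnapshots, List<Long> timers, String userPayload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (useLegacySynchronousSnapshots) {
                out.writeInt(timers.size());
                for (long timestamp : timers) {
                    out.writeLong(timestamp);
                }
            }
            out.writeUTF(userPayload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Buggy restore: always parses the stream as timer data, whether or not
    // timers were written. With legacy=false it decodes the user's bytes instead.
    static List<Long> restoreTimersUnconditionally(byte[] stream) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream))) {
            int count = in.readInt();
            List<Long> timers = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                timers.add(in.readLong());
            }
            return timers;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With {{useLegacySynchronousSnapshots}} false and a user payload of "hello", the first four bytes of the user's data are decoded as a (large) timer count and the model runs past the end of the stream with an {{EOFException}}. The actual failure in this issue surfaces earlier, as {{position out of bounds}} while seeking to a key group, so the model only illustrates the mismatch, not the exact failure point.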

Full stack trace (with Flink 1.11.1):
{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
        at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
        ... 9 more
Caused by: java.io.IOException: position out of bounds
        at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
        ... 10 more
{code}

**Reproducing**

- Have an application with any operator that uses and writes to raw keyed state streams
- Use either the heap backend (with any timer factory) or the RocksDB backend with RocksDB timers, i.e. any setup where {{useLegacySynchronousSnapshots}} is false
- Take a savepoint or wait for a checkpoint, then trigger a restore
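The backend/timer combinations in these steps are the ones where timers are not written to the raw keyed stream. A minimal sketch of that rule, assuming it is exactly as stated in this issue (legacy synchronous timer snapshots only for the RocksDB backend with heap timers); the enums and methods are hypothetical and not part of Flink's API:

```java
// Hypothetical model of the configurations discussed in this issue.
// None of these names exist in Flink; they only encode the stated rule.
enum Backend { HEAP, ROCKSDB }

enum TimerService { HEAP, ROCKSDB }

final class ReproMatrix {
    private ReproMatrix() {}

    // Per this issue, timers are written via the serialization proxy only
    // for RocksDB backend + heap timers.
    static boolean useLegacySynchronousSnapshots(Backend backend, TimerService timers) {
        return backend == Backend.ROCKSDB && timers == TimerService.HEAP;
    }

    // A restore can hit the corrupt read when timers were NOT written to the
    // raw keyed stream, but some other code (e.g. user code) did write to it.
    static boolean canTriggerCorruptRead(Backend backend, TimerService timers, boolean userWritesRawKeyedState) {
        return userWritesRawKeyedState && !useLegacySynchronousSnapshots(backend, timers);
    }
}
```

For example, {{canTriggerCorruptRead(Backend.HEAP, TimerService.HEAP, true)}} is true, while the RocksDB backend with heap timers is never affected because the timers themselves occupy the stream.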

**Proposed Fix**

The fix would be to also respect the {{useLegacySynchronousSnapshots}} flag on the restore path, so that timers are only read from the raw keyed state streams when they are also written there:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L231
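A minimal sketch of the proposed fix on a simplified, stdlib-only stream model: the restore path checks the same {{useLegacySynchronousSnapshots}} flag as the write path before parsing any timers. The class names and byte layout are hypothetical, this is not the actual patch to {{InternalTimeServiceManagerImpl}}, and it assumes the flag evaluates the same way at snapshot and restore time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; the layout (int count + long timestamps, then the
// user's payload) is illustrative and not Flink's real serialization format.
final class GuardedTimerRestore {
    private GuardedTimerRestore() {}

    static final class Restored {
        final List<Long> timers;
        final String userPayload;

        Restored(List<Long> timers, String userPayload) {
            this.timers = timers;
            this.userPayload = userPayload;
        }
    }

    // Write side (unchanged): timers only when legacy snapshots are used,
    // followed by the user's own raw keyed state.
    static byte[] snapshot(boolean useLegacySynchronousSnapshots, List<Long> timers, String userPayload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (useLegacySynchronousSnapshots) {
                out.writeInt(timers.size());
                for (long timestamp : timers) {
                    out.writeLong(timestamp);
                }
            }
            out.writeUTF(userPayload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read side with the fix: consult the flag before touching the stream,
    // so user bytes are never parsed as timers.
    static Restored restore(byte[] stream, boolean useLegacySynchronousSnapshots) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream))) {
            List<Long> timers = Collections.emptyList();
            if (useLegacySynchronousSnapshots) {
                int count = in.readInt();
                timers = new ArrayList<>(count);
                for (int i = 0; i < count; i++) {
                    timers.add(in.readLong());
                }
            }
            // Whatever follows belongs to user code.
            return new Restored(timers, in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The design choice is simply symmetry: the condition that decides whether timers are written is the same one that decides whether they are read, so a stream that only contains user data is left untouched for user code.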


> InternalTimeServiceManager fails to restore due to corrupt reads if there are other users of raw keyed state streams
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19741
>                 URL: https://issues.apache.org/jira/browse/FLINK-19741
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.3, 1.10.2, 1.11.2
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)