Antti-Kaikkonen edited a comment on pull request #13773:
URL: https://github.com/apache/flink/pull/13773#issuecomment-717293281


   I tried to build this from source and got an error when trying to restore a 
stateful function from a savepoint:
   
   1)
   ```
   git clone https://github.com/tzulitai/flink.git
   cd flink
   git checkout FLINK-19748-backport_1.11
   mvn clean package -DskipTests
   ```
   2)
   Add the following to flink-conf.yaml:
   ```
   classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
   #optionally use rocksdb
   state.backend: rocksdb
   taskmanager.numberOfTaskSlots: 2
   parallelism.default: 2
   ```
   
   3)
   Run https://github.com/Antti-Kaikkonen/FlinkStatefunCountTo1M with parallelism 2.
   
   4)
   Create a savepoint.
   
   5)
   Try to restore from the savepoint. The error is thrown in the **feedback-union -> functions** task:
   ```
   java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.StreamCorruptedException: invalid stream header: 008E0A20
        at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:918)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:376)
        at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69)
        at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.<init>(InstantiationUtil.java:227)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572)
        at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotReaderPreVersioned.restoreKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:308)
        at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotReader.readTimersSnapshot(InternalTimersSnapshotReaderWriters.java:261)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:115)
        at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:76)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:217)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:252)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:181)
        ... 9 more
   ```
   I get the same error with both the default state backend and the rocksdb 
state backend. With the rocksdb backend and heap timers, I get a different 
error, and it already occurs when creating the savepoint.
   
   **Edit: I realized that I should probably have built from 
https://github.com/apache/flink/pull/13761 instead of this pull request. I'm 
testing it next.**
   
   **Edit2:** Apparently my previous attempt was already with 
FLINK-19741-backport_1.11. I have now tested both FLINK-19741-backport_1.11 and 
FLINK-19748-backport_1.11, and they throw different errors in the 
**feedback-union -> functions** task. The error with FLINK-19748-backport_1.11 
is now updated in the description, and the error with 
FLINK-19741-backport_1.11 is below:
   ```
   Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:220)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.IOException: java.io.IOException: position out of bounds
        at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:251)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:179)
        ... 9 more
   Caused by: java.io.IOException: position out of bounds
        at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:458)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:411)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:244)
        ... 10 more
   ```
   which is the same error as in my original bug description 
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-19692
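
To illustrate the kind of failure the `position out of bounds` trace points at: the innermost frame is a `seek` on an in-memory state handle stream, called while iterating over key groups. The sketch below is a hypothetical stand-in, not Flink's `ByteStreamStateHandle`. The class `InMemorySeekableStream`, the helper `trySeek`, and the sizes and offsets are all assumptions chosen for the example. It only shows that seeking to an offset larger than the available bytes fails this way, which would be consistent with a recorded key-group offset not matching the restored data.

```java
import java.io.IOException;

// Illustration only: a made-up seekable byte stream, not Flink code.
public class SeekBoundsDemo {

    /** Hypothetical in-memory stream that rejects seeks outside its byte array. */
    static final class InMemorySeekableStream {
        private final byte[] data;
        private int index;

        InMemorySeekableStream(byte[] data) {
            this.data = data;
        }

        void seek(long desired) throws IOException {
            // Valid positions are 0..data.length (inclusive, meaning end of stream).
            if (desired < 0 || desired > data.length) {
                throw new IOException("position out of bounds");
            }
            index = (int) desired;
        }

        long getPos() {
            return index;
        }
    }

    /** Seeks to the given offset and reports the outcome as a string. */
    public static String trySeek(byte[] data, long offset) {
        InMemorySeekableStream stream = new InMemorySeekableStream(data);
        try {
            stream.seek(offset);
            return "ok at " + stream.getPos();
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] restoredBytes = new byte[16];             // assumed size of the restored state
        System.out.println(trySeek(restoredBytes, 8));   // ok at 8
        System.out.println(trySeek(restoredBytes, 64));  // position out of bounds
    }
}
```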



