[ https://issues.apache.org/jira/browse/FLINK-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216967#comment-17216967 ]
Antti Kaikkonen commented on FLINK-19692:
-----------------------------------------

Using RocksDB did not help. I also noticed that this error is only thrown when parallelism is greater than 1, because the feedback channel is not used with parallelism 1. I was able to reproduce it with a fresh single-node 1.11.2 installation with *taskmanager.numberOfTaskSlots: 2* in flink-conf.yaml.

By the way, I have not yet tested regular stateful functions with 1.11.2, so I'm not sure whether this is a problem only with statefun-flink-datastream or with flink-statefun in general.

> Can't restore feedback channel from savepoint
> ---------------------------------------------
>
>                 Key: FLINK-19692
>                 URL: https://issues.apache.org/jira/browse/FLINK-19692
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / State Processor, Stateful Functions
>    Affects Versions: statefun-2.2.0
>            Reporter: Antti Kaikkonen
>            Priority: Blocker
>
> When using the new statefun-flink-datastream integration the following error
> is thrown by the *feedback -> union* task when trying to restore from a
> savepoint:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
>     at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
>     ... 9 more
> Caused by: java.io.IOException: position out of bounds
>     at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
>     ... 10 more
> {code}
> The error is only thrown when the feedback channel has been used.
> I have tested with the [example application|https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java]
> and the error is thrown only if it is modified to actually use the feedback
> channel. I simply modified the invoke method to sometimes forward the
> greeting to a random name:
> {code:java}
> @Override
> public void invoke(Context context, Object input) {
>   int seen = seenCount.updateAndGet(MyFunction::increment);
>   context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, seen));
>   String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
>   ThreadLocalRandom random = ThreadLocalRandom.current();
>   int index = random.nextInt(names.length);
>   final String name2 = names[index];
>   if (random.nextDouble() < 0.5) {
>     context.send(new Address(GREET, name2), input);
>   }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
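
For readers without a StateFun setup, the message flow produced by the modified invoke method in the quoted report can be approximated in plain Java. This is only a rough sketch under stated assumptions: the class FeedbackSketch, its run method, and the in-memory queue are hypothetical stand-ins (they are not part of Flink or StateFun), and the sketch does not reproduce the savepoint restore failure itself. It only shows that each greeting has a 50% chance of being sent back to another function instance, which is the path that goes through the feedback channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for the modified example function; no Flink classes are used.
class FeedbackSketch {
    static final String[] NAMES = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};

    /** Processes messages until the loop drains and returns every greeting emitted. */
    static List<String> run(List<String> inputs, Random random) {
        // Per-target counter, standing in for the function's persisted seenCount.
        Map<String, Integer> seenCount = new HashMap<>();
        // Each entry is {target function id, payload}.
        Deque<String[]> queue = new ArrayDeque<>();
        for (String name : inputs) {
            queue.add(new String[] {name, name});
        }
        List<String> greetings = new ArrayList<>();
        while (!queue.isEmpty()) {
            String[] msg = queue.poll();
            int seen = seenCount.merge(msg[0], 1, Integer::sum);
            greetings.add(String.format("Hello %s at the %d-th time", msg[1], seen));
            String name2 = NAMES[random.nextInt(NAMES.length)];
            if (random.nextDouble() < 0.5) {
                // Function-to-function send: this models the message that loops back.
                queue.add(new String[] {name2, msg[1]});
            }
        }
        return greetings;
    }

    public static void main(String[] args) {
        // A fixed seed keeps the run repeatable.
        for (String g : run(Arrays.asList("Antti"), new Random(42))) {
            System.out.println(g);
        }
    }
}
```

With the forwarding branch removed, every input yields exactly one greeting and nothing loops back, which corresponds to the unmodified example where the report says the error does not occur.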