[ 
https://issues.apache.org/jira/browse/FLINK-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217384#comment-17217384
 ] 

Antti Kaikkonen edited comment on FLINK-19692 at 10/20/20, 7:34 AM:
--------------------------------------------------------------------

I tested with regular embedded stateful functions and got the same error, so 
either this is a bug in Stateful Functions or something is wrong with my 
setup. Here is the program I created for testing: 
[https://github.com/Antti-Kaikkonen/FlinkStatefunCountTo1M]

I don't remember having this issue with StateFun 2.1 and Flink 1.10, but I'm 
not 100% sure.

Edit: I forgot to mention that the error is thrown even when the program 
doesn't use any persistent state.



> Can't restore feedback channel from savepoint
> ---------------------------------------------
>
>                 Key: FLINK-19692
>                 URL: https://issues.apache.org/jira/browse/FLINK-19692
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-2.2.0
>            Reporter: Antti Kaikkonen
>            Priority: Blocker
>
> When using the new statefun-flink-datastream integration the following error 
> is thrown by the *feedback -> union* task when trying to restore from a 
> savepoint:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
>     at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
>     ... 9 more
> Caused by: java.io.IOException: position out of bounds
>     at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
>     ... 10 more
> {code}
> The error is thrown only when the feedback channel has been used.
> I tested with the [example application|https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java]
> and the error occurs only if the example is modified to actually use the 
> feedback channel. I modified the invoke method to forward its input to a 
> randomly chosen name about half the time: 
> {code:java}
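> // Needs java.util.concurrent.ThreadLocalRandom and
> // org.apache.flink.statefun.sdk.Address in addition to the example's imports.
> @Override
> public void invoke(Context context, Object input) {
>   int seen = seenCount.updateAndGet(MyFunction::increment);
>   context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, seen));
>   // Pick a random name to forward the input to.
>   String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
>   ThreadLocalRandom random = ThreadLocalRandom.current();
>   final String name2 = names[random.nextInt(names.length)];
>   // About half the time, send the input to another function instance.
>   // This function-to-function send is the change that makes the example
>   // use the feedback channel.
>   if (random.nextDouble() < 0.5) {
>     context.send(new Address(GREET, name2), input);
>   }
> }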
> {code}



