Tzu-Li (Gordon) Tai created FLINK-19748:
-------------------------------------------

             Summary: StateFun's UnboundedFeedbackLogger should call 
startNewKeyGroup for all assigned key groups
                 Key: FLINK-19748
                 URL: https://issues.apache.org/jira/browse/FLINK-19748
             Project: Flink
          Issue Type: Bug
          Components: Stateful Functions
    Affects Versions: statefun-2.2.0, statefun-2.1.0, statefun-2.0.0
            Reporter: Tzu-Li (Gordon) Tai


Currently, on commit, the {{UnboundedFeedbackLogger}} only calls
{{startNewKeyGroup}} on the raw keyed stream for key groups that actually have
logged messages:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102

This means that any key group without logged messages is skipped entirely:
{{startNewKeyGroup}} is never called for it.

This does not conform to the expected usage of Flink's
{{KeyedStateCheckpointOutputStream}}, which expects {{startNewKeyGroup}} to be
invoked for ALL key groups within its key group range.
The reason is that, under the hood, calling {{startNewKeyGroup}} also records
the starting stream offset of that key group.
However, when iterating through a raw keyed stream on restore, the key group
offsets iterator {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} does not take
into account that some key groups were never written and therefore have no
offset defined, so the stream is seeked to incorrect positions.
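The failure mode can be illustrated with a small, self-contained Java model. It is a hypothetical stand-in, not Flink's actual {{KeyedStateCheckpointOutputStream}} or {{KeyGroupRangeOffsets}}: the class {{RawKeyedStreamModel}}, its methods, and the use of -1 to mark a key group whose offset was never recorded are all assumptions made for illustration. In the example only key groups 0 and 2 have logged messages, so key groups 1 and 3 keep the sentinel offset, and the restore-side loop fails on the first of them.

{code:java}
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of writing and restoring a raw keyed stream.
// NOT Flink's KeyedStateCheckpointOutputStream / KeyGroupRangeOffsets; the -1
// sentinel for "startNewKeyGroup was never called" is an assumption.
final class RawKeyedStreamModel {
    static final long NO_OFFSET_SET = -1L;

    private final int firstKeyGroup;
    private final long[] offsets; // start offset per key group in the range
    private long position;        // bytes written so far, i.e. the stream position

    RawKeyedStreamModel(int firstKeyGroup, int numKeyGroups) {
        this.firstKeyGroup = firstKeyGroup;
        this.offsets = new long[numKeyGroups];
        Arrays.fill(offsets, NO_OFFSET_SET);
    }

    // Analogue of startNewKeyGroup: record where this key group's data begins.
    void startNewKeyGroup(int keyGroup) {
        offsets[keyGroup - firstKeyGroup] = position;
    }

    void write(int numBytes) {
        position += numBytes;
    }

    // Restore side: seek to the offset of EVERY key group in the range,
    // without checking whether that offset was ever recorded.
    void seekToEveryKeyGroup() throws IOException {
        for (long offset : offsets) {
            if (offset < 0 || offset > position) {
                throw new IOException("position out of bounds");
            }
            // a real reader would now seek to 'offset' and read the key group
        }
    }

    // The current write pattern: only key groups with logged messages are started.
    static RawKeyedStreamModel writeOnlyLoggedKeyGroups(
            Map<Integer, Integer> loggedBytesPerKeyGroup, int firstKeyGroup, int numKeyGroups) {
        RawKeyedStreamModel stream = new RawKeyedStreamModel(firstKeyGroup, numKeyGroups);
        for (Map.Entry<Integer, Integer> e : new TreeMap<>(loggedBytesPerKeyGroup).entrySet()) {
            stream.startNewKeyGroup(e.getKey());
            stream.write(e.getValue());
        }
        return stream;
    }

    public static void main(String[] args) {
        // Key groups 0..3; only key groups 0 and 2 have logged messages.
        RawKeyedStreamModel stream = writeOnlyLoggedKeyGroups(Map.of(0, 3, 2, 5), 0, 4);
        try {
            stream.seekToEveryKeyGroup();
            System.out.println("restore succeeded");
        } catch (IOException e) {
            // prints "restore failed: position out of bounds"
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
{code}

If every key group in the range is started, even with zero bytes written, every offset lies within the stream and the same restore loop succeeds.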

Ultimately, if any key groups were skipped while writing to the raw keyed
stream, the following error is thrown on restore:
{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
        at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
        ... 9 more
Caused by: java.io.IOException: position out of bounds
        at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
        ... 10 more
{code}

**Possible solutions**

There are two possible solutions, fixing it either in StateFun or in Flink:

- Fix it in StateFun by ensuring that the feedback logger starts a new key
group for every key group in the range:
{code}
for (int keyGroupId : rawKeyedStream.getKeyGroupList()) {
    rawKeyedStream.startNewKeyGroup(keyGroupId);
    // write to stream if there are logged messages for this key group
}
{code}

- Alternatively, change {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} in
Flink to skip key groups that have no defined offset (i.e. key groups for
which {{startNewKeyGroup}} was never called).
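Both fixes can be sketched against a simplified offsets array in which index i holds the start offset of key group firstKeyGroup + i. This is a hypothetical illustration, not StateFun or Flink code: {{KeyGroupOffsetsFixes}}, {{writeAllKeyGroups}}, {{definedOffsets}} and the -1 "no offset set" sentinel are assumptions, and the list-returning {{definedOffsets}} stands in for a skipping iterator. The first method starts every key group, so empty key groups simply get a zero-length section; the second drops key groups whose offset was never set instead of seeking to them.

{code:java}
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two fixes; NOT Flink or StateFun APIs.
// Assumption: -1 marks a key group whose offset was never recorded.
final class KeyGroupOffsetsFixes {
    static final long NO_OFFSET_SET = -1L;

    // Fix 1 (StateFun side): start EVERY key group in the range, and write
    // bytes only for key groups that have logged messages.
    static long[] writeAllKeyGroups(
            Map<Integer, Integer> loggedBytesPerKeyGroup, int firstKeyGroup, int numKeyGroups) {
        long[] offsets = new long[numKeyGroups];
        long position = 0; // current stream position
        for (int i = 0; i < numKeyGroups; i++) {
            int keyGroup = firstKeyGroup + i;
            offsets[i] = position; // analogue of startNewKeyGroup(keyGroup)
            position += loggedBytesPerKeyGroup.getOrDefault(keyGroup, 0);
        }
        return offsets;
    }

    // Fix 2 (Flink side): when iterating offsets on restore, skip key groups
    // whose offset was never set instead of seeking to them.
    static List<Map.Entry<Integer, Long>> definedOffsets(int firstKeyGroup, long[] offsets) {
        List<Map.Entry<Integer, Long>> result = new ArrayList<>();
        for (int i = 0; i < offsets.length; i++) {
            if (offsets[i] != NO_OFFSET_SET) {
                result.add(new AbstractMap.SimpleImmutableEntry<>(firstKeyGroup + i, offsets[i]));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Key groups 0..3; only key groups 0 and 2 have logged messages.
        // prints [0, 3, 3, 8]
        System.out.println(Arrays.toString(writeAllKeyGroups(Map.of(0, 3, 2, 5), 0, 4)));
        // Offsets as the current logger would leave them: key groups 1 and 3 never started.
        // prints [0=0, 2=3]
        System.out.println(definedOffsets(0, new long[] {0, NO_OFFSET_SET, 3, NO_OFFSET_SET}));
    }
}
{code}

The StateFun-side fix keeps the stream valid for any reader that expects all key groups to be started, while the Flink-side fix makes the reader tolerant of writers that skip key groups.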



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
