[ https://issues.apache.org/jira/browse/FLINK-34171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809586#comment-17809586 ]
Ken Burford commented on FLINK-34171:
-------------------------------------

It's intermittently reproducible in 1.17.0 with both aligned and unaligned checkpoints, but I haven't attempted to see if it's present in a previous version. We had made multiple parallelism changes under 1.14.2 with this job without issue, so either we were very lucky for over a year, or the issue appeared in a later version.

I've found that we're more likely to reproduce it by letting our input fall behind, restarting the job, and then immediately taking a savepoint. Essentially, if we can increase the size of the "processed in-flight data", we increase the probability that a savepoint will exhibit this failure on restart when the parallelism has been changed.

As a workaround, I'm concatenating the parallelism onto the RichAsyncFunction's uid, essentially forcing the state to clear when we make a parallelism change. That would seem to confirm my suspicion that this has to do with in-flight messages in that operator not being redistributed in the same way as the upstream KeyedProcessFunction.

Were there any changes made to how RichAsyncFunction state is persisted/redistributed between 1.14 and 1.17?

> Cannot restore from savepoint when increasing parallelism of operator using
> reinterpretAsKeyedStream and RichAsyncFunction
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34171
>                 URL: https://issues.apache.org/jira/browse/FLINK-34171
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.17.0
>            Reporter: Ken Burford
>            Priority: Major
>
> We recently upgraded from Flink 1.14.2 to 1.17.0.
> Our job has not materially changed beyond a few feature changes (enabling snapshot compression, unaligned checkpoints), but we're seeing the following exception when attempting to adjust the parallelism of our job up or down:
>
> {code:java}
> java.lang.RuntimeException: Exception occurred while setting the current key context.
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
>     at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
>     at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)
>     at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)
>     at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:393)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$1800(AsyncWaitOperator.java:92)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:621)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:602)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:712)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.IllegalArgumentException: Key group 30655 is not in KeyGroupRange{startKeyGroup=19346, endKeyGroup=19360}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
>     at org.apache.flink.runtime.state.heap.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:77)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:250)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
>     ... 29 more{code}
>
> We're seeing this in an operator where we make use of DataStreamUtils::reinterpretAsKeyedStream for collecting multiple tasks into a single operator chain. Each task takes the same data structure as input with an immutable key (represented as a string), and all of the tasks use the exact same KeySelector instance.
>
> One pattern we're using here is a chain of:
>
> KeyedProcessFunction --> RichAsyncFunction --> reinterpretAsKeyedStream(KeyedProcessFunction)
>
> ...and I suspect that this might have something to do with the way that the buffered in-flight data from the RichAsyncFunction is redistributed during re-scaling. We've observed that this failure is seemingly non-deterministic during re-scaling, but the probability of encountering it (from our admittedly anecdotal and limited testing) is reduced, but not eliminated, when we disable unaligned checkpoints. (Note that we first take a savepoint, restore with unaligned checkpoints disabled, then take another savepoint which we then use to adjust the parallelism, keeping "persisted in-flight data" out of the savepoint.)
>
> We've never had any issues in the past with this under 1.14, so we're wondering if this is due to unaligned checkpointing, or possibly a regression/change in behavior since then.
> And if it is due to unaligned checkpointing, any thoughts on why disabling it hasn't seemed to address the problem?
>
> Update: I took a savepoint and confirmed that it could not be restored under a new parallelism. I then removed the RichAsyncFunction and restarted the job (using --allowNonRestoredState) successfully. I was then able to take another savepoint and restart the job with the RichAsyncFunction re-inserted into the DS.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
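For context on the "Key group 30655 is not in KeyGroupRange" error quoted above: Flink hashes every key to one of maxParallelism key groups, and each subtask owns a contiguous range of them. The sketch below mirrors the range formula from Flink's KeyGroupRangeAssignment but is a simplified, standalone illustration: it uses a plain floorMod of hashCode() where Flink additionally applies a murmur hash, and the parallelism values are made up. A record whose key group falls outside the restoring subtask's range triggers exactly the IllegalArgumentException in the stack trace.

```java
import java.util.Objects;

public class KeyGroupSketch {

    // Simplified stand-in for key-group assignment: Flink murmur-hashes
    // key.hashCode() before the modulo; we use the raw hashCode here.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    // Mirrors KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex:
    // subtask `operatorIndex` out of `parallelism` owns [start, end].
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 32768; // hypothetical job setting
        int keyGroup = assignToKeyGroup("some-immutable-string-key", maxParallelism);

        // After rescaling, subtask 2 of 4 owns only this slice of key groups;
        // emitting a buffered record keyed outside it fails on setCurrentKey.
        int[] range = keyGroupRange(maxParallelism, 4, 2);
        boolean inRange = keyGroup >= range[0] && keyGroup <= range[1];
        System.out.println("keyGroup=" + keyGroup
                + " range=[" + range[0] + "," + range[1] + "] inRange=" + inRange);
    }
}
```

The key point is that the range depends on the subtask index and parallelism, while the key group depends only on the key and maxParallelism; correctness on rescale relies on the runtime routing each restored record to the subtask that now owns its key group.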
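The uid-concatenation workaround described in the comment can be sketched as a small helper; the operator name and helper are hypothetical. Because Flink matches savepoint state to operators by uid, deriving the uid from the parallelism means a rescaled job presents a uid the savepoint doesn't contain, so the RichAsyncFunction's checkpointed in-flight state is simply not restored (the job must then be submitted with --allowNonRestoredState).

```java
public class UidWorkaround {

    // Hypothetical helper: derive an operator uid that changes whenever the
    // configured parallelism changes, forcing that operator's state to be
    // dropped on restore after a rescale.
    static String parallelismScopedUid(String baseUid, int parallelism) {
        return baseUid + "-p" + parallelism;
    }

    public static void main(String[] args) {
        // In the actual job the returned string would be passed to the async
        // operator, e.g.:
        //   AsyncDataStream.unorderedWait(...).uid(parallelismScopedUid("enrich-async", p));
        System.out.println(parallelismScopedUid("enrich-async", 8));  // enrich-async-p8
        System.out.println(parallelismScopedUid("enrich-async", 16)); // enrich-async-p16
    }
}
```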