Hi!
When the new state type is incompatible with the one restored from a savepoint,
we get an IllegalStateException, thrown via
org.apache.flink.util.Preconditions.checkState in checkStateMetaInfo:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220
For example, if an updated application declares reducing state but restores
from aggregating state, we get the following exception:
java.lang.IllegalStateException: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
at java.lang.Thread.run(Thread.java:748)
The point of this email is not how to solve this issue, but to discuss using a
different exception type to consistently inform users of incompatible state. I
think StateMigrationException is reasonable for this, since the other kinds of
state incompatibility (key, namespace, value) are already reported with that
exception. Although StateMigrationException is currently thrown mostly from the
compatibility checks on type serializer snapshots, its semantics seem to be to
notify developers of state incompatibility in general. The type mismatch
between state descriptors (such as aggregating to reducing) is more of an
upfront, fast incompatibility check, so StateMigrationException looks like a
better fit than the overly general IllegalStateException.
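To make the proposal concrete, here is a minimal, self-contained sketch of what
the type check could look like if it threw a dedicated exception. It is not the
actual Flink code: the nested StateMigrationException is a local stand-in for
org.apache.flink.util.StateMigrationException (modeled here as a checked
exception, which is an assumption of the sketch), and StateType and
checkStateType are hypothetical names used only for illustration.

```java
class StateTypeCheckSketch {

    /** Local stand-in for Flink's StateMigrationException (assumption: checked). */
    static class StateMigrationException extends Exception {
        StateMigrationException(String message) {
            super(message);
        }
    }

    /** Hypothetical, simplified set of state descriptor types. */
    enum StateType { UNKNOWN, VALUE, LIST, REDUCING, AGGREGATING, MAP }

    /**
     * Compares the state type restored from the savepoint with the type the
     * updated job registers. A mismatch is reported as a state migration
     * problem instead of a generic IllegalStateException.
     */
    static void checkStateType(StateType restored, StateType registered)
            throws StateMigrationException {
        // If either side is unknown there is nothing to compare against.
        if (restored == StateType.UNKNOWN || registered == StateType.UNKNOWN) {
            return;
        }
        if (restored != registered) {
            throw new StateMigrationException(
                    "Incompatible key/value state types. Was [" + restored
                            + "], registered with [" + registered + "].");
        }
    }

    public static void main(String[] args) {
        try {
            checkStateType(StateType.AGGREGATING, StateType.REDUCING);
        } catch (StateMigrationException e) {
            // The message stays the same as today; only the exception type changes.
            System.out.println(e.getMessage());
        }
    }
}
```

One consequence worth noting: if the real exception is checked, as assumed
here, the callers in the stack trace above would need to declare or handle it.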
For context, we internally treat an IllegalStateException from the Flink core
code base as a runtime state violation (e.g., inconsistent internal state), so
we generally do not expect it to result from a user's mistake. The incompatible
state type above, however, is a frequent developer mistake when restoring from
a savepoint. We would like to map this specific kind of user error to a
dedicated exception type rather than have it thrown as a general Java
exception.
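As an illustration of why the exception type matters to us, the sketch below
shows the kind of classification we have in mind on the monitoring side. All
names here (FailureClassifierSketch, FailureKind, classify) are hypothetical,
and the nested StateMigrationException is only a local stand-in for the Flink
class so that the example compiles on its own.

```java
class FailureClassifierSketch {

    /** Local stand-in for Flink's StateMigrationException. */
    static class StateMigrationException extends Exception {
        StateMigrationException(String message) {
            super(message);
        }
    }

    /** Hypothetical buckets for a job failure. */
    enum FailureKind { USER_STATE_INCOMPATIBILITY, INTERNAL_STATE_VIOLATION, OTHER }

    /**
     * Walks the cause chain from the outermost exception inward and
     * classifies the failure by the first matching exception type.
     */
    static FailureKind classify(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof StateMigrationException) {
                // Incompatible state on restore: treated as a user error.
                return FailureKind.USER_STATE_INCOMPATIBILITY;
            }
            if (t instanceof IllegalStateException) {
                // Treated as an internal runtime state violation.
                return FailureKind.INTERNAL_STATE_VIOLATION;
            }
        }
        return FailureKind.OTHER;
    }
}
```

With the current behavior, the aggregating-to-reducing mismatch lands in the
internal bucket of such a classifier, which is what we would like to avoid.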
I would like to hear what you think about changing the exception type to
StateMigrationException in checkStateMetaInfo. If that is not reasonable, I
would like to understand why, and which alternative would be considered (e.g.,
another existing type or a new one).
Thanks,
Hwanju