Hi!

When a new state type is incompatible with the old one restored from a savepoint, 
we get an IllegalStateException via org.apache.flink.util.Preconditions.checkState 
from checkStateMetaInfo:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220

For example, if an updated app uses reducing state but restores from 
aggregating state, we get the following exception:
java.lang.IllegalStateException: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
        at java.lang.Thread.run(Thread.java:748)


This email is not about how to solve this issue, but about using a different 
exception type to consistently inform users of incompatible state. I think 
StateMigrationException is reasonable for this, as the other kinds of state 
incompatibility (key, namespace, value) are already reported with that 
exception. Although the existing uses of StateMigrationException mostly come 
from the compatibility check on type serializer snapshots, its semantics seem 
to be notifying developers of state incompatibility in general. A type 
mismatch from the state descriptor (such as aggregating vs. reducing) is more 
like an upfront/fast incompatibility check, so StateMigrationException looks 
like a better fit than the overly general IllegalStateException.
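To make the proposal concrete, below is a minimal, self-contained sketch of 
what the type check could look like if a mismatch were reported as a 
state-migration failure instead of going through Preconditions.checkState. It 
is not Flink's actual code: StateKind, checkStateKind and the local 
StateMigrationException class are hypothetical stand-ins (the real exception 
is org.apache.flink.util.StateMigrationException), so the example compiles 
without Flink on the classpath.

```java
import java.util.Objects;

public class StateTypeCheckSketch {

    /** Hypothetical subset of the state kinds a state descriptor can declare. */
    enum StateKind { VALUE, LIST, REDUCING, AGGREGATING, MAP }

    /**
     * Hypothetical stand-in for org.apache.flink.util.StateMigrationException,
     * modeled here as a checked exception so the sketch needs no Flink dependency.
     */
    static class StateMigrationException extends Exception {
        StateMigrationException(String message) {
            super(message);
        }
    }

    /**
     * Hypothetical kind check: rather than Preconditions.checkState (which throws
     * IllegalStateException), signal the mismatch as a state incompatibility.
     */
    static void checkStateKind(StateKind restored, StateKind requested)
            throws StateMigrationException {
        Objects.requireNonNull(restored, "restored");
        Objects.requireNonNull(requested, "requested");
        if (restored != requested) {
            throw new StateMigrationException(
                    "Incompatible key/value state types. Was [" + restored
                            + "], registered with [" + requested + "].");
        }
    }

    public static void main(String[] args) {
        try {
            // Savepoint holds AGGREGATING state, the updated job asks for REDUCING.
            checkStateKind(StateKind.AGGREGATING, StateKind.REDUCING);
            System.out.println("compatible");
        } catch (StateMigrationException e) {
            // A caller can classify this as a user-side state incompatibility,
            // separately from IllegalStateException (internal inconsistency).
            System.out.println("incompatible state: " + e.getMessage());
        }
    }
}
```

One design consequence worth checking: if the real StateMigrationException is 
a checked exception, checkStateMetaInfo and its callers would need to declare 
or wrap it.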

A bit of context behind this thought: we internally treat an 
IllegalStateException from the Flink core code base as a sort of runtime state 
violation (e.g., inconsistent internal state), so we generally do not expect it 
to result from a user's mistake. In practice, however, the incompatible type 
above is a frequent developer mistake when restoring from a savepoint. We would 
like to narrow this specific kind of user error down to a dedicated exception 
type, rather than having it thrown as a general Java exception.

I would like to hear what you think about changing the exception type to 
StateMigrationException in checkStateMetaInfo. If that is not reasonable, I 
wonder why, and what alternative would be considered (e.g., another existing 
type or a new one).

Thanks,
Hwanju

