Hi Hwanju,

thanks for starting this discussion. I very much like the idea of being able
to distinguish between user code faults and framework faults. This also
helps in deciding which recovery action to take. Hence, I would be +1 for
consistently using specific exceptions to indicate the origin of a
fault where possible. The big challenge I see is doing this consistently and
sharing this contract with the community so that future changes take it
into account. I don't have a good idea other than being very diligent.
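One way to make such a contract concrete is a single place that maps failure types to an origin and derives the recovery action from it. The sketch below is hypothetical: `FaultClassifier`, `Origin` and `shouldRestart` are illustrative names, the nested `StateMigrationException` is a local stand-in for Flink's class so the example compiles on its own, and the mapping (StateMigrationException means user fault, IllegalStateException means framework fault) is the convention under discussion, not current Flink behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: classify a failure by walking its cause chain,
// outermost exception first, and stop at the first known type.
final class FaultClassifier {

    enum Origin { USER, FRAMEWORK, UNKNOWN }

    /** Local stand-in for org.apache.flink.util.StateMigrationException. */
    static final class StateMigrationException extends Exception {
        StateMigrationException(String message) { super(message); }
    }

    static Origin classify(Throwable failure) {
        // Remember visited throwables (identity equality) to stop on cyclic cause chains.
        List<Throwable> seen = new ArrayList<>();
        for (Throwable t = failure; t != null && !seen.contains(t); t = t.getCause()) {
            seen.add(t);
            if (t instanceof StateMigrationException) {
                return Origin.USER;       // e.g. incompatible state in a savepoint
            }
            if (t instanceof IllegalStateException) {
                return Origin.FRAMEWORK;  // treated as an internal invariant violation
            }
        }
        return Origin.UNKNOWN;
    }

    /** Assumed policy: restarting cannot fix a user fault, so fail fast on it. */
    static boolean shouldRestart(Throwable failure) {
        return classify(failure) != Origin.USER;
    }

    public static void main(String[] args) {
        Throwable userFault = new RuntimeException("restore failed",
                new StateMigrationException("incompatible state"));
        System.out.println(classify(userFault));                          // USER
        System.out.println(classify(new IllegalStateException("bug")));   // FRAMEWORK
        System.out.println(classify(new RuntimeException("other")));      // UNKNOWN
    }
}
```

The whole approach only works if every throw site follows the same convention, which is exactly the diligence problem mentioned above.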

Concerning the concrete usage of the StateMigrationException, I fear that
we have cases where we throw this exception both when the serializers are
incompatible (a user fault) and when serialization fails with an IOException,
which can be a user fault (a corrupt checkpoint) as well as a framework fault
(a filesystem hiccup); see
https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java#L213.
Consequently, we might first need to clean up the usage of
StateMigrationException before being able to use it consistently.
Moreover, when reading a checkpoint, I am not entirely sure how to
classify all possible errors as either user or framework faults.
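To illustrate the ambiguity: if one exception type is thrown both by a compatibility check and around a failed read, a handler can only guess from the cause. This is a hypothetical sketch; `MigrationFaults` and `Origin` are illustrative names, the nested `StateMigrationException` is a local stand-in for Flink's class, and the convention that an exception without a cause comes from a compatibility check is an assumption.

```java
import java.io.IOException;

// Hypothetical sketch: a single exception type covering two different failure origins.
final class MigrationFaults {

    enum Origin { USER, AMBIGUOUS }

    /** Local stand-in for org.apache.flink.util.StateMigrationException. */
    static final class StateMigrationException extends Exception {
        StateMigrationException(String message) { super(message); }
        StateMigrationException(String message, Throwable cause) { super(message, cause); }
    }

    static Origin classify(StateMigrationException e) {
        if (e.getCause() == null) {
            // Assumed: thrown directly by a compatibility check (incompatible serializers).
            return Origin.USER;
        }
        // Wrapped failure such as an IOException: a corrupt checkpoint (user)
        // or a filesystem hiccup (framework). The type alone cannot decide.
        return Origin.AMBIGUOUS;
    }

    public static void main(String[] args) {
        StateMigrationException incompatible =
                new StateMigrationException("new serializer is incompatible");
        StateMigrationException ioFailure =
                new StateMigrationException("migration failed", new IOException("read error"));
        System.out.println(classify(incompatible)); // USER
        System.out.println(classify(ioFailure));    // AMBIGUOUS
    }
}
```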

I am pulling in Gordon who worked on this part in the past and might be
able to give us some more details on the usage of the
StateMigrationException.

Cheers,
Till

On Wed, Oct 7, 2020 at 12:27 AM Kim, Hwanju <hwanj...@amazon.com.invalid>
wrote:

> Hi!
>
> In cases where a new state type is incompatible with the old one from a
> savepoint, we get an IllegalStateException via
> org.apache.flink.util.Preconditions.checkState from checkStateMetaInfo:
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220
>
> For example, if an updated app uses reducing state but restores from
> aggregating state, we get the following exception:
> java.lang.IllegalStateException: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>         at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
>         at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>         at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>         at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
>         at java.lang.Thread.run(Thread.java:748)
>
>
> The topic of this email is not how to solve this issue, but rather which
> exception type should be used to consistently inform users of incompatible
> state. I think StateMigrationException seems reasonable for this, as other
> kinds of state incompatibility (key, namespace, value) are already reported
> by that exception. Although the existing uses of StateMigrationException
> mostly come from the compatibility check of type serializer snapshots, its
> semantics seem to be notifying developers of state incompatibility in general.
> The type mismatch from the state descriptor (like aggregating to reducing)
> is more of an upfront/fast incompatibility check, so it seems better to use
> StateMigrationException rather than the overly general IllegalStateException.
>
> A bit of context behind this thought: we internally consider an
> IllegalStateException from the Flink core code base as a kind of runtime
> state violation (e.g., inconsistent internal state), so we generally expect
> it not to be caused by a user's mistake. However, the incompatible state type
> above is a frequent mistake developers make when restoring from a savepoint.
> We would like to narrow this specific kind of user error down to a dedicated
> exception type rather than having it thrown as a general Java exception.
>
> I would like to hear what you think about changing the exception type to
> StateMigrationException in checkStateMetaInfo. If it’s not reasonable, I
> wonder why, and what alternative would be considered (e.g., another existing
> type or a new one).
>
> Thanks,
> Hwanju
>
>
>
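A minimal sketch of the change proposed above for checkStateMetaInfo, reduced to the type comparison only. `StateTypeCheck`, `StateType`, `checkStateType` and the nested `StateMigrationException` are simplified, hypothetical stand-ins for Flink's StateDescriptor.Type, checkStateMetaInfo and org.apache.flink.util.StateMigrationException; the real method checks more than the type. The stand-in is a checked exception, as we understand Flink's to be, so the method has to declare it, and a real change would likely ripple through the callers shown in the stack trace.

```java
// Hypothetical sketch: report a state type mismatch with a dedicated exception
// type instead of a generic IllegalStateException from Preconditions.checkState.
final class StateTypeCheck {

    /** Simplified stand-in for Flink's StateDescriptor.Type. */
    enum StateType { VALUE, LIST, REDUCING, AGGREGATING, MAP }

    /** Local stand-in for org.apache.flink.util.StateMigrationException. */
    static final class StateMigrationException extends Exception {
        StateMigrationException(String message) { super(message); }
    }

    /** Compares the type restored from the savepoint with the newly registered one. */
    static void checkStateType(StateType restored, StateType registered)
            throws StateMigrationException {
        if (restored != registered) {
            // Same message as today's IllegalStateException, but a type that
            // callers can reliably map to a user fault.
            throw new StateMigrationException(
                    "Incompatible key/value state types. Was [" + restored
                            + "], registered with [" + registered + "].");
        }
    }

    public static void main(String[] args) {
        try {
            checkStateType(StateType.AGGREGATING, StateType.REDUCING);
        } catch (StateMigrationException e) {
            // prints: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
            System.out.println(e.getMessage());
        }
    }
}
```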
