Hi,

Thanks for the discussions so far. I agree that the use of
StateMigrationException and its semantics are rather undefined and
inconsistent at the moment.

On Thu, Oct 8, 2020 at 7:38 AM Kim, Hwanju <hwanj...@amazon.com> wrote:

> Hi Till,
>
> Thanks for the opinion and insight. I think your concern is valid, as a
> nontrivial number of issues would still be somewhat subtle to confidently
> attribute to either the framework or a user error. Serialization-related
> issues are one example, as even a user-triggered issue leads to exceptions in
> many places in the framework without a clear boundary. The approach so far has
> not been based on theory and completeness, but rather on data and likelihood.
> When it's ambiguous, we err on the side of calling it a framework issue.
> But the StateMigrationException from migrateSerializedValue that you pointed
> out could be one example that is ambiguous. The general idea for such
> ambiguous issues is that if it's a one-off hiccup, the misclassification
> effect is low, whereas if it's persistent (e.g., the filesystem is not working
> well), its impact is high, but in that case more exceptions should be thrown
> from many places, not just from the single point of state migration, and the
> other dominant exceptions can be expected to be reported as framework issues.
>
> If I understand it correctly:
> 1) Semantically, a state type mismatch (e.g., aggregating vs. reducing) can be
> thrown as a StateMigrationException, as it clearly comes from the user
> providing incompatible state.
> 2) For the subtler StateMigrationException throws (e.g., during the migration
> itself rather than from the compatibility check), we can refine this by
> catching lower-level exceptions in a finer-grained manner (possibly using a
> different exception).
>

This observation is accurate. To summarize the discussions so far, we can
categorize all restore failures related to state compatibility into two
categories:
1) incompatible state serializers
(TypeSerializerSchemaCompatibility#isIncompatible()) or state types (ListState
vs MapState; see the sketch below), and
2) if all compatibility checks pass and migration is required, subtle
filesystem hiccups / corrupted checkpoint files / incorrect serializer
implementations can still surface during the migration process.
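
As a concrete illustration of 1) (a rough sketch only; the state name,
functions, and types below are made up for this example, and the snippet is
meant to live inside a rich function's open()): the same state name is
re-registered with a different kind of state descriptor across job versions, so
the restore can never succeed no matter what the serializers look like.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

// Job version 1: "pending-total" is registered as AGGREGATING state.
AggregatingStateDescriptor<Long, Long, Long> v1 =
        new AggregatingStateDescriptor<>(
                "pending-total",
                new AggregateFunction<Long, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(Long value, Long acc) { return acc + value; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                },
                Types.LONG);
// getRuntimeContext().getAggregatingState(v1);

// Job version 2: the same name is re-registered as REDUCING state. Restoring
// from a savepoint taken with version 1 fails in checkStateMetaInfo, today with
// "IllegalStateException: Incompatible key/value state types. Was
// [AGGREGATING], registered with [REDUCING]."
ReducingStateDescriptor<Long> v2 =
        new ReducingStateDescriptor<>("pending-total", (a, b) -> a + b, Types.LONG);
// getRuntimeContext().getReducingState(v2);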

I'm wondering if we should instead introduce a different exception type for
1).

For 1), a new exception type named IncompatibleStateDescriptorException
(TBD) seems more appropriate, as in that case the state can never be
migrated and it is always a user failure.
The name immediately implies that the user-provided properties for state
access, e.g. serializers / state type, are incompatible with the checkpointed
data.
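
To sketch what that could look like (the exception name is the TBD one above;
whether it extends FlinkRuntimeException, and the variable names in the check,
are only assumptions paraphrasing
RegisteredKeyValueStateBackendMetaInfo#checkStateMetaInfo, not its actual code):

import org.apache.flink.util.FlinkRuntimeException;

// Hypothetical new exception type (name TBD, as discussed above).
public class IncompatibleStateDescriptorException extends FlinkRuntimeException {
    public IncompatibleStateDescriptorException(String message) {
        super(message);
    }
}

// In checkStateMetaInfo, roughly replacing the Preconditions.checkState(...) call;
// restoredStateType / registeredStateType are placeholder names:
if (restoredStateType != registeredStateType) {
    throw new IncompatibleStateDescriptorException(
            String.format(
                    "Incompatible key/value state types. Was [%s], registered with [%s].",
                    restoredStateType, registeredStateType));
}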

For 2), we can continue to use StateMigrationException to wrap lower-level
exceptions. The intent here isn't to classify whether the error was
user-caused or a framework issue, but simply to relate the exception to the
process of state migration.
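
For reference, the wrapping pattern for 2) is roughly what
AbstractRocksDBState#migrateSerializedValue already does (paraphrased below,
not a verbatim copy): the lower-level cause is preserved, and
StateMigrationException only says "this happened while migrating state".

try {
    // Deserialize with the prior serializer and re-serialize with the new one.
    V value = priorSerializer.deserialize(serializedOldValueInput);
    newSerializer.serialize(value, serializedMigratedValueOutput);
} catch (Exception e) {
    // The cause could be an IOException from the filesystem, a corrupted
    // checkpoint, or a buggy serializer; it is kept as the wrapped cause.
    throw new StateMigrationException("Error while trying to migrate RocksDB state.", e);
}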

I agree that these can be handled separately, starting with differentiating
the above scenarios via different exception types and clarifying the use of
StateMigrationException.

Cheers,
Gordon


>
> I think the two issues are independent, so either can be handled
> separately as long as the semantics of StateMigrationException are clear. Out
> of the two, 1) seems straightforward, so if agreed, I can create an issue
> to work on it. Nonetheless, I would like to hear more of Gordon's thoughts.
>
> Thanks,
> Hwanju
>
> On 10/7/20, 1:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>
>     Hi Hwanju,
>
>     thanks for starting this discussion. I pretty much like the idea of being
>     able to distinguish between user code faults and framework faults. This also
>     helps in deciding on the recovery action to take. Hence, I would be +1
> for
>     using certain exceptions consistently in order to indicate the origin
> of a
>     fault where possible. The big challenge I see is to do it consistently
> and
>     to share this contract with the community so that future changes take
> this
>     into account. I don't have a good idea other than being very diligent.
>
>     Concerning the concrete usage of the StateMigrationException, I fear
> that
>     we have cases where we throw this exception if the serializers are
>     incompatible (user fault) but also when serialization fails
> (IOException
>     which can be a user (corrupt checkpoint) as well as framework fault
>     (filesystem hiccup)) (
>
> https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java#L213
> ).
>     Consequently, we might first need to clean up the usage of
>     StateMigrationException before being able to use it consistently.
>     Moreover, in the case of reading a checkpoint, I am not entirely sure how to
>     classify all possible errors into user and framework faults.
>
>     I am pulling in Gordon who worked on this part in the past and might be
>     able to give us some more details on the usage of the
>     StateMigrationException.
>
>     Cheers,
>     Till
>
>     On Wed, Oct 7, 2020 at 12:27 AM Kim, Hwanju
> <hwanj...@amazon.com.invalid>
>     wrote:
>
>     > Hi!
>     >
>     > In the case where a new state type is incompatible with the old one from a
>     > savepoint, we get an IllegalStateException via
>     > org.apache.flink.util.Preconditions.checkState from checkStateMetaInfo:
>     >
>     >
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220
>     >
>     > For example, if an updated app uses reducing state but restores from
>     > aggregating state, we can get the following exception:
>     > java.lang.IllegalStateException: Incompatible key/value state types.
> Was
>     > [AGGREGATING], registered with [REDUCING].
>     >         at
>     >
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>     >         at
>     >
> org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
>     >         at
>     >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
>     >         at
>     >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
>     >         at
>     >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
>     >         at
>     >
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>     >         at
>     >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>     >         at
>     >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
>     >         at
>     >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
>     >         at
>     >
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
>     >         at
>     >
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
>     >         at
>     >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
>     >         at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
>     >         at java.lang.Thread.run(Thread.java:748)
>     >
>     >
>     > The topic of this email is not how to solve this issue, but rather which
>     > type of exception should consistently inform users of incompatible state.
>     > I think StateMigrationException seems reasonable for this, as the other
>     > kinds of state incompatibility (key/namespace/value) are already reported
>     > with that exception. Although the existing use of StateMigrationException
>     > mostly comes from the compatibility check of the type serializer snapshot,
>     > its semantics seem to be notifying developers of state incompatibility in
>     > general. The type mismatch from the state descriptor (like aggregating to
>     > reducing) seems more like an upfront/fast incompatibility check, so using
>     > StateMigrationException looks better than the overly general
>     > IllegalStateException.
>     >
>     > A bit of context behind this thought: we internally treat an
>     > IllegalStateException from the Flink core code base as a runtime state
>     > violation (e.g., internal state is inconsistent), so we generally expect it
>     > not to stem from a user's mistake. The incompatible type above, however, is
>     > a frequent developer mistake when restoring from a savepoint. We would like
>     > to narrow such a specific type of user error down to a specific exception
>     > type, rather than having it thrown as a general Java exception.
>     >
>     > I would like to hear what you think about changing the exception type to
>     > StateMigrationException in checkStateMetaInfo. If it's not reasonable, I
>     > would like to understand why and what alternative should be considered
>     > (e.g., another existing type or a new one).
>     >
>     > Thanks,
>     > Hwanju
>     >
>     >
>     >
>
>
