Thanks Gordon.

If its semantics are not (yet) clearly defined, I think we can make them a bit
clearer.
Incrementally, I think any exception that is more specific than a general Java
exception (e.g., IllegalStateException) would at least make the information
better, whether or not there is a clearly settled guideline for specific
exceptions like StateMigrationException.
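
To make the split discussed in the quoted thread below concrete, here is a
minimal sketch, not actual Flink code. The exception classes are local
stand-ins, IncompatibleStateDescriptorException is the (TBD) name proposed
below, and checkStateType, migrate, Migration, Origin and classify are
hypothetical names used only for illustration.

```java
import java.io.IOException;

/** Sketch only: local stand-ins for illustration, not the actual Flink classes. */
final class StateRestoreSketch {

    /** Hypothetical type for case 1: the state can never be migrated (user error). */
    static class IncompatibleStateDescriptorException extends Exception {
        IncompatibleStateDescriptorException(String message) {
            super(message);
        }
    }

    /** Stand-in for case 2: a lower-level failure during the migration process. */
    static class StateMigrationException extends Exception {
        StateMigrationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    enum StateType { AGGREGATING, REDUCING, LIST, MAP }

    enum Origin { USER, UNDETERMINED }

    /** Hypothetical migration step that may fail with an I/O error. */
    interface Migration {
        byte[] apply(byte[] serialized) throws IOException;
    }

    /** Case 1: an up-front type check that throws a specific exception
     *  instead of a general IllegalStateException. */
    static void checkStateType(StateType restored, StateType registered)
            throws IncompatibleStateDescriptorException {
        if (restored != registered) {
            throw new IncompatibleStateDescriptorException(
                    "Incompatible key/value state types. Was [" + restored
                            + "], registered with [" + registered + "].");
        }
    }

    /** Case 2: wrap the lower-level exception without deciding who caused it. */
    static byte[] migrate(byte[] serialized, Migration migration)
            throws StateMigrationException {
        try {
            return migration.apply(serialized);
        } catch (IOException e) {
            throw new StateMigrationException("Error while trying to migrate state.", e);
        }
    }

    /** Walks the cause chain; only the case 1 exception is attributed to the user. */
    static Origin classify(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof IncompatibleStateDescriptorException) {
                return Origin.USER;
            }
        }
        return Origin.UNDETERMINED;
    }

    public static void main(String[] args) {
        try {
            checkStateType(StateType.AGGREGATING, StateType.REDUCING);
        } catch (IncompatibleStateDescriptorException e) {
            System.out.println(classify(e) + ": " + e.getMessage());
        }
    }
}
```

With something along these lines, a classifier only needs to look for the
case 1 exception in the cause chain, and anything wrapped in
StateMigrationException stays undetermined until its cause is inspected.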

Thanks,
Hwanju

On 10/7/20, 11:53 PM, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:

    Hi,

    Thanks for the discussions so far. I agree that the use of the
    StateMigrationException and its semantics are rather undefined and
    inconsistent in the current state.

    On Thu, Oct 8, 2020 at 7:38 AM Kim, Hwanju <hwanj...@amazon.com> wrote:

    > Hi Till,
    >
    > Thanks for the opinion and insight. I think your concern is valid, as a
    > nontrivial number of issues are still too subtle to confidently attribute
    > to the framework or to the user. Serialization-related issues are one
    > example, as even a user-triggered issue leads to exceptions thrown from
    > many places in the framework without a clear boundary. The approach so far
    > has not been based on theory and completeness, but rather on data and
    > likelihood. When it is confusing, we err on the side of calling it a
    > framework issue. What you pointed out about StateMigrationException from
    > migrateSerializedValue could be one such ambiguous example. The general
    > idea for such ambiguous issues is that if it is a one-off hiccup, the
    > effect of misclassification is low; if it is constant (e.g., the
    > filesystem not working well), the impact is high, but then more exceptions
    > should be thrown from many places, not just from a single point of state
    > migration, and we expect those other dominant exceptions to be reported as
    > framework issues.
    >
    > If I understand it correctly,
    > 1) From the perspective of semantics, a state type mismatch (e.g.,
    > aggregating vs. reducing) can be thrown as StateMigrationException, as it
    > clearly comes from the user with incompatible state.
    > 2) For the subtle StateMigrationException throws (e.g., during migration
    > rather than from a compatibility check), we can refine them by catching
    > lower-level exceptions in a finer-grained manner (possibly using a
    > different exception).
    >

    This observation is accurate. To summarize the discussions so far, we can
    categorize all failures on restore related to state compatibility into two
    categories:
    1) incompatible state serializers
    (TypeSerializerSchemaCompatibility#isIncompatible()) or types (ListState vs
    MapState), and
    2) if all compatibility checks pass and migration is required, subtle
    filesystem hiccups / corrupted checkpoint files / incorrect serializer
    implementations can still surface during the process of a migration.

    I'm wondering if we should instead include a different exception type for
    1).

    For 1), a new exception type named IncompatibleStateDescriptorException
    (TBD) seems like a better name, as in that case, the state can never be
    migrated and is always a user-failure.
    The name immediately implies that user-provided properties for state
    access, e.g. serializers / state type, are incompatible with checkpointed
    data.

    For 2), we can continue to use StateMigrationException to wrap lower-level
    exceptions. The intent here isn't to classify whether the error was
    user-caused or a framework issue, but to simply relate the exception to the
    process of state migration.

    I agree that these can be handled separately, by starting off first with
    differentiating the above scenarios with different exception types /
    cleaning up the use of StateMigrationException.

    Cheers,
    Gordon


    >
    > I think the two issues are independent, so either can be handled
    > separately as long as the semantics of StateMigrationException are clear.
    > Of the two, 1) seems straightforward, so if agreed, I can create an issue
    > to work on it. Nonetheless, I would like to hear more of Gordon's thoughts.
    >
    > Thanks,
    > Hwanju
    >
    > On 10/7/20, 1:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
    >
    >     Hi Hwanju,
    >
    >     thanks for starting this discussion. I very much like the idea of being
    >     able to distinguish between user code faults and framework faults. This
    >     also helps in deciding on the recovery action to take. Hence, I would be
    >     +1 for using certain exceptions consistently in order to indicate the
    >     origin of a fault where possible. The big challenge I see is to do it
    >     consistently and to share this contract with the community so that
    >     future changes take this into account. I don't have a good idea other
    >     than being very diligent.
    >
    >     Concerning the concrete usage of the StateMigrationException, I fear
    >     that we have cases where we throw this exception if the serializers are
    >     incompatible (user fault) but also when serialization fails (an
    >     IOException, which can be a user fault (corrupt checkpoint) as well as a
    >     framework fault (filesystem hiccup)) (
    >     https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java#L213
    >     ).
    >     Consequently, we might first need to clean up the usage of
    >     StateMigrationException before being able to use it consistently.
    >     Moreover, in the case of reading a checkpoint I am not entirely sure how
    >     to classify all possible errors into user and framework faults.
    >
    >     I am pulling in Gordon who worked on this part in the past and might be
    >     able to give us some more details on the usage of the
    >     StateMigrationException.
    >
    >     Cheers,
    >     Till
    >
    >     On Wed, Oct 7, 2020 at 12:27 AM Kim, Hwanju <hwanj...@amazon.com.invalid> wrote:
    >
    >     > Hi!
    >     >
    >     > In the case where a new state type is incompatible with the old one
    >     > from a savepoint, we get an IllegalStateException via
    >     > org.apache.flink.util.Preconditions.checkState from checkStateMetaInfo:
    >     >
    >     > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220
    >     >
    >     > For example, if an updated app uses reducing state but restores from
    >     > aggregating state, we can get the following exception:
    >     > java.lang.IllegalStateException: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
    >     >         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    >     >         at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
    >     >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
    >     >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
    >     >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
    >     >         at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    >     >         at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    >     >         at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
    >     >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
    >     >         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
    >     >         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
    >     >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
    >     >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
    >     >         at java.lang.Thread.run(Thread.java:748)
    >     >
    >     >
    >     > The topic of this email is not how to solve this issue, but rather
    >     > which exception type should be used to consistently inform users of
    >     > incompatible state. I think StateMigrationException seems reasonable
    >     > for this, as other types of state incompatibility
    >     > (key/namespace/value) are reported by that exception. Although the
    >     > existing use of StateMigrationException comes mostly from the
    >     > compatibility check on the type serializer snapshot, its semantics
    >     > seem to be to notify developers of state incompatibility in general.
    >     > The type mismatch from the state descriptor (like aggregating to
    >     > reducing) seems more like an upfront/fast check of incompatibility,
    >     > so it looks better to use StateMigrationException rather than the too
    >     > general IllegalStateException.
    >     >
    >     > A bit of context behind this thought: we internally consider an
    >     > IllegalStateException from the Flink core code base to be a sort of
    >     > runtime state violation (e.g., internal state is inconsistent), so we
    >     > generally expect it not to come from a user’s mistake. Apparently, the
    >     > above incompatible type is a frequent mistake made by developers when
    >     > restoring from a savepoint. We would like to narrow such a specific
    >     > type of user error down to a certain type of exception, rather than
    >     > having it thrown as a general Java exception.
    >     >
    >     > I would like to hear what you think about changing the exception type
    >     > to StateMigrationException in checkStateMetaInfo. If it’s not
    >     > reasonable, I wonder why and what alternative would be considered
    >     > (e.g., another or a new type).
    >     >
    >     > Thanks,
    >     > Hwanju
    >     >
    >     >
    >     >
    >
    >
