Hi,

Thanks for the discussions so far. I agree that the use of StateMigrationException and its semantics are rather undefined and inconsistent at the moment.
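To make the inconsistency concrete, here is a minimal Java sketch of a failure classifier that decides the fault origin purely from exception types. StateMigrationException is a simplified stand-in for Flink's class, and FaultOrigin / FaultClassifier are hypothetical names, not Flink APIs. The IllegalStateException rule assumes the internal convention Hwanju describes further down the thread, and the fallback follows his "err on the side of a framework issue" policy. Notice that nothing maps to USER, and the restore-time state type mismatch, which is a user mistake, gets classified as a framework fault.

```java
/** Simplified stand-in for org.apache.flink.util.StateMigrationException (not the real class). */
class StateMigrationException extends Exception {
    StateMigrationException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical fault origins used to choose a recovery action. */
enum FaultOrigin { USER, FRAMEWORK, AMBIGUOUS }

/** Hypothetical classifier that looks only at exception types. */
final class FaultClassifier {
    // Bound on how far down the cause chain we look, in case of a cyclic chain.
    private static final int MAX_CAUSE_DEPTH = 32;

    private FaultClassifier() {}

    static FaultOrigin classify(Throwable failure) {
        Throwable t = failure;
        for (int depth = 0; t != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (t instanceof StateMigrationException) {
                // Currently covers incompatible serializers (user fault) as well as I/O errors
                // during migration (user or framework fault), so the type alone cannot decide.
                return FaultOrigin.AMBIGUOUS;
            }
            if (t instanceof IllegalStateException) {
                // Assumed convention: an internal invariant violation, i.e. a framework fault.
                // The restore-time state type mismatch (a user mistake) also lands here.
                return FaultOrigin.FRAMEWORK;
            }
            t = t.getCause();
        }
        // Nothing recognised: err on the side of a framework issue.
        // Note that no exception type maps to USER yet.
        return FaultOrigin.FRAMEWORK;
    }
}
```

Once each exception type carries a single meaning, the AMBIGUOUS branch could be narrowed and a USER branch added for a dedicated incompatibility exception.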
On Thu, Oct 8, 2020 at 7:38 AM Kim, Hwanju <hwanj...@amazon.com> wrote:
> Hi Till,
>
> Thanks for the opinion and insight. I think your concern is valid: a nontrivial number of issues are still too subtle for us to say confidently whether they are framework or user errors. Serialization-related issues are one such case, since even a user-triggered issue leads to exceptions from many places in the framework without a clear boundary. So far, our approach has been based less on theory and completeness and more on data and likelihood. When it's unclear, we err on the side of calling it a framework issue. The StateMigrationException from migrateSerializedValue that you pointed out could be one ambiguous example. Our general thinking about such ambiguous issues is this: if it's a one-off hiccup, the effect of misclassification is low. If it's persistent (e.g., the filesystem not working well), its impact is high, but then there should be more exceptions thrown from many places, not just from a single point in state migration, and we would expect those other, dominant exceptions to be reported as a framework issue.
>
> If I understand it correctly,
> 1) Semantically, a state type mismatch (e.g., aggregating vs. reducing) can be thrown as StateMigrationException, as it clearly comes from the user with incompatible state.
> 2) For the subtler StateMigrationException throws (e.g., during migration rather than from the compatibility check), we can refine them by catching lower-level exceptions in a finer-grained manner (possibly using a different exception).

This observation is accurate.
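Point 2) could look roughly like the following sketch. It assumes a hypothetical per-value migrator that reads a value with the old serializer and writes it with the new one. Only the low-level IOException is caught, and it is kept as the cause, so a caller inspecting the exception can still see that it was an I/O problem during migration. StateMigrationException is a simplified stand-in, and ValueMigrator / MigrationSketch are illustrative names, not Flink APIs.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for org.apache.flink.util.StateMigrationException (not the real class). */
class StateMigrationException extends Exception {
    StateMigrationException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical per-value step: deserialize with the prior serializer, re-serialize with the new one. */
@FunctionalInterface
interface ValueMigrator {
    byte[] migrate(byte[] oldBytes) throws IOException;
}

final class MigrationSketch {
    private MigrationSketch() {}

    /**
     * Migrates all values. Only IOException is caught and wrapped, keeping it as the cause;
     * other exceptions (e.g. a buggy serializer throwing a RuntimeException) propagate unchanged.
     */
    static List<byte[]> migrateAll(List<byte[]> values, ValueMigrator migrator)
            throws StateMigrationException {
        List<byte[]> migrated = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            try {
                migrated.add(migrator.migrate(values.get(i)));
            } catch (IOException e) {
                throw new StateMigrationException("I/O failure while migrating value #" + i, e);
            }
        }
        return migrated;
    }
}
```

The design choice here is the narrow catch: a blanket catch of Exception would fold serializer bugs and I/O failures into the same wrapped exception, which is exactly the ambiguity discussed above.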
To summarize the discussions so far, we can sort all restore failures related to state compatibility into two categories:
1) incompatible state serializers (TypeSerializerSchemaCompatibility#isIncompatible()) or state types (ListState vs. MapState), and
2) failures during the migration itself: even if all compatibility checks pass and migration is required, subtle filesystem hiccups, corrupted checkpoint files, or incorrect serializer implementations can still surface while the state is being migrated.

I'm wondering if we should instead introduce a different exception type for 1). For 1), a new exception type named IncompatibleStateDescriptorException (name TBD) seems like a better fit, since in that case the state can never be migrated and it is always a user failure. The name immediately implies that the user-provided properties for state access, e.g. serializers or the state type, are incompatible with the checkpointed data.

For 2), we can continue to use StateMigrationException to wrap lower-level exceptions. The intent here isn't to classify whether the error was user-caused or a framework issue, but simply to relate the exception to the process of state migration.

I agree that these can be handled separately, starting with differentiating the above scenarios through different exception types and cleaning up the use of StateMigrationException.

Cheers,
Gordon

>
> I think the two issues are independent, so either can be handled separately as long as the semantics of StateMigrationException are clear. Of the two, 1) seems straightforward, so if we agree, I can create an issue and work on it. Nonetheless, I would like to hear more of Gordon's thoughts.
>
> Thanks,
> Hwanju
>
> On 10/7/20, 1:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>
> Hi Hwanju,
>
> thanks for starting this discussion.
> I pretty much like the idea of being able to distinguish between user code faults and framework faults. This also helps in deciding on the recovery action to take. Hence, I would be +1 for using certain exceptions consistently to indicate the origin of a fault where possible. The big challenge I see is doing this consistently and sharing this contract with the community so that future changes take it into account. I don't have a good idea other than being very diligent.
>
> Concerning the concrete usage of StateMigrationException, I fear that we have cases where we throw this exception when the serializers are incompatible (user fault) but also when serialization fails with an IOException, which can be a user fault (corrupt checkpoint) as well as a framework fault (filesystem hiccup); see https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java#L213. Consequently, we might first need to clean up the usage of StateMigrationException before being able to use it consistently. Moreover, when reading a checkpoint, I am not entirely sure how to sort all possible errors into user and framework faults.
>
> I am pulling in Gordon, who worked on this part in the past and might be able to give us some more details on the usage of StateMigrationException.
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 12:27 AM Kim, Hwanju <hwanj...@amazon.com.invalid> wrote:
>
> > Hi!
> >
> > In the case where a new state type is incompatible with the old one from a savepoint, we get an IllegalStateException via org.apache.flink.util.Preconditions.checkState from checkStateMetaInfo:
> >
> > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220
> >
> > For example, if an updated app uses reducing state but restores from aggregating state, we get the following exception:
> >
> > java.lang.IllegalStateException: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
> >     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >     at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
> >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
> >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
> >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
> >     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> >     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> >     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> > The topic of this email is not how to solve this issue, but rather which type of exception should be used to consistently inform users of incompatible state. I think StateMigrationException is reasonable for this, as the other kinds of state incompatibility (key/namespace/value) are already reported with that exception. Although the existing uses of StateMigrationException mostly come from the compatibility check on type serializer snapshots, its semantics seem to be to notify developers of state incompatibility in general. A type mismatch from the state descriptor (like aggregating to reducing) is more of an upfront, fast incompatibility check, so StateMigrationException looks like a better fit than the overly general IllegalStateException.
> >
> > A bit of context behind this thought: internally, we consider an IllegalStateException from the Flink core code base to be a sort of runtime state violation (e.g., inconsistent internal state), so we generally expect it not to come from a user's mistake. Apparently, the type incompatibility above is a frequent mistake by developers when restoring from a savepoint. We would like to narrow this specific type of user error down to a dedicated exception type, rather than having it thrown as a general Java exception.
> >
> > I would like to hear what you think about changing the exception type to StateMigrationException in checkStateMetaInfo. If it's not reasonable, I wonder why, and what alternative would be considered (e.g., another existing type or a new one).
> >
> > Thanks,
> > Hwanju
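The change proposed for checkStateMetaInfo essentially swaps the exception thrown on a state type mismatch. A minimal sketch of both variants follows; StateKind, StateMigrationException and StateMetaInfoCheck are simplified, hypothetical stand-ins (not Flink's StateDescriptor.Type or RegisteredKeyValueStateBackendMetaInfo), and only the message format is taken from the stack trace in the thread.

```java
/** Simplified stand-in for Flink's state kinds (not StateDescriptor.Type itself). */
enum StateKind { VALUE, LIST, MAP, REDUCING, AGGREGATING }

/** Simplified stand-in for org.apache.flink.util.StateMigrationException (not the real class). */
class StateMigrationException extends Exception {
    StateMigrationException(String message) {
        super(message);
    }
}

/** Hypothetical helper mirroring the type check in checkStateMetaInfo. */
final class StateMetaInfoCheck {
    private StateMetaInfoCheck() {}

    private static String mismatch(StateKind restored, StateKind registered) {
        // Same wording as the exception message in the thread's stack trace.
        return "Incompatible key/value state types. Was [" + restored
                + "], registered with [" + registered + "].";
    }

    /** Current behaviour: a precondition failure surfaces as a generic IllegalStateException. */
    static void checkWithPrecondition(StateKind restored, StateKind registered) {
        if (restored != registered) {
            throw new IllegalStateException(mismatch(restored, registered));
        }
    }

    /** Proposed behaviour: a dedicated, checked exception that signals incompatible state. */
    static void checkWithStateMigrationException(StateKind restored, StateKind registered)
            throws StateMigrationException {
        if (restored != registered) {
            throw new StateMigrationException(mismatch(restored, registered));
        }
    }
}
```

Using the IncompatibleStateDescriptorException name suggested in Gordon's reply instead would only change the class thrown in the second method; the check itself stays the same.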