GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5732
[FLINK-9034] [FLINK-9035] [core] Fix state descriptors

## What is the purpose of the change

Fixes two issues with the `StateDescriptors` that are used to obtain state access in transformation functions:

### Broken Equals and hashCode

`equals()` and `hashCode()` depend on fields that are not always set and that may change during the life of a state descriptor. That is especially problematic because the state descriptors are keys in a map, and if the meaning of `equals()` and `hashCode()` changes after insertion, the objects become keys that cannot be referenced or matched.

This pull request changes `equals()` and `hashCode()` to take only the state name and the descriptor type (by class) into account. Both are constant and do not change as part of serializer initialization.

**Illustration of the problem:**

The following code fails with a `NullPointerException`, because the `hashCode()` method tries to access the serializer field, which may be uninitialized at that point.
```java
ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("name", String.class);

descr.hashCode(); // exception
```

The `equals()` method is equally broken (no pun intended):

```java
ValueStateDescriptor<String> a = new ValueStateDescriptor<>("name", String.class);
ValueStateDescriptor<String> b = new ValueStateDescriptor<>("name", String.class);

a.equals(b); // exception
b.equals(a); // exception

a.initializeSerializerUnlessSet(new ExecutionConfig());

a.equals(b); // false
b.equals(a); // exception

b.initializeSerializerUnlessSet(new ExecutionConfig());

a.equals(b); // true
b.equals(a); // true
```

### Type Information dropped prematurely

The following code is currently problematic:

```java
public class MyFunction extends RichMapFunction<A, B> {

    private final ValueStateDescriptor<MyType> descr =
            new ValueStateDescriptor<>("state name", MyType.class);

    private ValueState<MyType> state;

    @Override
    public void open(Configuration cfg) {
        state = getRuntimeContext().getValueState(descr);
    }
}
```

The problem is that the state descriptor drops the type information and creates a serializer before serialization, as part of shipping the function in the cluster. To do that, it initializes the serializer with an empty execution config, making serialization inconsistent.

This is mainly an artifact from the days when dropping the type information before shipping was necessary, because the type info was not serializable. It now is, and we can fix that bug.

## Verifying this change

**This change is sensitive, because it touches the structures that all users use to obtain access to persistent state.**

This change adds a series of unit tests to validate the fixed behavior.
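As a rough illustration of the behavior such tests could check for the `equals()` / `hashCode()` fix, here is a minimal, self-contained sketch. `SimpleDescriptor` and `DescriptorEqualityDemo` are hypothetical stand-ins invented for this example; they are not Flink classes and not the tests added in this pull request. The sketch only assumes what is stated above: equality and hash code depend on the state name and the descriptor class, never on the lazily initialized serializer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical demo driver; not part of Flink. */
class DescriptorEqualityDemo {

    public static void main(String[] args) {
        SimpleDescriptor a = new SimpleDescriptor("name");
        SimpleDescriptor b = new SimpleDescriptor("name");

        // Use the descriptor as a map key before its serializer is initialized.
        Map<SimpleDescriptor, String> registry = new HashMap<>();
        registry.put(a, "state");

        // Initializing the serializer must not change equals() or hashCode().
        a.initializeSerializer();

        System.out.println(a.equals(b));      // prints "true"
        System.out.println(b.equals(a));      // prints "true"
        System.out.println(registry.get(b));  // prints "state"
    }
}

/** Hypothetical, simplified stand-in for a state descriptor; not Flink's actual class. */
final class SimpleDescriptor {

    private final String name;

    // Mutable and initially null, mimicking a lazily initialized serializer.
    private Object serializer;

    SimpleDescriptor(String name) {
        this.name = Objects.requireNonNull(name, "name must not be null");
    }

    void initializeSerializer() {
        if (serializer == null) {
            serializer = new Object();
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Only the constant name (plus the class check above) takes part in equality.
        return name.equals(((SimpleDescriptor) o).name);
    }

    @Override
    public int hashCode() {
        // Only constant fields are used, so the hash is stable for the object's lifetime.
        return 31 * name.hashCode() + getClass().hashCode();
    }
}
```

Because neither method reads the `serializer` field, the descriptor stays retrievable from the map regardless of when the serializer is initialized, and no `NullPointerException` can arise from an unset serializer.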
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
    - **All changes should preserve full API compatibility.**
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know): **Touches the structures that give access to checkpointed state.**
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink fix_state_descriptors

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5732.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5732

----
commit 8ff1284d28a056b91d91607584fab2a55fbcc86c
Author: Stephan Ewen <sewen@...>
Date:   2018-03-20T14:15:08Z

    [hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'

commit dc0df85e064ee45bcb0f83d21b00a1abc9359723
Author: Stephan Ewen <sewen@...>
Date:   2018-03-20T14:29:12Z

    [hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit c62e84414ff4715399bce35ac74fb1d94256c3ed
Author: Stephan Ewen <sewen@...>
Date:   2018-03-20T14:43:33Z

    [hotfix] [core] Add @FunctionalInterface to KeySelector

    That clarifies that this interface should always be a SAM interface to allow that users created lambdas for its use.

commit 60f0327e8f12de6397e009d2d5c4134024c3e674
Author: Stephan Ewen <sewen@...>
Date:   2018-03-20T14:36:19Z

    [hotfix] [core] Demockitofy state descriptor tests

commit e18ad3d7c39c7a92d2db1066ad968fa2e71e3233
Author: Stephan Ewen <sewen@...>
Date:   2018-03-20T14:44:27Z

    [hotfix] [core] Make State Descriptors consistently use Preconditions instead of Objects.

commit 3216f5ad0c7d6964ae885979aa5fcfe9c8e19135
Author: Stephan Ewen <sewen@...>
Date:   2018-03-20T15:22:12Z

    [FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization.

    Throwing away TypeInformation upon serialization was previously done because the type information was not serializable. Now that it is serializable, we can (and should) keep it to provide consistent user experience, where all serializers respect the ExecutionConfig.

commit 7340bbec821e635cec7a4179a1444e0b0798bbdc
Author: Stephan Ewen <sewen@...>
Date:   2018-03-20T15:46:13Z

    [hotfix] [core] Consilidate serializer duplication tests in StateDescriptorTest where possible

commit c060ee9acdac35e08b5d1873b19e7bbf4d6906e8
Author: Stephan Ewen <sewen@...>
Date:   2018-03-20T16:16:06Z

    [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

----