[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5732 ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176219997

    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---
    @@ -121,13 +117,9 @@ public void testValueStateDescriptorAutoSerializer() throws Exception {
         @SuppressWarnings("unchecked")
         @Test
         public void testSerializerDuplication() {
    -        TypeSerializer statefulSerializer = mock(TypeSerializer.class);
    -        when(statefulSerializer.duplicate()).thenAnswer(new Answer>() {
    -            @Override
    -            public TypeSerializer answer(InvocationOnMock invocation) throws Throwable {
    -                return mock(TypeSerializer.class);
    -            }
    -        });
    +        // we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --

Same comment as above.

---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176220974

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -77,18 +80,22 @@
         /** The serializer for the type. May be eagerly initialized in the constructor,
          * or lazily once the type is serialized or an ExecutionConfig is provided. */
    +    @Nullable
         protected TypeSerializer serializer;
    +    /** The type information describing the value type. Only used to lazily create the serializer
    --- End diff --

nit: I think this was also a copying error in the original comment, but this is not necessarily a "value", unless we simply think of all state as a value, in which case I'm fine with this.

---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176220075

    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java ---
    @@ -129,23 +125,12 @@ public void testMapStateDescriptorAutoSerializer() throws Exception {
          * Tests that the returned serializer is duplicated. This allows to
          * share the state descriptor.
          */
    -    @SuppressWarnings("unchecked")
         @Test
         public void testSerializerDuplication() {
    -        TypeSerializer keySerializer = mock(TypeSerializer.class);
    -        TypeSerializer valueSerializer = mock(TypeSerializer.class);
    -        when(keySerializer.duplicate()).thenAnswer(new Answer>() {
    -            @Override
    -            public TypeSerializer answer(InvocationOnMock invocation) throws Throwable {
    -                return mock(TypeSerializer.class);
    -            }
    -        });
    -        when(valueSerializer.duplicate()).thenAnswer(new Answer>() {
    -            @Override
    -            public TypeSerializer answer(InvocationOnMock invocation) throws Throwable {
    -                return mock(TypeSerializer.class);
    -            }
    -        });
    +        // we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --

See above.

---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176220108

    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java ---
    @@ -118,17 +115,14 @@ public void testValueStateDescriptorAutoSerializer() throws Exception {
         @SuppressWarnings("unchecked")
         @Test
         public void testSerializerDuplication() {
    -        TypeSerializer statefulSerializer = mock(TypeSerializer.class);
    -        when(statefulSerializer.duplicate()).thenAnswer(new Answer>() {
    -            @Override
    -            public TypeSerializer answer(InvocationOnMock invocation) throws Throwable {
    -                return mock(TypeSerializer.class);
    -            }
    -        });
    -
    -        ReduceFunction reducer = mock(ReduceFunction.class);
    -
    -        ReducingStateDescriptor descr = new ReducingStateDescriptor<>("foobar", reducer, statefulSerializer);
    +        // we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --

See above.

---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176220129

    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java ---
    @@ -149,13 +145,9 @@ public void testVeryLargeDefaultValue() throws Exception {
         @SuppressWarnings("unchecked")
         @Test
         public void testSerializerDuplication() {
    -        TypeSerializer statefulSerializer = mock(TypeSerializer.class);
    -        when(statefulSerializer.duplicate()).thenAnswer(new Answer>() {
    -            @Override
    -            public TypeSerializer answer(InvocationOnMock invocation) throws Throwable {
    -                return mock(TypeSerializer.class);
    -            }
    -        });
    +        // we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --

See above.

---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176222855

    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java ---
    @@ -130,6 +135,31 @@ public void testInitializeSerializerAfterSerializationWithCustomConfig() throws
                     .getRegistration(File.class).getId() > 0);
         }
    +    //
    +    // Tests for serializer initialization
    +    //
    +    /**
    +     * FLINK-6775, tests that the returned serializer is duplicated.
    +     * This allows to share the state descriptor across threads.
    +     */
    +    @Test
    +    public void testSerializerDuplication() throws Exception {
    +        // we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --

Same as above: we should assert that assumption.

---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176219956

    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java ---
    @@ -41,16 +40,11 @@
          * Tests that the returned serializer is duplicated. This allows to
          * share the state descriptor.
          */
    -    @SuppressWarnings("unchecked")
         @Test
         public void testSerializerDuplication() {
    -        TypeSerializer serializer = mock(TypeSerializer.class);
    -        when(serializer.duplicate()).thenAnswer(new Answer>() {
    -            @Override
    -            public TypeSerializer answer(InvocationOnMock invocation) throws Throwable {
    -                return mock(TypeSerializer.class);
    -            }
    -        });
    +        // we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --

Will this condition always hold? Should we maybe guard this assumption with an assertion, i.e. assert that the result of `serializer.duplicate()` is different from the original serialiser?

---
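One way to guard the assumption raised here is to check, before anything else, that the test serializer really is stateful, i.e. that `duplicate()` returns a new instance. The sketch below is self-contained and does not use Flink: `Serializer`, `StatefulSerializer`, `Descriptor` and `SerializerDuplicationCheck` are hypothetical stand-ins for `TypeSerializer` and a state descriptor whose getter hands out duplicates. In an actual JUnit 4 test the guard would presumably be `assertNotSame(serializer, serializer.duplicate())`.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's TypeSerializer; only duplicate() matters here.
interface Serializer<T> {
    Serializer<T> duplicate();
}

// A serializer with mutable state: sharing one instance across threads would be
// unsafe, so duplicate() must return a fresh instance rather than 'this'.
final class StatefulSerializer implements Serializer<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public Serializer<String> duplicate() {
        return new StatefulSerializer();
    }
}

// Hypothetical descriptor whose getter hands out a duplicate on every call.
final class Descriptor<T> {
    private final Serializer<T> serializer;

    Descriptor(Serializer<T> serializer) {
        this.serializer = Objects.requireNonNull(serializer);
    }

    Serializer<T> getSerializer() {
        return serializer.duplicate();
    }
}

final class SerializerDuplicationCheck {
    static void run() {
        Serializer<String> serializer = new StatefulSerializer();

        // Guard the test's own assumption first, so that a serializer which
        // legitimately returns 'this' fails here with a clear message instead
        // of the failure being blamed on the descriptor below.
        if (serializer.duplicate() == serializer) {
            throw new AssertionError("test serializer must be stateful (duplicate() != this)");
        }

        Descriptor<String> descr = new Descriptor<>(serializer);
        Serializer<String> first = descr.getSerializer();
        Serializer<String> second = descr.getSerializer();

        if (first == serializer || second == serializer || first == second) {
            throw new AssertionError("descriptor must hand out distinct duplicates");
        }
    }

    public static void main(String[] args) {
        run();
        System.out.println("ok");
    }
}
```

Using an assertion rather than an assumption (for example JUnit's `Assume.assumeTrue`) is a deliberate choice in this sketch: an assumption would silently skip the test if the serializer ever became stateless, while an assertion makes the broken precondition visible.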
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176221943

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -249,12 +257,13 @@ public boolean isSerializerInitialized() {
          */
         public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
    --- End diff --

This is slightly orthogonal to this change, but could we get rid of this method and instead change `getSerializer()` to `getSerializer(ExecutionConfig)`? That way, we don't have to be concerned about forgetting to call `initializeSerializerUnlessSet()`.

---
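A minimal sketch of the API shape suggested here, using hypothetical stand-ins rather than Flink's classes: `Config` plays the role of `ExecutionConfig`, `TypeInfo` of `TypeInformation`, `Serializer` of `TypeSerializer`, and `LazyDescriptor` of the state descriptor. Because the config is a required parameter of `getSerializer(...)`, there is no separate initialization step to forget, and the type information is kept until the serializer is created from the real config.

```java
import java.util.Objects;

// Hypothetical stand-in for ExecutionConfig (e.g. one carrying Kryo registrations).
final class Config {
    final String description;

    Config(String description) {
        this.description = description;
    }
}

// Hypothetical stand-ins for TypeSerializer and TypeInformation.
interface Serializer<T> {
    Serializer<T> duplicate();
}

interface TypeInfo<T> {
    Serializer<T> createSerializer(Config config);
}

// Remembers the config it was created from, so the effect is observable.
final class TaggedSerializer implements Serializer<String> {
    final Config createdWith;

    TaggedSerializer(Config createdWith) {
        this.createdWith = createdWith;
    }

    @Override
    public Serializer<String> duplicate() {
        return new TaggedSerializer(createdWith);
    }
}

// Sketch of the suggested API: no separate initializeSerializerUnlessSet() step;
// the config is a required argument of getSerializer(), so it cannot be forgotten.
final class LazyDescriptor<T> {
    private final String name;
    private TypeInfo<T> typeInfo;     // kept until a real config arrives
    private Serializer<T> serializer; // created on first access

    LazyDescriptor(String name, TypeInfo<T> typeInfo) {
        this.name = Objects.requireNonNull(name);
        this.typeInfo = Objects.requireNonNull(typeInfo);
    }

    String getName() {
        return name;
    }

    synchronized Serializer<T> getSerializer(Config config) {
        Objects.requireNonNull(config);
        if (serializer == null) {
            serializer = typeInfo.createSerializer(config);
            typeInfo = null; // no longer needed once the serializer exists
        }
        // Hand out a duplicate so callers never share a stateful instance.
        return serializer.duplicate();
    }
}

final class LazyDescriptorDemo {
    public static void main(String[] args) {
        Config jobConfig = new Config("job config");
        LazyDescriptor<String> descr = new LazyDescriptor<>("state name", TaggedSerializer::new);

        TaggedSerializer s = (TaggedSerializer) descr.getSerializer(jobConfig);
        System.out.println(s.createdWith.description); // job config
    }
}
```

One consequence of this design is that the first config passed in wins: later calls with a different config reuse the serializer that was already created. The method is `synchronized` because, as noted in the test Javadoc above, descriptors may be shared across threads.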
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176006260

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -77,18 +80,22 @@
         /** The serializer for the type. May be eagerly initialized in the constructor,
          * or lazily once the type is serialized or an ExecutionConfig is provided. */
    +    @Nullable
         protected TypeSerializer serializer;
    +    /** The type information describing the value type. Only used to lazily create the serializer
    +     * and dropped during serialization */
    +    @Nullable
    --- End diff --

good catch, will fix that upon merging

---
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r175983599

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -77,18 +80,22 @@
         /** The serializer for the type. May be eagerly initialized in the constructor,
          * or lazily once the type is serialized or an ExecutionConfig is provided. */
    +    @Nullable
         protected TypeSerializer serializer;
    +    /** The type information describing the value type. Only used to lazily create the serializer
    +     * and dropped during serialization */
    +    @Nullable
    --- End diff --

nit: Type information is no longer dropped during serialization; it is now dropped in `initializeSerializerUnlessSet()`.

---
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5732

[FLINK-9034] [FLINK-9035] [core] Fix state descriptors

## What is the purpose of the change

Fixes two issues with the `StateDescriptor`s that are used to obtain state access in transformation functions:

### Broken Equals and hashCode

`equals()` and `hashCode()` depend on fields that are not always set and that may change during the life of a state descriptor. That is especially problematic because the state descriptors are keys in a map: if the meaning of `equals()` and `hashCode()` changes after insertion, the objects become keys that can no longer be referenced / matched.

This pull request changes `equals()` and `hashCode()` to take only the state name and the descriptor type (by class) into account. Both are always set and do not change as part of serializer initialization.

**Illustration of the problem:** The following code fails with a `NullPointerException`, because the `hashCode()` method tries to access the serializer field, which may be uninitialized at that point.
```java
ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("name", String.class);
descr.hashCode(); // exception
```

The `equals()` method is equally broken (no pun intended):

```java
ValueStateDescriptor<String> a = new ValueStateDescriptor<>("name", String.class);
ValueStateDescriptor<String> b = new ValueStateDescriptor<>("name", String.class);

a.equals(b); // exception
b.equals(a); // exception

a.initializeSerializerUnlessSet(new ExecutionConfig());

a.equals(b); // false
b.equals(a); // exception

b.initializeSerializerUnlessSet(new ExecutionConfig());

a.equals(b); // true
b.equals(a); // true
```

### Type Information dropped prematurely

The following code is currently problematic:

```java
public class MyFunction extends RichMapFunction {

    private final ValueStateDescriptor<MyType> descr =
            new ValueStateDescriptor<>("state name", MyType.class);

    private ValueState<MyType> state;

    @Override
    public void open(Configuration cfg) {
        state = getRuntimeContext().getValueState(descr);
    }

    // map(...) omitted
}
```

The problem is that the state descriptor drops the type information and creates a serializer before the descriptor itself is serialized, as part of shipping the function to the cluster. To do that, it initializes the serializer with an empty execution config instead of the job's actual `ExecutionConfig`, so settings made in the job's config (for example, custom Kryo type registrations) are not reflected in the serializer, making serialization inconsistent.

This is mainly an artifact from the days when dropping the type information before shipping was necessary because the type info was not serializable. It now is, so this bug can be fixed.

## Verifying this change

**This change is sensitive, because it touches the structures that all users use to obtain access to persistent state.**

This change adds a series of unit tests to validate the fixed behavior.
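The `equals()`/`hashCode()` change described under *Broken Equals and hashCode* can be sketched as follows. This is a simplified, self-contained illustration with hypothetical classes (`SketchDescriptor`, `SketchValueDescriptor`, `SketchListDescriptor`, `EqualsDemo`), not the code from this pull request: equality uses only the state name and the concrete class, so initializing the serializer later cannot change a descriptor's hash or make it unreachable as a map key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified descriptor; not Flink's StateDescriptor.
abstract class SketchDescriptor {
    private final String name;
    private Object serializer; // null until initialized; deliberately NOT used in equals/hashCode

    SketchDescriptor(String name) {
        this.name = Objects.requireNonNull(name);
    }

    void initializeSerializer() {
        if (serializer == null) {
            serializer = new Object(); // placeholder for a real serializer
        }
    }

    boolean isSerializerInitialized() {
        return serializer != null;
    }

    // Equality depends only on fields that never change: the name and the concrete class.
    @Override
    public final boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        return name.equals(((SketchDescriptor) o).name);
    }

    @Override
    public final int hashCode() {
        return 31 * name.hashCode() + getClass().hashCode();
    }
}

final class SketchValueDescriptor extends SketchDescriptor {
    SketchValueDescriptor(String name) {
        super(name);
    }
}

final class SketchListDescriptor extends SketchDescriptor {
    SketchListDescriptor(String name) {
        super(name);
    }
}

final class EqualsDemo {
    public static void main(String[] args) {
        SketchDescriptor a = new SketchValueDescriptor("name");
        SketchDescriptor b = new SketchValueDescriptor("name");

        Map<SketchDescriptor, String> states = new HashMap<>();
        states.put(a, "state");

        // Initializing the serializer after insertion must not "lose" the key.
        a.initializeSerializer();

        System.out.println(states.get(a));                              // state
        System.out.println(states.get(b));                              // state
        System.out.println(a.equals(new SketchListDescriptor("name"))); // false
    }
}
```

Comparing by concrete class (rather than `instanceof`) keeps `equals()` symmetric and means a value-state and a list-state descriptor with the same name are treated as different keys.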
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
    - **All changes should preserve full API compatibility.**
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know): **Touches the structures that give access to checkpointed state.**
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink fix_state_descriptors

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5732.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5732

commit 8ff1284d28a056b91d91607584fab2a55fbcc86c
Author: Stephan Ewen
Date:   2018-03-20T14:15:08Z

    [hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'

commit dc0df85e064ee45bcb0f83d21b00a1abc9359723
Author: Stephan Ewen
Date:   2018-03-20T14:29:12Z

    [hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit c62e84414ff4715399bce35ac74fb1d94256c3ed
Author: Stephan Ewen
Date:   2018-03-20T14:43:33Z

    [hotfix] [core] Add @FunctionalInterface to KeySelector

    That clarifies that t