[
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930051#comment-15930051
]
ASF GitHub Bot commented on FLINK-6018:
---------------------------------------
GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/3562
[FLINK-6018] Add tests for KryoSerializer restore with registered types
This is the result of the discussion in #3534.
I changed `TypeSerializer.isCompatibleWith()` to
`TypeSerializer.canRestoreFrom` because the relation is not necessarily
symmetric.
I added a `KryoSerializer.canRestoreFrom()` that only allows restoring when
we previously didn't have registered types/serializers.
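To illustrate why the relation is not symmetric, here is a minimal sketch. `SketchKryoSerializer` and its set of registered type names are hypothetical stand-ins, not Flink's actual classes, and the rule in `canRestoreFrom()` is only my reading of the description above:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a Kryo-based serializer; NOT Flink's KryoSerializer.
final class SketchKryoSerializer {
    // Names of the types registered with this serializer (assumed representation).
    private final Set<String> registeredTypes;

    SketchKryoSerializer(Set<String> registeredTypes) {
        this.registeredTypes =
                Collections.unmodifiableSet(new LinkedHashSet<>(registeredTypes));
    }

    // Returns true if this serializer may read state written by `previous`.
    // Assumed rule: restoring is allowed only when the previous serializer
    // had no registered types/serializers.
    boolean canRestoreFrom(SketchKryoSerializer previous) {
        return previous.registeredTypes.isEmpty();
    }
}

class CanRestoreFromDemo {
    public static void main(String[] args) {
        SketchKryoSerializer plain =
                new SketchKryoSerializer(Collections.<String>emptySet());
        SketchKryoSerializer registered =
                new SketchKryoSerializer(Collections.singleton("com.example.MyPojo"));

        // The same pair of serializers gives different answers per direction.
        System.out.println(registered.canRestoreFrom(plain));
        System.out.println(plain.canRestoreFrom(registered));
    }
}
```

A name like `isCompatibleWith()` suggests that `a.isCompatibleWith(b)` implies `b.isCompatibleWith(a)`, which does not hold here.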
I added a whole bunch of tests in `StateBackendTestBase`; these should be
reviewed most thoroughly.
R: @StephanEwen and @tzulitai because this probably is interesting with the
serialiser update story that you're working on.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/flink jira-6018-state-init-fixups
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3562.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3562
----
commit f0c3af53d24a3eac914cf1ceb3b1761a40553dfe
Author: Aljoscha Krettek <[email protected]>
Date: 2017-03-16T14:17:05Z
[FLINK-6018] Add tests for KryoSerializer restore with registered types
commit b90cf5cad5176d8edcbd189a9b65cc4999cddd53
Author: Aljoscha Krettek <[email protected]>
Date: 2017-03-17T09:56:13Z
[FLINK-6018] Rename isCompatibleWith() to canRestoreFrom()
This makes the method asymmetric because in the case of KryoSerializer we
can restore from state that was stored using no registered
types/serializers while the other way around is not possible.
----
> Properly initialise StateDescriptor in
> AbstractStateBackend.getPartitionedState()
> ---------------------------------------------------------------------------------
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, State Backends, Checkpointing
> Reporter: sunjincheng
> Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> The `AbstractKeyedStateBackend#getPartitionedState` method currently
> contains the following snippet:
> {code}
> // TODO: This is wrong, it should throw an exception that the
> // initialization has not properly happened
> if (!stateDescriptor.isSerializerInitialized()) {
>     stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>     return serializer != null;
> }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>     if (serializer == null) {
>         if (typeInfo != null) {
>             serializer = typeInfo.createSerializer(executionConfig);
>         } else {
>             throw new IllegalStateException(
>                 "Cannot initialize serializer after TypeInformation was dropped during serialization");
>         }
>     }
> }
> {code}
> That is, `initializeSerializerUnlessSet` already checks `serializer == null`
> itself, so the outer `isSerializerInitialized()` check is redundant. I
> suggest improving this code in one of the following ways:
> approach 1:
> Following the `TODO` comment, throw an exception:
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>     throw new IllegalStateException(
>         "The serializer of the descriptor has not been initialized!");
> }
> {code}
> approach 2:
> Always try to initialize, and remove the
> `if (!stateDescriptor.isSerializerInitialized()) {` check:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> If we use approach 2, I also suggest adding a
> `private final ExecutionConfig executionConfig` field to
> `AbstractKeyedStateBackend`. Then we can change the code to:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Do the above suggestions seem reasonable?
> Feedback and corrections are welcome.
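The two approaches quoted above can be compared side by side in a self-contained sketch. All `Sketch*` classes are hypothetical simplifications, not Flink's `StateDescriptor`, `ExecutionConfig`, or `AbstractKeyedStateBackend`; a `String` stands in for the serializer and a `Function` for the type information:

```java
import java.util.function.Function;

// Hypothetical stand-in for ExecutionConfig.
final class SketchConfig {
    final String name;
    SketchConfig(String name) { this.name = name; }
}

// Hypothetical stand-in for StateDescriptor.
final class SketchDescriptor {
    private final Function<SketchConfig, String> typeInfo; // stands in for TypeInformation
    private String serializer;                             // stands in for TypeSerializer

    SketchDescriptor(Function<SketchConfig, String> typeInfo) {
        this.typeInfo = typeInfo;
    }

    boolean isSerializerInitialized() {
        return serializer != null;
    }

    // Already guards on `serializer == null`, so callers need no extra check.
    void initializeSerializerUnlessSet(SketchConfig config) {
        if (serializer == null) {
            serializer = typeInfo.apply(config);
        }
    }

    String getSerializer() {
        return serializer;
    }
}

// Hypothetical stand-in for the keyed state backend.
final class SketchBackend {
    // Approach 2 suggests keeping the job's config as a field.
    private final SketchConfig executionConfig;

    SketchBackend(SketchConfig executionConfig) {
        this.executionConfig = executionConfig;
    }

    // Approach 1: fail fast when the descriptor was not initialized.
    String serializerStrict(SketchDescriptor descriptor) {
        if (!descriptor.isSerializerInitialized()) {
            throw new IllegalStateException(
                    "The serializer of the descriptor has not been initialized!");
        }
        return descriptor.getSerializer();
    }

    // Approach 2: initialize lazily with the backend's own config.
    String serializerLenient(SketchDescriptor descriptor) {
        descriptor.initializeSerializerUnlessSet(executionConfig);
        return descriptor.getSerializer();
    }
}
```

In this sketch, approach 2 picks up the backend's configuration instead of a freshly constructed default one, while approach 1 surfaces the missing initialization to the caller.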
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)