[
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938528#comment-15938528
]
ASF GitHub Bot commented on FLINK-6018:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3603#discussion_r107692914
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
---
@@ -368,18 +388,29 @@ public void
testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exc
}
try {
+ // backends that lazily serializes (such as memory
state backend) will fail here
runSnapshot(backend.snapshot(682375462378L, 2,
streamFactory, CheckpointOptions.forFullCheckpoint()));
} catch (ExpectedKryoTestException e) {
numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
}
assertEquals("Didn't see the expected Kryo exception.", 1,
numExceptions);
}
/**
- * Verify that we can restore a snapshot that was done with without
registered types
- * after registering types.
+ * Verify state restore resilience when:
+ * - snapshot was taken without any Kryo registrations, specific
serializers or default serializers for the state type
+ * - restored with the state type registered (no specific serializer)
+ *
+ * This test should not fail, because de- / serialization of the state
should noth be performed with Kryo's default
--- End diff --
Typo: noth
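
Separately from the typo: the added catch block in the diff above counts the expected exception whether it is thrown directly or arrives wrapped as the cause of another exception, and rethrows anything else. Below is a minimal standalone sketch of that pattern. `CauseUnwrapDemo`, `ExpectedTestException`, `ThrowingAction` and `countExpected` are hypothetical names for illustration only and are not part of Flink or of this pull request.

```java
import java.io.IOException;

final class CauseUnwrapDemo {

    /** Hypothetical marker exception, standing in for ExpectedKryoTestException. */
    static final class ExpectedTestException extends RuntimeException {
        ExpectedTestException(String message) {
            super(message);
        }
    }

    /** Hypothetical functional interface for an action that may throw. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    /**
     * Returns 1 if the action fails with the expected exception, either
     * directly or as the immediate cause of another exception; returns 0 if
     * the action succeeds; rethrows any unrelated exception unchanged.
     */
    static int countExpected(ThrowingAction action) throws Exception {
        try {
            action.run();
        } catch (ExpectedTestException e) {
            // Thrown directly, e.g. by code that serializes eagerly.
            return 1;
        } catch (Exception e) {
            // Thrown later and wrapped, e.g. by code that serializes lazily.
            if (e.getCause() instanceof ExpectedTestException) {
                return 1;
            }
            throw e;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countExpected(() -> {
            throw new ExpectedTestException("direct");
        }));
        System.out.println(countExpected(() -> {
            throw new IOException("wrapper", new ExpectedTestException("wrapped"));
        }));
    }
}
```

Note that only the immediate cause is inspected, as in the diff; a deeper wrapper chain would need a loop over `getCause()`.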
> Properly initialise StateDescriptor in
> AbstractStateBackend.getPartitionedState()
> ---------------------------------------------------------------------------------
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, State Backends, Checkpointing
> Reporter: sunjincheng
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.3.0
>
>
> The current code in the `AbstractKeyedStateBackend#getPartitionedState`
> method is as follows:
> {code}
> // TODO: This is wrong, it should throw an exception that the initialization has not properly happened
> if (!stateDescriptor.isSerializerInitialized()) {
>     stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>     return serializer != null;
> }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>     if (serializer == null) {
>         if (typeInfo != null) {
>             serializer = typeInfo.createSerializer(executionConfig);
>         } else {
>             throw new IllegalStateException(
>                 "Cannot initialize serializer after TypeInformation was dropped during serialization");
>         }
>     }
> }
> {code}
> That is, `initializeSerializerUnlessSet` already checks `serializer == null`
> itself, so the outer `isSerializerInitialized()` check is redundant. I
> suggest improving this code in one of the following ways:
> approach 1:
> Throw an exception, as the `TODO` comment suggests:
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>     throw new IllegalStateException("The serializer of the descriptor has not been initialized!");
> }
> {code}
> approach 2:
> Remove the `if (!stateDescriptor.isSerializerInitialized()) {` check and
> always try to initialize:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> If we go with approach 2, I also suggest adding a
> `private final ExecutionConfig executionConfig` field to
> `AbstractKeyedStateBackend`, so that the code becomes:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Do these suggestions seem reasonable?
> Feedback and corrections are welcome.
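
The two approaches from the issue description can be compared side by side in a small standalone sketch. It deliberately does not use Flink's classes: `DemoExecutionConfig`, `DemoStateDescriptor` and `DemoKeyedStateBackend` are hypothetical stand-ins, the "serializer" is just a `String`, and the two `getPartitionedState...` method names are invented for illustration. Only the control flow mirrors the quoted snippets.

```java
import java.util.function.Function;

final class SerializerInitDemo {
    public static void main(String[] args) {
        DemoStateDescriptor descriptor =
            new DemoStateDescriptor(config -> "serializer-for-" + config.label);
        DemoKeyedStateBackend backend =
            new DemoKeyedStateBackend(new DemoExecutionConfig("job"));

        try {
            backend.getPartitionedStateStrict(descriptor);
        } catch (IllegalStateException e) {
            System.out.println("approach 1 rejected the descriptor: " + e.getMessage());
        }
        System.out.println("approach 2 produced: "
            + backend.getPartitionedStateLenient(descriptor));
    }
}

/** Hypothetical stand-in for ExecutionConfig; it only carries a label. */
final class DemoExecutionConfig {
    final String label;

    DemoExecutionConfig(String label) {
        this.label = label;
    }
}

/** Hypothetical stand-in for StateDescriptor, mirroring the two quoted methods. */
final class DemoStateDescriptor {
    private String serializer; // stand-in for the real serializer object
    private final Function<DemoExecutionConfig, String> typeInfo; // may be null

    DemoStateDescriptor(Function<DemoExecutionConfig, String> typeInfo) {
        this.typeInfo = typeInfo;
    }

    boolean isSerializerInitialized() {
        return serializer != null;
    }

    void initializeSerializerUnlessSet(DemoExecutionConfig executionConfig) {
        if (serializer == null) {
            if (typeInfo != null) {
                serializer = typeInfo.apply(executionConfig);
            } else {
                throw new IllegalStateException(
                    "Cannot initialize serializer after TypeInformation was dropped during serialization");
            }
        }
    }

    String getSerializer() {
        return serializer;
    }
}

/** Hypothetical stand-in for the keyed state backend, holding its own config. */
final class DemoKeyedStateBackend {
    private final DemoExecutionConfig executionConfig;

    DemoKeyedStateBackend(DemoExecutionConfig executionConfig) {
        this.executionConfig = executionConfig;
    }

    /** Approach 1: refuse descriptors whose serializer was never initialized. */
    String getPartitionedStateStrict(DemoStateDescriptor descriptor) {
        if (!descriptor.isSerializerInitialized()) {
            throw new IllegalStateException(
                "The serializer of the descriptor has not been initialized!");
        }
        return descriptor.getSerializer();
    }

    /** Approach 2: initialize if needed, using the backend's own config. */
    String getPartitionedStateLenient(DemoStateDescriptor descriptor) {
        descriptor.initializeSerializerUnlessSet(executionConfig);
        return descriptor.getSerializer();
    }
}
```

In this sketch the serializer produced under approach 2 reflects the config held by the backend rather than a freshly constructed one, which appears to be the motivation for the proposed `executionConfig` field.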