[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17601454#comment-17601454 ] Peleg Tsadok commented on FLINK-28653: -- [~chesnay] Thanks you so much! It finally worked with this: {code:java} ListTypeInfo userListTypeInfo = new ListTypeInfo(TypeInformation.of(User.class)); env.addSource(sourceFunction).returns(User.class) .keyBy(user -> 0) .window(SlidingProcessingTimeWindows.of( Time.seconds(3L), Time.seconds(1L) )) .aggregate(aggregateFunction, userListTypeInfo, userListTypeInfo) .uid("buffer") .print() .uid("sink-print"); {code} > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600770#comment-17600770 ] Chesnay Schepler commented on FLINK-28653: -- My bad, the problem is actually quite obvious and I should have seen it sooner. Your accumulator is a List, and those are serialized with Kryo. As a result you can't evolve the contained POJO/avro type, since they never actually go through our POJO/Avro serialization stack. I'd suggest to use the {{aggregate()}} variant that also accepts a {{TypeInformation}} for the accumulator type, and then pass {{new ListTypeInfo(TypeInformation.of(User.class))}}. Flink will then use the built-in list serializer which will internally leverage the pojo serializer. > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17599606#comment-17599606 ] Peleg Tsadok commented on FLINK-28653: -- [~chesnay] I created a new savepoint after removing the generic parameters. > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106) > at >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598272#comment-17598272 ] Chesnay Schepler commented on FLINK-28653: -- Did you create a new savepoint after removing the generic parameters, or are you trying to restore a previously created savepoint? > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106) > at
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17585755#comment-17585755 ] Peleg Tsadok commented on FLINK-28653: -- [~chesnay] I removed all generic parameters from my code, and I still get this exception when restoring: {noformat} 2022-08-27 14:29:16 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.base/java.lang.Thread.run(Unknown Source) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ... 11 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106) at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143) at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74) at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ... 13 more Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 194 out of bounds for length 3 Serialization trace: favoriteColor (io.peleg.avro.User) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:169) ... 20 more Caused by: java.lang.IndexOutOfBoundsException: Index 194 out of bounds for length 3 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17574259#comment-17574259 ] Chesnay Schepler commented on FLINK-28653: -- The issue is that your aggregate function internally uses a List, and there's no way for Flink to know what type T is. AFAICT there's also nothing the API that allows this to be set. IOW, you may have remove the generic parameter from the buffer, or (not sure if this works) use a similar trick as we do for OutputTags and create an anonymous class. {{.aggregate(new Buffer(){})}} > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17573247#comment-17573247 ] Peleg Tsadok commented on FLINK-28653: -- Okay I fixed the AvroMissingFieldException by adding a default value to the endTime field in the Avro schema. I also added another type hint after the aggregate: {code:java} env.addSource(sourceFunction).returns(clazz) .keyBy(user -> 0) .window(SlidingProcessingTimeWindows.of( Time.seconds(3L), Time.seconds(1L) )) .aggregate(new Buffer()).returns(windowOutput) {code} But this error still occurs: {noformat} 2022-07-30 14:16:59 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.base/java.lang.Thread.run(Unknown Source) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ... 11 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106) at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143) at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74) at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ... 13 more Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 80 out of bounds for length 3 Serialization trace: favoriteColor (io.peleg.avro.User) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57) at
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17573242#comment-17573242 ] Peleg Tsadok commented on FLINK-28653: -- [~chesnay] Great comments! I fixed the ignnored environment argument. I also added a type hint so that Flink could recognize the type even though JobRunner is generic like so: {code:java} env.addSource(sourceFunction).returns(clazz){code} So it helped because now we getting a different error when I restore from a version with a missing field in Avro: {noformat} 2022-07-30 13:26:38 Path in schema: --> endTime at org.apache.avro.generic.GenericData.getDefaultValue(GenericData.java:1176) at org.apache.avro.data.RecordBuilderBase.defaultValue(RecordBuilderBase.java:138) at io.peleg.avro.User$Builder.build(User.java:549) at io.peleg.avro.RandomUserSourceFunction.randomUser(RandomUserSourceFunction.java:45) at io.peleg.avro.RandomUserSourceFunction.run(RandomUserSourceFunction.java:23) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332){noformat} endTime is the field I added to the Avro schema to test the state schema evolution feature. I made it nullable so it is weird that Avro throws this org.apache.avro.AvroMissingFieldException. Why do you think this happans? > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572977#comment-17572977 ] Chesnay Schepler commented on FLINK-28653: -- You're also using too many generic parameters; Flink can't infer the type that the functions consume, so Kryo is/should used for everything. Because the JobRunner has a parameter T, Flink doesn't know anything about the actual type (because at runtime it's just {{Object}}). In contrast, if JobRunner used the type User Flink could infer more about the data (e.g., that it's a POJO). > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572958#comment-17572958 ] Chesnay Schepler commented on FLINK-28653: -- FYI, if your lombok class would be recognized as a POJO then your "kryo" job would also use the pojo serializer. These lines are _not_ required for the pojo serializer to be used: {code:java} env.getConfig().registerPojoType(User.class); env.getConfig().disableForceKryo(); {code} > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572947#comment-17572947 ] Chesnay Schepler commented on FLINK-28653: -- Also, are you intentionally not bundling lombok in your application jar? > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106) > at >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572946#comment-17572946 ] Chesnay Schepler commented on FLINK-28653: -- [~peleg68] Please run {{TypeInformation.of(User.class);}} with INFO logging enabled and check if the TypeExtractor rejects it as a Pojo. > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106) >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572921#comment-17572921 ] Peleg Tsadok commented on FLINK-28653: -- [~kkrugler] I don't think the problem is with lombok since the Avro generated class does not use Lombok and it doesn't work as well. [~masteryhx] I tried using both {{env.getConfig().enableForceAvro() and env.getConfig().disableGenericTypes(). }}It didn't work. What else can I do? Is it possible the problem is related to the fact I'm testing this on an ARM processor? Can anybody maybe try to recreate the problem on their machine? Unfortunately I don't have access to a machine with an Intel processor. > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571719#comment-17571719 ] Hangxiang Yu commented on FLINK-28653: -- Hi, Maybe you could try to set {{env.getConfig().enableForceAvro() to force avro}} {{or }} {{{}{}}}{{{}env.getConfig().disableGenericTypes() to disable kryo{}}} and then see whether it works. > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) > at >
[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords
[ https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17570567#comment-17570567 ] Kenneth William Krugler commented on FLINK-28653: - What happens if you explicitly define the no-args constructor and getters/setters for User.java, without relying on Lombok? > State Schema Evolution does not work - Flink defaults to Kryo serialization > even for POJOs and Avro SpecificRecords > --- > > Key: FLINK-28653 > URL: https://issues.apache.org/jira/browse/FLINK-28653 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Affects Versions: 1.14.3, 1.15.0 > Environment: I ran the job on a Flink cluster I spun up using docker > compose: > ``` > version: "2.2" > services: > jobmanager: > image: flink:latest > ports: > - "8081:8081" > command: jobmanager > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager: > image: flink:latest > depends_on: > - jobmanager > command: taskmanager > scale: 1 > environment: > - | > FLINK_PROPERTIES= > jobmanager.rpc.address: jobmanager > taskmanager.numberOfTaskSlots: 2 > ``` > My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip. > I'm running macOS Monterey Version 12.4. >Reporter: Peleg Tsadok >Priority: Major > Labels: KryoSerializer, State, avro, pojo, schema-evolution > > I am trying to do a POC of Flink State Schema Evolution. I am using Flink > 1.15.0 and Java 11 but also tested on Flink 1.14.3. > I tried to create 3 data classes - one for each serialization type: > 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not > supported for POJO serialization in Flink. > 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, > `Long`, `String`. The getters, setters and constructors are generated using > Lombok. > 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin. > For each class I wrote a stream job that uses a time window to buffer > elements and turn them into a list. > For each class I tried to do the following: > 1. Run a job > 2. Stop with savepoint > 3. Add a field to the data class > 4. Submit using savepoint > For all data classes the submit with savepoint failed with this exception: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ... 11 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106) >