This thread is duplicated on the dev mailing list [1].

[1] https://lists.apache.org/x/thread.html/r87fa8153137a4968f6a4f6b47c97c4d892664d864c51a79574821165@%3Cdev.flink.apache.org%3E

Best,
D.

On Tue, Jul 27, 2021 at 5:38 PM Kathula, Sandeep <sandeep_kath...@intuit.com> wrote:

> Hi,
>
> We have a simple Beam application, similar to word count, running with the
> Flink runner *(Beam 2.26 and Flink 1.9)*. We are using Beam’s value state.
> I am trying to read the state from a savepoint using Flink's State
> Processor API, but I am getting a NullPointerException. When we converted
> the whole code into a pure Flink application, created a savepoint, and
> tried to read the state, we were able to read it successfully.
>
> Exception stack trace:
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>     at com.intuit.spp.example.StateReader.main(StateReader.java:34)
> Caused by: java.io.IOException: Failed to restore state backend
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
>     ... 6 more
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>     ... 7 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     ... 9 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>     ... 13 more
>
> *When I debugged, I found that the NullPointerException is thrown at
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280*
>
> *metaInfoSnapshot is null. I then checked which kvStateId values we are
> getting at
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277.*
>
> I also printed stateMetaInfoSnapshot.getName() along with the corresponding
> kvStateId and saw that Beam creates additional states internally. The
> output is:
>
> stateMetaInfoSnapshot name            kvStateId
> pushed-back-elements                  0
> ivid_counts                           1
> watermark-holds                       2
> sortBuffer                            3
> pending-timers                        4
> sortBufferMinStamp                    5
> _timer_state/event_beam-timer         6
> _timer_state/processing_beam-timer    7
>
> *Of these, the ivid_counts state is created by us and the rest are created
> by Beam. In one instance it tries to retrieve kvStateId 11570 and cannot
> get any state from
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
> so stateMetaInfoSnapshot is NULL.*
>
> *I debugged further but couldn’t find the reason why it is trying to read
> some non-existent state.*
>
> Can you please help with this?
>
> Thanks,
> Sandeep
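
For readers following the quoted report, the failure mode it describes (an id read from the savepoint that has no registered state, followed by a dereference of the missing entry) can be modelled in a few lines of plain Java. This is only a sketch under assumptions: the class and method names are hypothetical, it does not use or reproduce Flink's HeapRestoreOperation, and the map contents are simply the state names and ids listed in the quoted message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an id -> state name lookup during restore.
// Illustrative only; this is not Flink code.
final class KvStateLookupSketch {

    // State names and kvStateIds as listed in the quoted message.
    static Map<Integer, String> registeredStates() {
        Map<Integer, String> states = new LinkedHashMap<>();
        states.put(0, "pushed-back-elements");
        states.put(1, "ivid_counts");
        states.put(2, "watermark-holds");
        states.put(3, "sortBuffer");
        states.put(4, "pending-timers");
        states.put(5, "sortBufferMinStamp");
        states.put(6, "_timer_state/event_beam-timer");
        states.put(7, "_timer_state/processing_beam-timer");
        return states;
    }

    // Unchecked lookup: returns null for an unregistered id, so a later
    // dereference of the result would throw a NullPointerException.
    static String lookupUnchecked(Map<Integer, String> states, int kvStateId) {
        return states.get(kvStateId);
    }

    // Checked lookup: fails early and names the unknown id in the message.
    static String lookupChecked(Map<Integer, String> states, int kvStateId) {
        String name = states.get(kvStateId);
        if (name == null) {
            throw new IllegalStateException(
                "No state registered for kvStateId " + kvStateId
                    + "; known ids: " + states.keySet());
        }
        return name;
    }

    public static void main(String[] args) {
        Map<Integer, String> states = registeredStates();
        System.out.println(lookupChecked(states, 1));
        try {
            lookupChecked(states, 11570);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The checked variant does not explain why an id such as 11570 is read in the first place; it only shows how an explicit check turns the bare NullPointerException into a message that names the unexpected id.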