This thread is duplicated on the dev mailing list [1].

[1]
https://lists.apache.org/x/thread.html/r87fa8153137a4968f6a4f6b47c97c4d892664d864c51a79574821165@%3Cdev.flink.apache.org%3E

Best,
D.

On Tue, Jul 27, 2021 at 5:38 PM Kathula, Sandeep <sandeep_kath...@intuit.com>
wrote:

> Hi,
>
> We have a simple Beam application, similar to a word count, running on the
> Flink runner *(Beam 2.26 and Flink 1.9)*. It uses Beam's value state. I am
> trying to read that state from a savepoint using Flink's State Processor
> API, but I get a NullPointerException. When I converted the same code into
> a pure Flink application, created a savepoint, and read it the same way,
> the state was read successfully.
>
> Exception stack trace:
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>     at com.intuit.spp.example.StateReader.main(StateReader.java:34)
> Caused by: java.io.IOException: Failed to restore state backend
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
>     ... 6 more
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>     ... 7 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     ... 9 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>     ... 13 more
>
> When I debugged, I found that the NullPointerException is thrown at
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280
> because stateMetaInfoSnapshot is null. I then checked which kvStateId
> values are looked up at
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
>
>
>
> I also printed stateMetaInfoSnapshot.getName() along with the corresponding
> kvStateId and saw that Beam creates additional states internally. The
> values are:
>
>
>
> stateMetaInfoSnapshot name            kvStateId
> ----------------------------------    ---------
> pushed-back-elements                  0
> ivid_counts                           1
> watermark-holds                       2
> sortBuffer                            3
> pending-timers                        4
> sortBufferMinStamp                    5
> _timer_state/event_beam-timer         6
> _timer_state/processing_beam-timer    7
>
>
>
> Of these, only ivid_counts is created by us; the rest are created by Beam.
> In one instance the restore tries to look up kvStateId 11570, and no state
> is registered under that id at
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
> so stateMetaInfoSnapshot is null.
>
> I debugged further but could not find out why it is trying to read a
> non-existent state.
>
> Could you please help with this?
>
> Thanks,
> Sandeep
>

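The lookup failure described in the quoted message can be illustrated with a minimal plain-Java sketch. This is not Flink code: the class `KvStateLookupSketch` and its methods are hypothetical names introduced only for illustration, and the map simply mirrors the eight name/id pairs reported above. It shows how resolving an id that was never registered (such as 11570) yields null, and how a guard can turn that into a descriptive error instead of a bare NullPointerException.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java illustration only; hypothetical names, not part of Flink or Beam. */
final class KvStateLookupSketch {

    /** Builds the id-to-name mapping reported in the message above. */
    static Map<Integer, String> registeredStates() {
        Map<Integer, String> byId = new LinkedHashMap<>();
        byId.put(0, "pushed-back-elements");
        byId.put(1, "ivid_counts");
        byId.put(2, "watermark-holds");
        byId.put(3, "sortBuffer");
        byId.put(4, "pending-timers");
        byId.put(5, "sortBufferMinStamp");
        byId.put(6, "_timer_state/event_beam-timer");
        byId.put(7, "_timer_state/processing_beam-timer");
        return byId;
    }

    /**
     * Resolves a state id to its name. An unregistered id makes Map.get
     * return null; the guard reports it explicitly rather than letting a
     * later dereference fail with a NullPointerException.
     */
    static String resolve(Map<Integer, String> byId, int kvStateId) {
        String name = byId.get(kvStateId);
        if (name == null) {
            throw new IllegalStateException(
                "No state meta info registered for kvStateId " + kvStateId
                    + "; known ids: " + byId.keySet());
        }
        return name;
    }

    public static void main(String[] args) {
        Map<Integer, String> byId = registeredStates();
        System.out.println(resolve(byId, 1));
        try {
            resolve(byId, 11570); // the id seen in the failing restore
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A guard like this does not explain why an unexpected id is read in the first place; it only makes the failing id and the set of known ids visible in the error.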