Hi Sandeep,

In general, I'd say it will be tricky to read Beam state this way. Beam
doesn't use Flink's state primitives; it writes state in a custom binary
format (it can be deserialized, but it's not easy to put all of the pieces
together).
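
To illustrate the general point about binary formats, here is a minimal
sketch. It does not use Beam's or Flink's actual serializers: the two
encodings (a variable-length int and a fixed 4-byte int) and all names in it
are hypothetical, chosen only to show that bytes written with one encoding
cannot be read back by a reader that assumes another.

```java
import java.io.ByteArrayOutputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

class EncodingMismatch {

    // Hypothetical "writer" side: variable-length encoding,
    // 7 payload bits per byte, least significant group first.
    static byte[] writeVarInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int v = value;
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80); // high bit set: more bytes follow
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }

    // Matching reader for the variable-length encoding above.
    static int readVarInt(byte[] bytes) {
        int result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated varint");
    }

    // Hypothetical "wrong" reader: assumes a fixed 4-byte big-endian int.
    static int readFixedInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] encoded = writeVarInt(300);
        System.out.println("encoded length: " + encoded.length);
        System.out.println("matching reader: " + readVarInt(encoded));
        try {
            readFixedInt(encoded);
        } catch (BufferUnderflowException e) {
            // The fixed-width reader needs 4 bytes but only 2 were written.
            System.out.println("mismatched reader failed: " + e);
        }
    }
}
```

The matching reader recovers the value, while the mismatched one fails
outright. A reader has to know every encoding used by the writer before the
bytes make sense.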

Can you please share example code showing how you're reading the state? Also,
can you please try this with the latest Beam / Flink versions (the ones you're
using are no longer supported)?

Best,
D.

On Tue, Jul 27, 2021 at 5:46 PM Kathula, Sandeep
<sandeep_kath...@intuit.com.invalid> wrote:

> Hi,
>      We have a simple Beam application, like a word count, running with the
> Flink runner (Beam 2.26 and Flink 1.9). We are using Beam’s value state. I
> am trying to read the state from a savepoint using Flink's State Processor
> API but am getting a NullPointerException. When we converted the whole code
> into a pure Flink application, created a savepoint and tried to read the
> state, we were able to read it successfully.
>
> Exception Stack trace:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>                 at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>                 at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
>                 at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
>                 at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>                 at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>                 at
> org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>                 at
> com.intuit.spp.example.StateReader.main(StateReader.java:34)
> Caused by: java.io.IOException: Failed to restore state backend
>                 at
> org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
>                 at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>                 at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
>                 at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>                 at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
>                 at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>                 at
> org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
>                 ... 6 more
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1)
> from any of the 1 provided restore options.
>                 at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>                 at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>                 at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>                 ... 7 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
>                 at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>                 at
> org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>                 at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>                 at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>                 at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>                 ... 9 more
> Caused by: java.lang.NullPointerException
>                 at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>                 at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
>                 at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
>                 at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>                 ... 13 more
>
> When I debugged, I found that the NullPointerException is thrown at
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280
> because metaInfoSnapshot is null. I then checked which kvStateId values we
> are getting at line 277 of the same file:
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
>
> I also called stateMetaInfoSnapshot.getName() for each corresponding
> kvStateId and saw that Beam creates additional states internally. It
> gives:
>
> stateMetaInfoSnapshot name            kvStateId
> pushed-back-elements                  0
> ivid_counts                           1
> watermark-holds                       2
> sortBuffer                            3
> pending-timers                        4
> sortBufferMinStamp                    5
> _timer_state/event_beam-timer         6
> _timer_state/processing_beam-timer    7
>
> Of these, the ivid_counts state is created by us and the rest are
> created by Beam. In one instance it tries to retrieve kvStateId 11570
> and it's not able to get any state at
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
> so stateMetaInfoSnapshot is null.
>
> I debugged further but couldn’t find the reason why it is trying to read
> some non-existent state.
>
>
> Can you please help with this?
>
>
> Thanks,
> Sandeep
>
>

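The symptom described in the quoted message (a kvStateId with no registered
meta info) can be modeled with a plain map lookup. This is only a simplified
sketch, not the code in HeapRestoreOperation: the class and method names are
hypothetical, and only the state names and ids are taken from the message. It
shows the symptom, not its cause.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class KvStateIdLookup {

    // State names and ids as listed in the quoted message.
    static Map<Integer, String> registeredStates() {
        Map<Integer, String> byId = new LinkedHashMap<>();
        byId.put(0, "pushed-back-elements");
        byId.put(1, "ivid_counts");
        byId.put(2, "watermark-holds");
        byId.put(3, "sortBuffer");
        byId.put(4, "pending-timers");
        byId.put(5, "sortBufferMinStamp");
        byId.put(6, "_timer_state/event_beam-timer");
        byId.put(7, "_timer_state/processing_beam-timer");
        return byId;
    }

    // Hypothetical defensive lookup: report an unknown id explicitly
    // instead of dereferencing a null result later.
    static String describe(Map<Integer, String> byId, int kvStateId) {
        String name = byId.get(kvStateId);
        if (name == null) {
            return "no meta info registered for kvStateId " + kvStateId;
        }
        return "kvStateId " + kvStateId + " -> " + name;
    }

    public static void main(String[] args) {
        Map<Integer, String> byId = registeredStates();
        System.out.println(describe(byId, 1));
        // 11570 is the id from the message; it is not among the eight
        // registered states, so the lookup yields null.
        System.out.println(describe(byId, 11570));
    }
}
```

Any id outside 0..7 produces a null lookup result in this model, which is
what an unguarded dereference would turn into a NullPointerException.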