[ https://issues.apache.org/jira/browse/BEAM-2807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16158640#comment-16158640 ]

Daniel Harper commented on BEAM-2807:
-------------------------------------

Hi there,

I'm happy to look into this and work towards a fix, but I'm not entirely sure 
what this code is supposed to be doing, so I would need some guidance on what 
a fix that won't break everything should look like.

Any input would be appreciated! :)

> NullPointerException during checkpoint on FlinkRunner
> -----------------------------------------------------
>
>                 Key: BEAM-2807
>                 URL: https://issues.apache.org/jira/browse/BEAM-2807
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.1.0
>            Reporter: Daniel Harper
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>
> *Beam version:* 2.1.0
> *Runner:* FlinkRunner
> We're seeing the following exception during checkpointing, which causes our 
> job to restart:
> {code}
> 2017-08-25 09:42:17,658 INFO  org.apache.flink.runtime.taskmanager.Task - Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32) (f00a31b722a31030f18d83ac613de21d) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:966)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
>     ... 5 more
>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
>         ... 5 more
>     Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>         ... 7 more
>     Caused by: java.lang.NullPointerException
>         at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
>         at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
>         at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
>         at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
>         at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:229)
>         at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:151)
>         at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoWriterV3.writeStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:107)
>         at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:104)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:293)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:286)
>         at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:329)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
>         at org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:263)
>         at org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:178)
>         at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:97)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         ... 1 more
> {code}
> From debugging locally, I've narrowed it down to this part of the trace:
> {code}
>     Caused by: java.lang.NullPointerException
>         at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
>         at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
>         at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
>         at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
> {code}
> Specifically, in 
> [CoderTypeSerializer.java#189|https://github.com/apache/beam/blob/609016d700c84800cf942482fb7cd2ddaa420b00/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L189],
> the call to {{DataOutputStream.writeUTF(String)}} first calls {{length()}} on 
> the string it is given. Because the {{coderName}} field is null, that call 
> throws the NPE.
> I think this stems from the 
> [constructor|https://github.com/apache/beam/blob/609016d700c84800cf942482fb7cd2ddaa420b00/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L164],
> which sets {{coderName}} by calling {{.getClass().getCanonicalName()}} on the 
> {{coder}} passed into it.
> While debugging, I noticed that {{.getClass().getCanonicalName()}} returns 
> {{null}} when called on an instance of 
> [Count$CountFn|https://github.com/apache/beam/blob/2040e2bd4203f81ea63966110e4eef3a1ff72393/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L134].
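
The failure chain described in the issue can be reproduced with plain JDK classes. The following is a minimal, hypothetical sketch rather than the actual Beam code: {{FakeCoder}}, {{NamedCoder}} and {{CoderNameSnapshot}} are illustrative names, and the sketch assumes only what the issue states, namely that the constructor stores {{getClass().getCanonicalName()}} and {{write}} passes that value to {{DataOutputStream.writeUTF}}. According to the {{Class.getCanonicalName()}} javadoc, anonymous and local classes have no canonical name, so the method returns {{null}} for them. If the coder involved is such a class (for example a coder declared inline inside {{CountFn}}, which would have a binary name like {{Count$CountFn$1}}; this is an assumption, not something confirmed above), the constructor silently stores {{null}} and the NPE only appears later, when the checkpoint writes the snapshot.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class CoderNameNpeSketch {

    // Hypothetical stand-in for a Beam coder; not a real Beam type.
    interface FakeCoder {}

    // A static nested class has a canonical name, like a named nested coder class would.
    static class NamedCoder implements FakeCoder {}

    // Hypothetical reduced model of the snapshot pattern described in the issue:
    // the constructor captures the coder's class name, write() serializes it with writeUTF.
    static final class CoderNameSnapshot {
        final String coderName;

        CoderNameSnapshot(FakeCoder coder, boolean useCanonicalName) {
            Class<?> clazz = coder.getClass();
            // getCanonicalName() returns null for anonymous and local classes;
            // getName() always returns the non-null binary name, e.g. "Outer$1".
            this.coderName = useCanonicalName ? clazz.getCanonicalName() : clazz.getName();
        }

        byte[] write() {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(coderName); // throws NullPointerException if coderName is null
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }
    }

    // Writes the captured name and reads it back; any NPE from write() propagates.
    static String roundTrip(FakeCoder coder, boolean useCanonicalName) {
        byte[] data = new CoderNameSnapshot(coder, useCanonicalName).write();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static FakeCoder anonymousCoder() {
        return new FakeCoder() {}; // anonymous class: getCanonicalName() returns null
    }

    public static void main(String[] args) {
        // Named class: both variants round-trip.
        System.out.println(roundTrip(new NamedCoder(), true));
        System.out.println(roundTrip(new NamedCoder(), false));

        // Anonymous class: construction succeeds and silently stores null...
        CoderNameSnapshot snapshot = new CoderNameSnapshot(anonymousCoder(), true);
        System.out.println("stored coderName = " + snapshot.coderName);
        // ...and the failure only surfaces later, when the snapshot is written.
        try {
            snapshot.write();
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from writeUTF at write time");
        }

        // Using getName() instead yields a non-null name that writeUTF accepts.
        System.out.println(roundTrip(anonymousCoder(), false));
    }
}
```

The last line of {{main}} suggests {{getName()}} as one possible direction, but this sketch does not check whether the rest of the snapshot logic (for example, how the stored name is read back and compared on restore) would accept a binary name, so it is a starting point for discussion rather than a fix.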



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
