[ https://issues.apache.org/jira/browse/BEAM-2807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141601#comment-16141601 ]
Daniel Harper commented on BEAM-2807:
-------------------------------------

{{getCanonicalName}} will return null on anonymous classes
https://docs.oracle.com/javase/8/docs/api/java/lang/Class.html#getCanonicalName

{quote}
Returns the canonical name of the underlying class as defined by the Java Language Specification. Returns null if the underlying class does not have a canonical name (i.e., if it is a local or anonymous class or an array whose component type does not have a canonical name).
{quote}

> NullPointerException during checkpoint on FlinkRunner
> -----------------------------------------------------
>
>                 Key: BEAM-2807
>                 URL: https://issues.apache.org/jira/browse/BEAM-2807
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>    Affects Versions: 2.1.0
>            Reporter: Daniel Harper
>            Assignee: Kenneth Knowles
>            Priority: Blocker
>
> *Beam version:* 2.1.0
> *Runner:* FlinkRunner
> We're seeing the following exception when checkpointing, which is causing our job to restart
> {code}
> 2017-08-25 09:42:17,658 INFO org.apache.flink.runtime.taskmanager.Task - Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32) (f00a31b722a31030f18d83ac613de21d) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:966)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
>     ... 5 more
> Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
>     ... 5 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>     at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>     ... 7 more
> Caused by: java.lang.NullPointerException
>     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
>     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:229)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:151)
>     at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoWriterV3.writeStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:107)
>     at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:104)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:293)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:286)
>     at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:329)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
>     at org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:263)
>     at org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:178)
>     at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:97)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     ... 1 more
> {code}
> From debugging locally I've narrowed it down to here
> {code}
> Caused by: java.lang.NullPointerException
>     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
>     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
> {code}
> Specifically in [CoderTypeSerializer.java#189|https://github.com/apache/beam/blob/609016d700c84800cf942482fb7cd2ddaa420b00/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L189], when it calls {{DataOutputStream.writeUTF(String)}}, there is some logic in the {{writeUTF}} method that gets the string length. This is what is causing the NPE as the {{coderName}} field is null.
> I think this stems from the [constructor|https://github.com/apache/beam/blob/609016d700c84800cf942482fb7cd2ddaa420b00/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L164] which sets the {{coderName}} by calling {{.getClass().getCanonicalName();}} on the {{coder}} that is passed into the constructor
> On debugging I've noticed this returns {{null}} when calling {{.getClass().getCanonicalName();}} on an instance of [Count$CountFn|https://github.com/apache/beam/blob/2040e2bd4203f81ea63966110e4eef3a1ff72393/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L134]
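
The two facts this report relies on can be checked in isolation: {{getCanonicalName}} yields {{null}} for an anonymous class, and {{DataOutputStream.writeUTF}} throws a {{NullPointerException}} when handed {{null}}. The following is a minimal sketch only, not Beam or Flink code. The class {{CanonicalNameDemo}}, the interface {{DemoCoder}}, and all method names are hypothetical and exist purely for illustration; {{getName()}} is included only for contrast and is not claimed to be the fix applied in Beam.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class CanonicalNameDemo {

    /** Hypothetical stand-in for a coder type; NOT a Beam interface. */
    interface DemoCoder {}

    /** Mirrors the pattern from the report: may return null. */
    static String canonicalNameOf(Object o) {
        return o.getClass().getCanonicalName();
    }

    /** For contrast: Class.getName() never returns null. */
    static String binaryNameOf(Object o) {
        return o.getClass().getName();
    }

    /** Returns true if writeUTF(name) fails with a NullPointerException. */
    static boolean writeUtfThrowsNpe(String name) {
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            out.writeUTF(name); // reads the string length first, so null fails here
            return false;
        } catch (NullPointerException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        DemoCoder anonymous = new DemoCoder() {}; // anonymous class instance

        String canonical = canonicalNameOf(anonymous);
        System.out.println("canonical name: " + canonical);            // prints "canonical name: null"
        System.out.println("binary name:    " + binaryNameOf(anonymous));
        System.out.println("writeUTF NPE:   " + writeUtfThrowsNpe(canonical)); // prints "writeUTF NPE:   true"
    }
}
```

Run with a JDK 11+ single-file launch ({{java CanonicalNameDemo.java}}) or compile with {{javac}} first. A named top-level or static nested class passed to {{canonicalNameOf}} returns a non-null name, so the sketch only reproduces the failure for the local and anonymous cases named in the Javadoc quote above.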