Hello,

I've been having some trouble debugging exceptions thrown from user code when using
the Flink runner. Here's an example from a window/DoFn/GroupByKey pipeline.

ERROR o.a.f.runtime.operators.BatchTask - Error in task code: CHAIN MapPartition (MapPartition at ParDo(MyDoFn)) -> FlatMap (Transform/Windowing/Window.Assign.out) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: GroupByKey) -> Map (Key Extractor) (1/8)
org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) ~[beam-sdks-java-core-0.6.0.jar:0.6.0]
    at org.org.my.pipelines.MyDoFn$auxiliary$s09rfuPj.invokeProcessElement(Unknown Source) ~[na:na]
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:198) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:156) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
    at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:109) ~[beam-runners-flink_2.10-0.6.0.jar:0.6.0]
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) [flink-runtime_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) [flink-runtime_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) [flink-runtime_2.10-1.2.0.jar:1.2.0]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: null
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
    at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:138) ~[beam-runners-flink_2.10-0.6.0.jar:0.6.0]
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:351) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:545) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
    at org.my.pipelines.MyDoFn.processElement(MyDoFn.java:49) ~[pipelines-0.1.jar:na]

The top stack trace points at some kind of anonymous
`invokeProcessElement(Unknown Source)` frame, which is not very informative. The
bottom stack trace points at my call to `context.output()`, which is even more
confusing. I've fixed a couple of issues by manually wrapping code in try/catch
blocks and logging directly from within `processElement`, but this is far from
ideal. Any advice on how to interpret these traces, and possibly how to set
things up to get more helpful error messages, would be much appreciated.
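A minimal sketch of that try/catch-and-log workaround, pulled out into a reusable plain-Java helper so it doesn't have to be repeated in every `processElement`. `ElementDebug`, `runLogged` and `describeCauses` are hypothetical names, not Beam or Flink APIs, and the sketch assumes the per-element logic can be passed in as a `Consumer`. It logs the failing element together with every exception in the cause chain, since the outer `UserCodeException` above only carries the name of its cause.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical debugging helper (plain Java, not part of Beam or Flink).
final class ElementDebug {
    private static final Logger LOG = Logger.getLogger(ElementDebug.class.getName());

    private ElementDebug() {}

    // Renders every throwable in the cause chain, outermost first, one per line.
    // Throwable.toString() gives "ClassName: message", or just "ClassName" when
    // the message is null.
    static String describeCauses(Throwable t) {
        StringBuilder sb = new StringBuilder();
        // Identity-based set guards against a cause chain that loops back on itself.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (sb.length() > 0) {
                sb.append("\ncaused by: ");
            }
            sb.append(cur);
        }
        return sb.toString();
    }

    // Runs the per-element logic; on failure, logs the offending element plus the
    // whole cause chain, then rethrows so the runner still fails the task.
    static <T> void runLogged(T element, Consumer<? super T> body) {
        try {
            body.accept(element);
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE, "Failed on element " + element + "\n" + describeCauses(e), e);
            throw e;
        }
    }

    public static void main(String[] args) {
        // Simulated nesting: an uninformative wrapper around the real cause,
        // similar in shape to UserCodeException wrapping another exception.
        RuntimeException wrapped = new RuntimeException(new IllegalStateException("bad element"));
        System.out.println(describeCauses(wrapped));
    }
}
```

Logging and then rethrowing keeps the job failing as before while recording which element triggered the failure. One caveat, if I'm reading the bottom trace correctly: with Flink's operator chaining, `context.output()` hands the element straight to the chained downstream operators (here the `Window.Assign` FlatMap and the `GroupCombine`), so a try/catch that also wraps `output()` may catch failures that actually originate downstream of `MyDoFn`.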

Running Beam 0.6, Flink 1.2.

Thanks!
