Aljoscha, any ideas perhaps?

On Wed, Apr 5, 2017 at 12:52 PM, peay <[email protected]> wrote:
> Hello,
>
> I've been having some trouble debugging exceptions in user code when
> using the Flink runner. Here's an example from a window/DoFn/GroupByKey
> pipeline.
>
> ERROR o.a.f.runtime.operators.BatchTask - Error in task code: CHAIN
> MapPartition (MapPartition at ParDo(MyDoFn)) -> FlatMap
> (Transform/Windowing/Window.Assign.out) -> Map (Key Extractor) ->
> GroupCombine (GroupCombine at GroupCombine: GroupByKey) -> Map (Key
> Extractor) (1/8)
> org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) ~[beam-sdks-java-core-0.6.0.jar:0.6.0]
>     at org.my.pipelines.MyDoFn$auxiliary$s09rfuPj.invokeProcessElement(Unknown Source) ~[na:na]
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:198) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:156) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
>     at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:109) ~[beam-runners-flink_2.10-0.6.0.jar:0.6.0]
>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) [flink-runtime_2.10-1.2.0.jar:1.2.0]
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) [flink-runtime_2.10-1.2.0.jar:1.2.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) [flink-runtime_2.10-1.2.0.jar:1.2.0]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
> org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: null
>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
>     at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:138) ~[beam-runners-flink_2.10-0.6.0.jar:0.6.0]
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:351) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:545) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
>     at org.my.pipelines.MyDoFn.processElement(MyDoFn.java:49) ~[pipelines-0.1.jar:na]
>
> The top stack trace references some kind of anonymous
> `invokeProcessElement(Unknown Source)`, which is not really informative. The
> bottom stack trace references my call to `context.output()`, which is even
> more confusing. I've worked around a couple of issues by manually
> try/catching and logging directly from within `processElement`, but this is
> far from ideal. Any advice on how to interpret these traces, and how to
> possibly set things up to get more helpful error messages, would be much
> appreciated.
>
> Running Beam 0.6, Flink 1.2.
>
> Thanks!
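One general note on reading such wrapped traces: the interesting failure is usually the innermost cause, several layers below the `UserCodeException` / `ExceptionInChainedStubException` wrappers. When logging from inside `processElement` as a workaround, a small Beam-independent helper that walks `Throwable.getCause()` can make the root cause easier to spot. A minimal sketch (the `RootCause` class name and the simulated exceptions are purely illustrative, not part of Beam or Flink):

```java
// Illustrative helper for unwrapping chained exceptions; not part of Beam.
public class RootCause {

    // Walk the cause chain down to the innermost Throwable.
    public static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur;
    }

    public static void main(String[] args) {
        // Simulate the kind of wrapping seen in the trace above:
        // outer wrapper -> chained-stub wrapper -> actual user-code error.
        Exception real = new IllegalStateException("bad record in MyDoFn");
        Exception wrapped = new RuntimeException(new RuntimeException(real));

        // Logging rootCause(wrapped) surfaces the IllegalStateException
        // instead of the uninformative outer wrappers.
        System.out.println(rootCause(wrapped));
    }
}
```

This does not fix the `Unknown Source` frames from the generated `invokeProcessElement`, but it at least puts the real exception message and `MyDoFn` line number at the top of the logged output.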
