Hello,
could it be that it's no longer possible to run pipelines with a BigQuery sink
locally on the dev machine? I migrated a "read JSON from GCS, parse, and
write to BQ" pipeline from the Dataflow SDK to Apache Beam 0.5.0.
All tests are green and the pipeline runs successfully on the Dataflow service
with the test files, but locally with the DirectRunner I get an NPE.
It happens right after I create the TableRow element, which I even double
checked is not null. The NPE is thrown even when I artificially create a
LogLine element in this step instead of taking the one from the input:
static class Outputter extends DoFn<LogLine, TableRow> {
    (...)
        LogLine logLine = c.element();
        TableRow tableRow = logLine.toTableRow();
        tableRow.set("ts", c.timestamp().toString());
        // Double check: neither the context nor the row is null at this point.
        if (c != null && tableRow != null) {
            try {
                // The NPE is raised from inside c.output(...), see the trace below.
                c.output(tableRow);
            } catch (NullPointerException e) {
                LOG.error("catched NPE");
                e.printStackTrace();
            }
        }
The corresponding stack trace looks like this:
ERROR: catched NPE
java.lang.NullPointerException
    at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
    at java.util.AbstractMap.hashCode(AbstractMap.java:530)
    at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
    at java.util.AbstractMap.hashCode(AbstractMap.java:530)
    at java.util.Arrays.hashCode(Arrays.java:4146)
    at java.util.Objects.hash(Objects.java:128)
    at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.hashCode(WindowedValue.java:409)
    at java.util.HashMap.hash(HashMap.java:338)
    at java.util.HashMap.get(HashMap.java:556)
    at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
    at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
    at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:198)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:352)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:553)
    at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter.processElement(FrontendPipeline.java:181)
    at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter$auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:161)
    at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:111)
    at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:134)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
    at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
    at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
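Reading the trace from the bottom up, the exception is not thrown by the DoFn itself. The DirectRunner's ImmutabilityCheckingBundleFactory adds the output WindowedValue to a HashMultimap, which calls hashCode() on the element, and the NPE surfaces in com.google.api.client.util.ArrayMap$Entry.hashCode while AbstractMap.hashCode walks two levels of nested maps inside the TableRow. Since that bundle factory lives in the org.apache.beam.runners.direct package, this code path is specific to the DirectRunner. One plausible, unverified explanation is that a nested field holds a null value and that Entry.hashCode dereferences it without a null check. The sketch below reproduces that mechanism with JDK classes only, assuming that behavior; NullHostileMap, NullHostileEntry, findNullPaths and the field names are hypothetical stand-ins, not the real ArrayMap or TableRow.

```java
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NestedNullHashDemo {

    // Hypothetical entry mimicking the ASSUMED behavior of ArrayMap$Entry:
    // hashCode() dereferences the value without a null check.
    static final class NullHostileEntry extends AbstractMap.SimpleEntry<String, Object> {
        NullHostileEntry(String key, Object value) {
            super(key, value);
        }

        @Override
        public int hashCode() {
            return getKey().hashCode() ^ getValue().hashCode();
        }
    }

    // Hypothetical map whose entry set yields NullHostileEntry objects, so that
    // AbstractMap.hashCode() (sum of entry hash codes) hits the null dereference.
    static final class NullHostileMap extends AbstractMap<String, Object> {
        private final Map<String, Object> backing = new LinkedHashMap<>();

        @Override
        public Object put(String key, Object value) {
            return backing.put(key, value);
        }

        @Override
        public Set<Map.Entry<String, Object>> entrySet() {
            final List<Map.Entry<String, Object>> entries = new ArrayList<>();
            for (Map.Entry<String, Object> e : backing.entrySet()) {
                entries.add(new NullHostileEntry(e.getKey(), e.getValue()));
            }
            // List-backed set: building it does not call hashCode() on the entries.
            return new AbstractSet<Map.Entry<String, Object>>() {
                @Override
                public Iterator<Map.Entry<String, Object>> iterator() {
                    return entries.iterator();
                }

                @Override
                public int size() {
                    return entries.size();
                }
            };
        }
    }

    // Returns the dotted paths of all null values found in nested maps.
    static List<String> findNullPaths(Map<?, ?> map, String prefix) {
        List<String> paths = new ArrayList<>();
        for (Map.Entry<?, ?> e : map.entrySet()) {
            String path = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value == null) {
                paths.add(path);
            } else if (value instanceof Map) {
                paths.addAll(findNullPaths((Map<?, ?>) value, path));
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        NullHostileMap inner = new NullHostileMap();
        inner.put("referrer", null); // hypothetical nested field left null
        NullHostileMap row = new NullHostileMap();
        row.put("ts", "2017-02-01T00:00:00.000Z");
        row.put("request", inner);

        System.out.println("null values at: " + findNullPaths(row, ""));
        try {
            row.hashCode(); // what HashMap.get/put does to the element
            System.out.println("hashCode ok");
        } catch (NullPointerException e) {
            System.out.println("hashCode threw NPE");
        }
    }
}
```

Running main prints the path request.referrer and then reports the NPE. The frames it throws through (entry hashCode, map hashCode, entry hashCode, map hashCode) have the same shape as the top of the trace above. If the real TableRow shows the same pattern, a walk like findNullPaths over the row before c.output(...) would point at the offending nested field.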
Best,
Tobias