Hello,

Is it no longer possible to run pipelines with a BigQuery sink locally on
the dev machine? I migrated a "Read JSON from GCS, parse and write to BQ"
pipeline from the Dataflow SDK to Apache Beam 0.5.0. All tests are green
and the pipeline runs successfully on the Dataflow service with the test
files, but locally with the DirectRunner I get an NPE.

It happens right after I create the TableRow element, which I
double-checked is not null. Even when I artificially create a LogLine
element in this step instead of taking the one from the input, the NPE is
thrown:


static class Outputter extends DoFn<LogLine, TableRow> {
(...)
        LogLine logLine = c.element();

        TableRow tableRow = logLine.toTableRow();
        tableRow.set("ts", c.timestamp().toString());

        if (c != null && tableRow != null) {
            try {
                // The NPE is thrown inside this call.
                c.output(tableRow);
            } catch (NullPointerException e) {
                LOG.error("catched NPE");
                e.printStackTrace();
            }
        }
    }
}

The corresponding stack trace looks like this:

ERROR: catched NPE
java.lang.NullPointerException
        at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
        at java.util.AbstractMap.hashCode(AbstractMap.java:530)
        at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
        at java.util.AbstractMap.hashCode(AbstractMap.java:530)
        at java.util.Arrays.hashCode(Arrays.java:4146)
        at java.util.Objects.hash(Objects.java:128)
        at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.hashCode(WindowedValue.java:409)
        at java.util.HashMap.hash(HashMap.java:338)
        at java.util.HashMap.get(HashMap.java:556)
        at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
        at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
        at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:49)
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
        at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:198)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:352)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:553)
        at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter.processElement(FrontendPipeline.java:181)
        at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter$auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:161)
        at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:111)
        at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
        at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:134)
        at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
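
The top frames show the exception being raised while the DirectRunner
hashes the entries of the emitted element, so one thing worth checking is
whether a field value inside the TableRow (possibly in a nested record) is
null, even though the TableRow itself is not. This is only a guess from the
trace, not a confirmed cause. A minimal sketch of such a check follows;
NullFieldFinder is a hypothetical helper, not part of Beam or the BigQuery
client, and the field names and timestamp in main are made up for
illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical debugging helper; not part of Beam or the BigQuery client. */
public class NullFieldFinder {

    /**
     * Returns the dotted paths of all entries whose value is null,
     * descending into nested maps and lists.
     */
    public static List<String> findNullFields(Map<String, ?> row) {
        List<String> paths = new ArrayList<>();
        collect("", row, paths);
        return paths;
    }

    private static void collect(String prefix, Object value, List<String> paths) {
        if (value == null) {
            paths.add(prefix);
        } else if (value instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                String path = prefix.isEmpty()
                        ? String.valueOf(e.getKey())
                        : prefix + "." + e.getKey();
                collect(path, e.getValue(), paths);
            }
        } else if (value instanceof List) {
            List<?> list = (List<?>) value;
            for (int i = 0; i < list.size(); i++) {
                collect(prefix + "[" + i + "]", list.get(i), paths);
            }
        }
    }

    public static void main(String[] args) {
        // Made-up example data: a nested record with one null field.
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("agent", null);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ts", "2017-02-01T00:00:00.000Z");
        row.put("request", nested);
        System.out.println(findNullFields(row)); // prints [request.agent]
    }
}
```

Since TableRow is a Map<String, Object>, the helper could be called as
findNullFields(tableRow) just before c.output(tableRow) and the result
logged. It only iterates over the entries and never calls hashCode on them.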

Best,
Tobias
