Hi Mahesh,

I think I hit the same NPE when I was exploring a self-defined CombineFn. I suspect your CombineFn still needs to define a coder so that Beam can run it in a distributed environment. Beam tries to invoke the coder somewhere and then throws an NPE because none is defined.
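Defining a coder means giving Beam a way to turn each `Accum` into bytes and back, because a runner may encode accumulators when it moves them between workers. Below is a minimal plain-Java sketch of that round-trip. It only mirrors the shape of Beam's `Coder` methods (`encode(T, OutputStream)` / `decode(InputStream)`). The class name `AccumCoderSketch` and the `roundTrip` helper are hypothetical. In a real pipeline this logic would presumably live in a subclass of `org.apache.beam.sdk.coders.CustomCoder<AddLines.Accum>`, returned from an override of `AddLines.getAccumulatorCoder(CoderRegistry, Coder<String>)`. Treat that wiring as an assumption to check against the Beam version in use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class AccumCoderSketch {

    // Stand-in for AddLines.Accum from the thread (hypothetical copy).
    public static class Accum {
        String line;
    }

    // Writes a length prefix followed by the UTF-8 bytes; -1 marks a null line.
    // The stream is flushed but not closed, since the caller owns it.
    public static void encode(Accum accum, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        if (accum.line == null) {
            data.writeInt(-1);
        } else {
            byte[] bytes = accum.line.getBytes(StandardCharsets.UTF_8);
            data.writeInt(bytes.length);
            data.write(bytes);
        }
        data.flush();
    }

    // Reads back exactly what encode() wrote.
    public static Accum decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        Accum accum = new Accum();
        int length = data.readInt();
        if (length >= 0) {
            byte[] bytes = new byte[length];
            data.readFully(bytes);
            accum.line = new String(bytes, StandardCharsets.UTF_8);
        }
        return accum;
    }

    // Hypothetical helper: encode into memory, then decode a fresh copy.
    public static Accum roundTrip(Accum accum) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            encode(accum, buffer);
            return decode(new ByteArrayInputStream(buffer.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Accum original = new Accum();
        original.line = "a,1\nb,2";
        Accum copy = roundTrip(original);
        System.out.println(original.line.equals(copy.line)); // prints true
    }
}
```

The explicit null marker matters here because the quoted `Accum` starts with `line == null`. A coder that assumed a non-null string would fail on a fresh accumulator.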
Here is a PR I wrote that defines an AccumulatingCombineFn and implements a coder for it, for your reference.

-Rui

On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <vangalamahe...@gmail.com> wrote:

> Hello Robin -
>
> Thank you so much for your help.
> I added Serializable to Accum, and I got the following error. (Sorry for
> being a pain. I hope once I get past the initial hump ...)
>
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
> INFO: Filepattern test_in.csv matched 1 files with total size 36
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal.
> Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
> Caused by: java.lang.NullPointerException
>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>
> --
> Mahesh Vangala
> (Ph) 443-326-1957
> (web) mvangala.com
>
> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>
>> Hello Mahesh,
>>
>> You can add "implements Serializable" to the Accum class, and then it should
>> work.
>>
>> By the way, in Java String is immutable, so in order to change, for
>> example, accum.line, you need to write accum.line = accum.line.concat(line).
>>
>> Best,
>> Robin
>>
>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <vangalamahe...@gmail.com>
>> wrote:
>>
>>> Hello all -
>>>
>>> I am trying to run a barebones Beam pipeline to understand the "combine"
>>> logic. I come from the Python world and am trying to learn the Java Beam SDK
>>> because my use case is ETL on a Spark cluster.
>>> So, pardon me for my grotesque Java code :)
>>>
>>> I would appreciate it if you could nudge me in the right direction with this
>>> error (please see below).
>>>
>>> Here's my code (read lines from an input file and write the same lines to
>>> an output file):
>>>
>>> public class VariantCaller
>>> {
>>>     public static void main(String[] args)
>>>     {
>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>         Pipeline p = Pipeline.create(opts);
>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>         p.run();
>>>     }
>>> }
>>>
>>> AddLines class:
>>>
>>> public class AddLines extends CombineFn<String, AddLines.Accum, String>
>>> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     public static class Accum {
>>>         String line;
>>>     }
>>>
>>>     @Override
>>>     public Accum createAccumulator() { return new Accum(); }
>>>
>>>     @Override
>>>     public Accum addInput(Accum accum, String line) {
>>>         accum.line.concat(line);
>>>         return accum;
>>>     }
>>>
>>>     @Override
>>>     public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>         Accum merged = createAccumulator();
>>>         for (Accum accum : accums) {
>>>             merged.line.concat("\n").concat(accum.line);
>>>         }
>>>         return merged;
>>>     }
>>>
>>>     @Override
>>>     public String extractOutput(Accum accum) {
>>>         return accum.line;
>>>     }
>>> }
>>>
>>> Exception in thread "main" java.lang.IllegalStateException: Unable to
>>> return a default Coder for
>>> Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output
>>> [PCollection].
>>> Correct one of the following root causes:
>>>   No Coder has been manually specified; you may do so using .setCoder().
>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>   Building a Coder using a registered CoderProvider failed.
>>>   See suppressed exceptions for detailed failures.
>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
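Separately from the coder question, there is a problem in the quoted `AddLines`: it never initializes `Accum.line`. As a result, `accum.line.concat(...)` and `merged.line.concat(...)` both dereference null. The `AddLines.mergeAccumulators` frame in the trace is at least consistent with that, though the exact failing line depends on the code that was actually run. Robin's point also applies: `concat` returns a new String, so its result has to be assigned back. The following is a minimal plain-Java sketch, with no Beam dependency, that mirrors the four CombineFn methods with both fixes applied. The class name `AddLinesSketch` is hypothetical, and so is the `combine` helper. That helper stands in for the runner by building one accumulator per bundle and then merging them. In the real pipeline the four methods would stay on `AddLines extends CombineFn<String, AddLines.Accum, String>` with their `@Override` annotations.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AddLinesSketch {

    // Serializable, per Robin's suggestion; starts from "" instead of null.
    public static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        String line = "";
    }

    // Joins two chunks with a newline, skipping empty ones (e.g. empty bundles).
    private static String join(String left, String right) {
        if (left.isEmpty()) return right;
        if (right.isEmpty()) return left;
        return left + "\n" + right;
    }

    public Accum createAccumulator() { return new Accum(); }

    public Accum addInput(Accum accum, String line) {
        // Strings are immutable, so keep the result of the join.
        accum.line = join(accum.line, line);
        return accum;
    }

    public Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            merged.line = join(merged.line, accum.line);
        }
        return merged;
    }

    public String extractOutput(Accum accum) { return accum.line; }

    // Hypothetical stand-in for the runner: one accumulator per bundle, then one merge.
    public String combine(List<List<String>> bundles) {
        List<Accum> partials = new ArrayList<>();
        for (List<String> bundle : bundles) {
            Accum acc = createAccumulator();
            for (String line : bundle) {
                acc = addInput(acc, line);
            }
            partials.add(acc);
        }
        return extractOutput(mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        AddLinesSketch fn = new AddLinesSketch();
        String out = fn.combine(Arrays.asList(
                Arrays.asList("a,1", "b,2"),
                Collections.<String>emptyList(),
                Arrays.asList("c,3")));
        System.out.println(out); // prints a,1 / b,2 / c,3 on three lines
    }
}
```

Skipping empty chunks matters because the log above shows the input being split into 9 bundles, and some of them may contribute no lines. Beam also makes no ordering guarantee for a combine, so in a real run the merged lines may not come out in file order.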