Thanks, Robin. Based on Robin's comment above, I looked into the CombineFn test script in the Beam git repo and implemented a getCoder method along the lines of that script. ( https://github.com/vangalamaheshh/my-beam/blob/master/variant-caller/src/main/java/pipelines/variant_caller/AddLines.java ) Do you think the getCoder implementation is not necessary? Thanks for your help, though. Much appreciated!
- Mahesh

*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*


On Fri, Aug 17, 2018 at 1:01 PM Robin Qiu <[email protected]> wrote:

> Hi Mahesh,
>
> I think you have the NullPointerException because your Accumulator is not
> initialized properly.
>
> In your createAccumulator() method, you created an Accum object without
> setting its line field. So later when accum.line got accessed, you got the
> exception.
>
> By initializing the Accum class you should be able to fix this problem.
> (e.g. String line = "";, instead of only String line;)
>
> Hope this helps,
> Robin
>
> On Thu, Aug 16, 2018 at 12:14 PM Rui Wang <[email protected]> wrote:
>
>> Sorry I forgot to attach the PR link:
>> https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
>>
>> -Rui
>>
>> On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <[email protected]> wrote:
>>
>>> Hi Mahesh,
>>>
>>> I think I had the same NPE when I explored self defined combineFn. I
>>> think your combineFn might still need to define a coder to help Beam run it
>>> in a distributed environment. Beam tries to invoke a coder somewhere and then
>>> throws an NPE as there is none defined.
>>>
>>> Here is a PR I wrote that defined an AccumulatingCombineFn and
>>> implemented a coder for that for your reference.
>>>
>>> -Rui
>>>
>>> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <[email protected]> wrote:
>>>
>>>> Hello Robin -
>>>>
>>>> Thank you so much for your help.
>>>> I added Serializable to Accum, and I got the following error. (Sorry,
>>>> for being a pain. I hope once I get past the initial hump ...)
>>>>
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>>>
>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>> Caused by: java.lang.NullPointerException
>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>>>
>>>> *--*
>>>> *Mahesh Vangala*
>>>> *(Ph) 443-326-1957*
>>>> *(web) mvangala.com <http://mvangala.com>*
>>>>
>>>>
>>>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <[email protected]> wrote:
>>>>
>>>>> Hello Mahesh,
>>>>>
>>>>> You can add "implements Serializable" to the Accum class, then it
>>>>> should work.
>>>>>
>>>>> By the way, in Java String is immutable, so in order to change, for
>>>>> example, accum.line, you need to write accum.line =
>>>>> accum.line.concat(line).
>>>>>
>>>>> Best,
>>>>> Robin
>>>>>
>>>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <[email protected]> wrote:
>>>>>
>>>>>> Hello all -
>>>>>>
>>>>>> I am trying to run a barebone beam pipeline to understand the
>>>>>> "combine" logic. I am from python world trying to learn java beam sdk due
>>>>>> to my use case of ETL with spark cluster.
>>>>>> So, pardon me for my grotesque java code :)
>>>>>>
>>>>>> I appreciate if you could nudge me in the right path with this error:
>>>>>> (please see below)
>>>>>>
>>>>>> Here's my code: (read lines from input file and output the same lines
>>>>>> to outfile)
>>>>>>
>>>>>> public class VariantCaller
>>>>>> {
>>>>>>     public static void main( String[] args )
>>>>>>     {
>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>         PCollection<String> mergedLines = lines
>>>>>>             .apply(Combine.globally(new AddLines()))
>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>         p.run();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> AddLines Class:
>>>>>>
>>>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>>>     /**
>>>>>>      *
>>>>>>      */
>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>
>>>>>>     public static class Accum {
>>>>>>         String line;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Accum createAccumulator() { return new Accum(); }
>>>>>>
>>>>>>     @Override
>>>>>>     public Accum addInput(Accum accum, String line) {
>>>>>>         accum.line.concat(line);
>>>>>>         return accum;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>>>         Accum merged = createAccumulator();
>>>>>>         for (Accum accum : accums) {
>>>>>>             merged.line.concat("\n").concat(accum.line);
>>>>>>         }
>>>>>>         return merged;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public String extractOutput(Accum accum) {
>>>>>>         return accum.line;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>>>   No Coder has been manually specified; you may do so using .setCoder().
>>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>   See suppressed exceptions for detailed failures.
>>>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>>>>>
>>>>>> *--*
>>>>>> *Mahesh Vangala*
>>>>>> *(Ph) 443-326-1957*
>>>>>> *(web) mvangala.com <http://mvangala.com>*
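
Robin's two suggestions in this thread (give the accumulator's line field an initial value, and assign the result of concat back because String is immutable) can be sketched as below. This is only a minimal sketch under stated assumptions, not the code from the linked repository: AddLinesSketch is a hypothetical stand-in that deliberately does not extend Beam's CombineFn, so that it compiles with the JDK alone. In the actual pipeline the four static methods would presumably be the instance-method overrides shown in the original AddLines class.

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical stand-in for AddLines. It does NOT extend Beam's CombineFn,
// so it can be compiled and run with nothing but the JDK.
class AddLinesSketch {

    // Serializable, as suggested in the thread, and with the field
    // initialized to "" so that it is never null.
    static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        String line = "";
    }

    static Accum createAccumulator() {
        return new Accum();
    }

    static Accum addInput(Accum accum, String line) {
        // String is immutable: concat returns a new String, so the result
        // has to be assigned back to the field.
        accum.line = accum.line.concat(line);
        return accum;
    }

    static Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            // Same separator logic as the original post, with the assignment added.
            merged.line = merged.line.concat("\n").concat(accum.line);
        }
        return merged;
    }

    static String extractOutput(Accum accum) {
        return accum.line;
    }

    public static void main(String[] args) {
        Accum a = addInput(createAccumulator(), "first");
        Accum b = addInput(createAccumulator(), "second");
        System.out.println(extractOutput(mergeAccumulators(Arrays.asList(a, b))));
    }
}
```

Because merging starts from an empty accumulator and puts "\n" in front of every part, as in the original mergeAccumulators, the merged string begins with a newline. Whether that is wanted depends on the intended output file.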
