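Hi Mahesh and Rui,

I believe that if you comment out your own implementation of getCoder, your program should still work. The reason is that Beam infers a coder when you don't provide one: since Accum implements Serializable, the CoderRegistry can fall back to SerializableCoder for the accumulator.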
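This is roughly what that inference relies on. When a CombineFn does not override `getAccumulatorCoder`, Beam looks up a coder for the accumulator type in the CoderRegistry, and for a class that implements `Serializable` it can fall back to `SerializableCoder`, which encodes values with standard Java serialization. The JDK-only toy below mimics that fallback decision. `CoderInferenceSketch`, `inferCodec` and `Codec` are hypothetical names, not Beam APIs, and the real registry consults other coder providers as well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical toy that mimics a registry's Serializable fallback; not Beam code.
class CoderInferenceSketch {

  interface Codec<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
  }

  // Like Accum after adding "implements Serializable".
  static class SerializableAccum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";
  }

  // Like the original Accum, with no Serializable marker.
  static class PlainAccum {
    String line = "";
  }

  // Fallback rule: Serializable classes get a Java-serialization codec; others get none.
  static <T> Optional<Codec<T>> inferCodec(Class<T> type) {
    if (!Serializable.class.isAssignableFrom(type)) {
      return Optional.empty();
    }
    return Optional.of(new Codec<T>() {
      @Override
      public byte[] encode(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(value);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
      }

      @Override
      public T decode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
          return type.cast(in.readObject());
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
          throw new IllegalStateException(e);
        }
      }
    });
  }
}
```

The `PlainAccum` case, for which no codec is found, loosely corresponds to the original Accum before `implements Serializable` was added.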
Best,
Robin

On Fri, Aug 17, 2018 at 12:03 PM Rui Wang <ruw...@google.com> wrote:
> Hi Robin,
>
> I have the same question here. If we don't implement a coder for class Accum
> but only add "implements Serializable" to class Accum, how is the accumulator
> coder generated?
>
> -Rui
>
> On Fri, Aug 17, 2018 at 11:50 AM Mahesh Vangala <vangalamahe...@gmail.com> wrote:
>> Thanks, Robin.
>> Based on Robin's comment above, I looked into the CombineFn test script in the
>> Beam git repo and implemented a getCoder method along the lines of that script
>> (https://github.com/vangalamaheshh/my-beam/blob/master/variant-caller/src/main/java/pipelines/variant_caller/AddLines.java).
>> Do you think the getCoder implementation is not necessary?
>> Thanks for your help, though. Much appreciated!
>>
>> - Mahesh
>>
>> --
>> Mahesh Vangala
>> (Ph) 443-326-1957
>> (web) mvangala.com
>>
>> On Fri, Aug 17, 2018 at 1:01 PM Robin Qiu <robi...@google.com> wrote:
>>> Hi Mahesh,
>>>
>>> I think you have the NullPointerException because your accumulator is
>>> not initialized properly.
>>>
>>> In your createAccumulator() method, you created an Accum object without
>>> setting its line field. So later, when accum.line was accessed, you got the
>>> exception.
>>>
>>> You should be able to fix this problem by initializing the field in the
>>> Accum class (e.g. String line = ""; instead of only String line;).
>>>
>>> Hope this helps,
>>> Robin
>>>
>>> On Thu, Aug 16, 2018 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>>>> Sorry, I forgot to attach the PR link:
>>>> https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
>>>>
>>>> -Rui
>>>>
>>>> On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <ruw...@google.com> wrote:
>>>>> Hi Mahesh,
>>>>>
>>>>> I think I had the same NPE when I explored a self-defined CombineFn. I
>>>>> think your CombineFn might still need to define a coder to help Beam run
>>>>> it in a distributed environment.
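Defining the coder yourself, as Rui suggests, comes down to supplying an encode/decode pair for the accumulator. The JDK-only sketch below is shaped like a Beam Coder's `encode(value, OutputStream)` and `decode(InputStream)` methods and writes Accum's single `String` field as a 4-byte length followed by its UTF-8 bytes. `AccumCodec` is a hypothetical name; in an actual CombineFn you would instead override `getAccumulatorCoder` and return a real Beam coder, for example `SerializableCoder.of(Accum.class)`.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-written codec for a one-field accumulator; not a Beam class.
class AccumCodec {

  static class Accum {
    String line = "";
  }

  // Writes a 4-byte length followed by the UTF-8 bytes; leaves the caller's stream open.
  static void encode(Accum value, OutputStream out) throws IOException {
    byte[] utf8 = value.line.getBytes(StandardCharsets.UTF_8);
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(utf8.length);
    data.write(utf8);
    data.flush();
  }

  // Reads exactly the bytes written by encode, so several values can share one stream.
  static Accum decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    byte[] utf8 = new byte[data.readInt()];
    data.readFully(utf8);
    Accum accum = new Accum();
    accum.line = new String(utf8, StandardCharsets.UTF_8);
    return accum;
  }
}
```

The length prefix is the important design choice: without it, a decoder reading from a shared stream could not tell where one accumulator ends and the next begins.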
>>>>> Beam tries to invoke a coder somewhere and then throws an NPE because
>>>>> none is defined.
>>>>>
>>>>> Here is a PR I wrote that defines an AccumulatingCombineFn and
>>>>> implements a coder for it, for your reference.
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <vangalamahe...@gmail.com> wrote:
>>>>>> Hello Robin -
>>>>>>
>>>>>> Thank you so much for your help.
>>>>>> I added Serializable to Accum, and I got the following error. (Sorry
>>>>>> for being a pain. I hope once I get past the initial hump ...)
>>>>>>
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal.
>>>>>> Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal.
>>>>>> Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal.
>>>>>> Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>>>>>
>>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>>> Caused by: java.lang.NullPointerException
>>>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>>>>>
>>>>>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>>>>>>> Hello Mahesh,
>>>>>>>
>>>>>>> You can add "implements Serializable" to the Accum class; then it
>>>>>>> should work.
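Independently of the coder, the NullPointerException in the trace above is raised inside `mergeAccumulators`. In the code as posted, `createAccumulator()` returns an Accum whose `line` field has no initializer and is therefore null, so calling `concat` on it throws. The JDK-only sketch below reproduces that with hypothetical stand-in classes (`NullFieldSketch`, `UninitializedAccum`, `InitializedAccum`) rather than the real AddLines, and contrasts it with the `String line = "";` initialization suggested in this thread.

```java
// Hypothetical stand-ins for the posted Accum before and after the fix.
class NullFieldSketch {

  // As posted: no initializer, so the field starts out null.
  static class UninitializedAccum {
    String line;
  }

  // As suggested in the thread: start from the empty string.
  static class InitializedAccum {
    String line = "";
  }

  // Mirrors the posted merge loop (result assigned back, since String is immutable).
  // merged.line starts null, so the first concat throws NullPointerException.
  static String mergeUninitialized(Iterable<String> parts) {
    UninitializedAccum merged = new UninitializedAccum();
    for (String part : parts) {
      merged.line = merged.line.concat("\n").concat(part);
    }
    return merged.line;
  }

  // Same loop with an initialized field: no exception.
  static String mergeInitialized(Iterable<String> parts) {
    InitializedAccum merged = new InitializedAccum();
    for (String part : parts) {
      merged.line = merged.line.concat("\n").concat(part);
    }
    return merged.line;
  }
}
```

With the field initialized, the merge succeeds, but the result starts with a stray newline, because a separator is prepended on every iteration, including the first.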
>>>>>>>
>>>>>>> By the way, in Java, String is immutable, so in order to change, for
>>>>>>> example, accum.line, you need to write accum.line =
>>>>>>> accum.line.concat(line).
>>>>>>>
>>>>>>> Best,
>>>>>>> Robin
>>>>>>>
>>>>>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <vangalamahe...@gmail.com> wrote:
>>>>>>>> Hello all -
>>>>>>>>
>>>>>>>> I am trying to run a bare-bones Beam pipeline to understand the
>>>>>>>> "combine" logic. I come from the Python world and am trying to learn the
>>>>>>>> Java Beam SDK because my use case is ETL on a Spark cluster. So, pardon
>>>>>>>> my grotesque Java code :)
>>>>>>>>
>>>>>>>> I'd appreciate it if you could nudge me in the right direction with this
>>>>>>>> error (please see below).
>>>>>>>>
>>>>>>>> Here's my code (read lines from an input file and write the same
>>>>>>>> lines to an output file):
>>>>>>>>
>>>>>>>> public class VariantCaller {
>>>>>>>>
>>>>>>>>   public static void main(String[] args) {
>>>>>>>>     PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>     Pipeline p = Pipeline.create(opts);
>>>>>>>>     PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>>>     PCollection<String> mergedLines = lines
>>>>>>>>         .apply(Combine.globally(new AddLines()));
>>>>>>>>     mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>>>     p.run();
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> AddLines class:
>>>>>>>>
>>>>>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>>>>>
>>>>>>>>   /**
>>>>>>>>    *
>>>>>>>>    */
>>>>>>>>   private static final long serialVersionUID = 1L;
>>>>>>>>
>>>>>>>>   public static class Accum {
>>>>>>>>     String line;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Accum createAccumulator() {
>>>>>>>>     return new Accum();
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Accum addInput(Accum accum, String line) {
>>>>>>>>     accum.line.concat(line);
>>>>>>>>     return accum;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>>>>>     Accum merged = createAccumulator();
>>>>>>>>     for (Accum accum : accums) {
>>>>>>>>       merged.line.concat("\n").concat(accum.line);
>>>>>>>>     }
>>>>>>>>     return merged;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public String extractOutput(Accum accum) {
>>>>>>>>     return accum.line;
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>>>>>   No Coder has been manually specified; you may do so using .setCoder().
>>>>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>>   See suppressed exceptions for detailed failures.
>>>>>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
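Putting the fixes from this thread together (`implements Serializable` on Accum, an initialized `line` field, and assigning the result of `concat` back because String is immutable), the four CombineFn methods might look roughly like the sketch below. To stay runnable with the JDK alone, `AddLinesSketch` only mirrors the method signatures and does not extend Beam's `CombineFn`; the `join` helper and the `combine` driver, which imitates bundles being accumulated and then merged, are hypothetical additions.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// JDK-only mirror of AddLines' CombineFn methods with the thread's fixes applied.
class AddLinesSketch {

  // Serializable so a coder can be inferred; the field starts as "" instead of null.
  static class Accum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";
  }

  Accum createAccumulator() {
    return new Accum();
  }

  // String is immutable, so the joined result must be assigned back to the field.
  Accum addInput(Accum accum, String input) {
    accum.line = join(accum.line, input);
    return accum;
  }

  Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.line = join(merged.line, accum.line);
    }
    return merged;
  }

  String extractOutput(Accum accum) {
    return accum.line;
  }

  // Hypothetical helper: newline-joins two fragments, skipping empty ones so the
  // initial "" and empty bundles leave no stray separators. Note that this also
  // drops genuinely empty input lines.
  static String join(String left, String right) {
    if (left.isEmpty()) return right;
    if (right.isEmpty()) return left;
    return left + "\n" + right;
  }

  // Hypothetical driver: one accumulator per bundle, then a single merge.
  static String combine(List<List<String>> bundles) {
    AddLinesSketch fn = new AddLinesSketch();
    List<Accum> partials = new ArrayList<>();
    for (List<String> bundle : bundles) {
      Accum acc = fn.createAccumulator();
      for (String line : bundle) {
        acc = fn.addInput(acc, line);
      }
      partials.add(acc);
    }
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }
}
```

One caveat: Beam expects a CombineFn to be associative and commutative. Concatenation is associative but not commutative, so the combined output is not guaranteed to keep the line order of test_in.csv.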