Thanks, Robin.
Based on Robin's comment above, I looked into the CombineFn test script in the Beam
git repo and implemented a getCoder method along the lines of that script
(https://github.com/vangalamaheshh/my-beam/blob/master/variant-caller/src/main/java/pipelines/variant_caller/AddLines.java).
Do you think the getCoder implementation is unnecessary?
Thanks for your help, though.
Much appreciated!

- Mahesh


*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*


On Fri, Aug 17, 2018 at 1:01 PM Robin Qiu <robi...@google.com> wrote:

> Hi Mahesh,
>
> I think you have the NullPointerException because your Accumulator is not
> initialized properly.
>
> In your createAccumulator() method, you created an Accum object without
> setting its line field, so when accum.line was later accessed, you got the
> exception.
>
> Initializing the line field in the Accum class should fix this problem
> (e.g. String line = ""; instead of just String line;).
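A minimal plain-Java sketch of the point above, with no Beam types involved (UninitializedAccum, InitializedAccum and AccumInitDemo are hypothetical names): a String field that is never assigned defaults to null, so calling concat on it throws a NullPointerException, while String line = ""; makes the same call safe.

```java
// Field left at its default value: null.
class UninitializedAccum {
    String line;
}

// Field initialized to an empty string, so concat() has something to work on.
class InitializedAccum {
    String line = "";
}

class AccumInitDemo {
    // Returns true if appending to the uninitialized field throws an NPE.
    static boolean uninitializedThrows() {
        UninitializedAccum accum = new UninitializedAccum();
        try {
            accum.line = accum.line.concat("a,b,c");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Appends to the initialized field and returns the new value.
    static String initializedAppend() {
        InitializedAccum accum = new InitializedAccum();
        accum.line = accum.line.concat("a,b,c");
        return accum.line;
    }

    public static void main(String[] args) {
        System.out.println(uninitializedThrows());
        System.out.println(initializedAppend());
    }
}
```

Because createAccumulator() calls new Accum(), the field initializer runs for every accumulator, including the one created inside mergeAccumulators.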
>
> Hope this helps,
> Robin
>
> On Thu, Aug 16, 2018 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>
>> Sorry, I forgot to attach the PR link:
>> https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
>>
>> -Rui
>>
>> On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <ruw...@google.com> wrote:
>>
>>> Hi Mahesh,
>>>
>>> I think I ran into the same NPE when I explored a self-defined CombineFn.
>>> Your CombineFn might still need to define a coder to help Beam run it in a
>>> distributed environment. Beam tries to invoke the coder somewhere and then
>>> throws an NPE because none is defined.
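To illustrate what a coder does for an accumulator, here is a plain-Java sketch under stated assumptions: LineAccum and AccumCodec are hypothetical classes, not Beam's Coder API, although the encode(value, OutputStream) / decode(InputStream) pair mirrors the shape of Beam's Coder methods. A runner that moves accumulators between workers needs some way to turn them into bytes and back; here the line is written as a length prefix followed by its UTF-8 bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical accumulator mirroring AddLines.Accum (one String field).
class LineAccum {
    String line = "";
}

// Hypothetical codec, not Beam's Coder class: it only illustrates the
// encode/decode pair a custom accumulator coder has to provide.
class AccumCodec {
    // Writes the line as a 4-byte length prefix followed by its UTF-8 bytes.
    static void encode(LineAccum value, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        byte[] bytes = value.line.getBytes(StandardCharsets.UTF_8);
        data.writeInt(bytes.length);
        data.write(bytes);
        data.flush();
    }

    // Reads the length prefix, then exactly that many bytes.
    static LineAccum decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] bytes = new byte[data.readInt()];
        data.readFully(bytes);
        LineAccum accum = new LineAccum();
        accum.line = new String(bytes, StandardCharsets.UTF_8);
        return accum;
    }

    // Encodes then decodes a line, as if shipping an accumulator between workers.
    static String roundTripLine(String line) {
        LineAccum original = new LineAccum();
        original.line = line;
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            encode(original, buffer);
            return decode(new ByteArrayInputStream(buffer.toByteArray())).line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripLine("a,b\nc,d"));
    }
}
```

Length-prefixing the raw bytes, rather than using DataOutputStream.writeUTF, avoids writeUTF's 65,535-byte limit, which a fully merged file could exceed.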
>>>
>>> Here is a PR I wrote that defines an AccumulatingCombineFn and implements
>>> a coder for it, for your reference.
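The AccumulatingCombineFn idea can be sketched in plain Java as follows. SelfAccumulator and LinesAccumulator are hypothetical names; the three methods are meant to mirror, as we understand it, Beam's AccumulatingCombineFn.Accumulator interface (addInput, mergeAccumulator, extractOutput), in which the accumulator object carries its own combining logic instead of the CombineFn.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface: the accumulator itself knows how to add, merge, and extract.
interface SelfAccumulator<InputT, AccumT, OutputT> {
    void addInput(InputT input);
    void mergeAccumulator(AccumT other);
    OutputT extractOutput();
}

// Collects lines in a list; the output joins them with newlines.
class LinesAccumulator implements SelfAccumulator<String, LinesAccumulator, String> {
    private final List<String> lines = new ArrayList<>();

    @Override
    public void addInput(String input) {
        lines.add(input);
    }

    @Override
    public void mergeAccumulator(LinesAccumulator other) {
        lines.addAll(other.lines);
    }

    @Override
    public String extractOutput() {
        return String.join("\n", lines);
    }

    // Two "bundles" accumulate independently, then one is merged into the other.
    static String demo() {
        LinesAccumulator first = new LinesAccumulator();
        first.addInput("a");
        first.addInput("b");
        LinesAccumulator second = new LinesAccumulator();
        second.addInput("c");
        first.mergeAccumulator(second);
        return first.extractOutput();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the lines in a list and joining only in extractOutput avoids the leading separator that concatenating onto an empty string would produce. In Beam, such an accumulator would presumably still need a coder.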
>>>
>>> -Rui
>>>
>>> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <
>>> vangalamahe...@gmail.com> wrote:
>>>
>>>> Hello Robin -
>>>>
>>>> Thank you so much for your help.
>>>> I added Serializable to Accum and got the following error. (Sorry for
>>>> being a pain. I hope once I get past the initial hump...)
>>>>
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>>>
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>>>
>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>>>
>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>> Caused by: java.lang.NullPointerException
>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
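The KvCoder warnings above look consistent with Accum inheriting Object's identity-based equals: a decoded copy of an accumulator is a different object, so it never compares equal to the original, even when both encode to the same bytes. Our assumption (not confirmed in this thread) is that the SerializableCoder Beam infers for a Serializable Accum compares values with equals, so overriding equals and hashCode may silence these warnings. The plain-Java sketch below, with hypothetical PlainAccum, ValueAccum and RoundTripEquality classes, shows the effect using standard Java serialization. The NullPointerException itself is the separate issue of accum.line starting out as null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Like Accum after "implements Serializable": no equals override.
class PlainAccum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";
}

// Same data, but with value-based equals/hashCode.
class ValueAccum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";

    @Override
    public boolean equals(Object o) {
        return o instanceof ValueAccum && Objects.equals(line, ((ValueAccum) o).line);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(line);
    }
}

class RoundTripEquality {
    // Serializes and deserializes an object with standard Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Object.equals compares identity, so the decoded copy is "different".
    static boolean plainCopyEquals() {
        PlainAccum a = new PlainAccum();
        a.line = "x";
        return a.equals(roundTrip(a));
    }

    // equals compares the line field, so the decoded copy is equal.
    static boolean valueCopyEquals() {
        ValueAccum a = new ValueAccum();
        a.line = "x";
        return a.equals(roundTrip(a));
    }

    public static void main(String[] args) {
        System.out.println(plainCopyEquals());
        System.out.println(valueCopyEquals());
    }
}
```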
>>>>
>>>>
>>>>
>>>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>>>>
>>>>> Hello Mahesh,
>>>>>
>>>>> You can add "implements Serializable" to the Accum class; then it
>>>>> should work.
>>>>>
>>>>> By the way, String is immutable in Java, so to change accum.line, for
>>>>> example, you need to write accum.line = accum.line.concat(line).
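Robin's point about immutability, as a small plain-Java sketch (StringImmutabilityDemo is a hypothetical name): concat returns a new String and leaves the original untouched, so the result must be assigned back. StringBuilder, shown last, is the usual mutable alternative.

```java
class StringImmutabilityDemo {
    // Calls concat without using its result: the original string is unchanged.
    static String discardResult() {
        String line = "a";
        line.concat("b"); // returns a new String "ab" that is thrown away
        return line;
    }

    // Reassigns the variable to the new String returned by concat.
    static String reassignResult() {
        String line = "a";
        line = line.concat("b");
        return line;
    }

    // StringBuilder is mutable, so append() changes it in place.
    static String builderAppend() {
        StringBuilder line = new StringBuilder("a");
        line.append("b");
        return line.toString();
    }

    public static void main(String[] args) {
        System.out.println(discardResult());
        System.out.println(reassignResult());
        System.out.println(builderAppend());
    }
}
```

This is why addInput and mergeAccumulators in the original AddLines never change accum.line: each concat call builds a new String that nothing keeps.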
>>>>>
>>>>> Best,
>>>>> Robin
>>>>>
>>>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <
>>>>> vangalamahe...@gmail.com> wrote:
>>>>>
>>>>>> Hello all -
>>>>>>
>>>>>> I am trying to run a bare-bones Beam pipeline to understand the
>>>>>> "combine" logic. I come from the Python world and am learning the Java
>>>>>> Beam SDK because my use case is ETL on a Spark cluster. So, pardon my
>>>>>> grotesque Java code :)
>>>>>>
>>>>>> I'd appreciate it if you could nudge me in the right direction with this
>>>>>> error (please see below).
>>>>>>
>>>>>> Here's my code (it reads lines from an input file and writes the same
>>>>>> lines to an output file):
>>>>>>
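>>>>>> package pipelines.variant_caller;
>>>>>>
>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>> import org.apache.beam.sdk.io.TextIO;
>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>> import org.apache.beam.sdk.transforms.Combine;
>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>
>>>>>> public class VariantCaller {
>>>>>>
>>>>>>     public static void main(String[] args) {
>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>
>>>>>>         // Read every line of the input file.
>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>
>>>>>>         // Combine all lines into a single element using AddLines.
>>>>>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>>>>>
>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>
>>>>>>         p.run();
>>>>>>     }
>>>>>> }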
>>>>>>
>>>>>>
>>>>>> AddLines Class:
>>>>>>
>>>>>>
>>>>>> package pipelines.variant_caller;
>>>>>>
>>>>>> import org.apache.beam.sdk.transforms.Combine.CombineFn;
>>>>>>
>>>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>>>
>>>>>>   private static final long serialVersionUID = 1L;
>>>>>>
>>>>>>   // Accumulator meant to hold the combined text.
>>>>>>   public static class Accum {
>>>>>>     String line;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Accum createAccumulator() { return new Accum(); }
>>>>>>
>>>>>>   @Override
>>>>>>   public Accum addInput(Accum accum, String line) {
>>>>>>     accum.line.concat(line);
>>>>>>     return accum;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>>>     Accum merged = createAccumulator();
>>>>>>     for (Accum accum : accums) {
>>>>>>       merged.line.concat("\n").concat(accum.line);
>>>>>>     }
>>>>>>     return merged;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public String extractOutput(Accum accum) {
>>>>>>     return accum.line;
>>>>>>   }
>>>>>> }
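For reference, here is a plain-Java simulation of how the four AddLines methods fit together, with the two fixes suggested in this thread applied (the field initialized to "" and concat results assigned back). LocalAddLines and its combine driver are hypothetical stand-ins, not Beam classes: the driver builds one accumulator per bundle and then merges them once, which is roughly the pattern behind the mergeAccumulators call in the stack trace (the log reports 9 bundles).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for AddLines: same four methods as a Beam CombineFn.
class LocalAddLines {
    static class Accum {
        String line = ""; // initialized, so concat() never sees null
    }

    Accum createAccumulator() {
        return new Accum();
    }

    Accum addInput(Accum accum, String line) {
        // Assign the result back; put "\n" only between lines.
        accum.line = accum.line.isEmpty() ? line : accum.line.concat("\n").concat(line);
        return accum;
    }

    Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            if (accum.line.isEmpty()) {
                continue; // skip accumulators from empty bundles
            }
            merged.line = merged.line.isEmpty()
                    ? accum.line
                    : merged.line.concat("\n").concat(accum.line);
        }
        return merged;
    }

    String extractOutput(Accum accum) {
        return accum.line;
    }

    // Hypothetical driver: one accumulator per bundle, then a single merge.
    static String combine(List<List<String>> bundles) {
        LocalAddLines fn = new LocalAddLines();
        List<Accum> partials = new ArrayList<>();
        for (List<String> bundle : bundles) {
            Accum accum = fn.createAccumulator();
            for (String line : bundle) {
                accum = fn.addInput(accum, line);
            }
            partials.add(accum);
        }
        return fn.extractOutput(fn.mergeAccumulators(partials));
    }

    static String demo() {
        List<List<String>> bundles = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.<String>asList(),
                Arrays.asList("c"));
        return combine(bundles);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Two caveats: the isEmpty() check treats an empty accumulator as "no lines yet", so a leading empty input line is dropped; and Beam expects a CombineFn to be commutative and associative, so line order in the output is not guaranteed to match the input.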
>>>>>>
>>>>>>
>>>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>   See suppressed exceptions for detailed failures.
>>>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>>>>>
>>>>>>
>>>>>
