Hi Mahesh and Rui,

I believe your program should still work if you comment out your own
implementation of getCoder. When you don't provide a coder, Beam infers one,
and since Accum implements Serializable, it can fall back to SerializableCoder.

Best,
Robin
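For reference, here is a JDK-only sketch of the round trip that Beam's SerializableCoder builds on: standard Java serialization through ObjectOutputStream and ObjectInputStream. The class SerializableRoundTrip and its encode/decode helpers are hypothetical names for illustration, not Beam API. Accum mirrors the thread's accumulator after adding "implements Serializable".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// JDK-only sketch (hypothetical names): round-trips an Accum through standard
// Java serialization, the mechanism Beam's SerializableCoder builds on.
class SerializableRoundTrip {

  // Same shape as AddLines.Accum once it declares `implements Serializable`.
  static class Accum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";
  }

  // Turns an Accum into bytes, as a coder does whenever a runner moves or stores it.
  static byte[] encode(Accum value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  // Rebuilds an Accum from the bytes produced by encode().
  static Accum decode(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (Accum) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Accum original = new Accum();
    original.line = "line1\nline2";
    Accum copy = decode(encode(original));
    System.out.println(copy.line.equals(original.line)); // prints true
    System.out.println(copy != original);                 // prints true: a new object
  }
}
```

Java serialization writes class metadata alongside every value, so a hand-written coder is typically more compact; for a small learning pipeline, relying on the inferred coder is likely fine.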

On Fri, Aug 17, 2018 at 12:03 PM Rui Wang <ruw...@google.com> wrote:

> Hi Robin,
>
> I have the same question here. If we don't implement a coder for class
> Accum and only add "implements Serializable" to it, how is the accumulator
> coder generated?
>
> -Rui
>
> On Fri, Aug 17, 2018 at 11:50 AM Mahesh Vangala <vangalamahe...@gmail.com>
> wrote:
>
>> Thanks, Robin.
>> Based on Robin's comment above, I looked into the CombineFn test in the
>> Beam git repo and implemented a getCoder method along the same lines (see
>> https://github.com/vangalamaheshh/my-beam/blob/master/variant-caller/src/main/java/pipelines/variant_caller/AddLines.java
>> ).
>> Do you think the getCoder implementation is unnecessary?
>> Thanks for your help, though.
>> Much appreciated!
>>
>> - Mahesh
>>
>>
>> *--*
>> *Mahesh Vangala*
>> *(Ph) 443-326-1957*
>> *(web) mvangala.com <http://mvangala.com>*
>>
>>
>> On Fri, Aug 17, 2018 at 1:01 PM Robin Qiu <robi...@google.com> wrote:
>>
>>> Hi Mahesh,
>>>
>>> I think you get the NullPointerException because your accumulator is
>>> not initialized properly.
>>>
>>> In your createAccumulator() method, you create an Accum object without
>>> setting its line field, so line is still null. Later, when accum.line is
>>> accessed, you get the exception.
>>>
>>> Initializing the line field of the Accum class should fix this problem
>>> (e.g. String line = ""; instead of just String line;).
>>>
>>> Hope this helps,
>>> Robin
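Putting Robin's two suggestions together (initialize line, and keep the String that concat() returns), a plain-Java stand-in for AddLines can be exercised without a runner. AddLinesSketch and its join helper are hypothetical names; the class only mirrors the four CombineFn method shapes and does not extend Beam's CombineFn. The sample input strings are made up for illustration.

```java
import java.util.Arrays;

// Plain-Java stand-in for the AddLines CombineFn (no Beam dependency).
class AddLinesSketch {

  static class Accum {
    String line = ""; // initialized, so concat() never runs on null
  }

  // Joins two chunks with "\n", skipping the separator if either side is empty.
  static String join(String a, String b) {
    if (a.isEmpty()) return b;
    if (b.isEmpty()) return a;
    return a.concat("\n").concat(b);
  }

  Accum createAccumulator() {
    return new Accum();
  }

  Accum addInput(Accum accum, String line) {
    accum.line = join(accum.line, line); // keep the new String, since Strings are immutable
    return accum;
  }

  Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.line = join(merged.line, accum.line);
    }
    return merged;
  }

  String extractOutput(Accum accum) {
    return accum.line;
  }

  public static void main(String[] args) {
    AddLinesSketch fn = new AddLinesSketch();
    // Separate accumulators, as a runner might create for separate bundles.
    Accum a = fn.addInput(fn.addInput(fn.createAccumulator(), "line1"), "line2");
    Accum b = fn.addInput(fn.createAccumulator(), "line3");
    Accum empty = fn.createAccumulator(); // a bundle that saw no input
    String out = fn.extractOutput(fn.mergeAccumulators(Arrays.asList(a, b, empty)));
    System.out.println(out); // prints line1, line2 and line3 on separate lines
  }
}
```

The join helper is a design choice: a literal merged.line.concat("\n").concat(accum.line) would start the output with a newline because merged begins empty, and addInput as originally written adds no separator between lines from the same bundle.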
>>>
>>> On Thu, Aug 16, 2018 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> Sorry, I forgot to attach the PR link:
>>>> https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
>>>>
>>>> -Rui
>>>>
>>>> On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <ruw...@google.com> wrote:
>>>>
>>>>> Hi Mahesh,
>>>>>
>>>>> I think I had the same NPE when I explored a self-defined CombineFn. I
>>>>> think your CombineFn might still need to define a coder to help Beam run
>>>>> it in a distributed environment. Beam tries to invoke the coder somewhere
>>>>> and then throws an NPE because none is defined.
>>>>>
>>>>> Here is a PR I wrote that defines an AccumulatingCombineFn and
>>>>> implements a coder for it, for your reference.
>>>>>
>>>>> -Rui
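To see what an explicit accumulator coder has to do, here is a JDK-only sketch of a hand-written encode/decode pair for Accum, shaped like Beam's Coder.encode(T, OutputStream) and Coder.decode(InputStream). AccumCoderSketch is a hypothetical name; in Beam, these two methods would typically live in a subclass of CustomCoder<Accum> returned from the CombineFn's getAccumulatorCoder override. The byte layout (a 4-byte length followed by UTF-8 bytes) is an assumption made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// JDK-only sketch of a hand-written coder for Accum (hypothetical names).
class AccumCoderSketch {

  static class Accum {
    String line = "";
  }

  // Writes a 4-byte length prefix followed by the UTF-8 bytes of `line`.
  static void encode(Accum value, OutputStream outStream) throws IOException {
    DataOutputStream out = new DataOutputStream(outStream);
    byte[] bytes = value.line.getBytes(StandardCharsets.UTF_8);
    out.writeInt(bytes.length);
    out.write(bytes);
    out.flush();
  }

  // Reads back exactly what encode() wrote, in the same order.
  static Accum decode(InputStream inStream) throws IOException {
    DataInputStream in = new DataInputStream(inStream);
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    Accum accum = new Accum();
    accum.line = new String(bytes, StandardCharsets.UTF_8);
    return accum;
  }

  public static void main(String[] args) throws IOException {
    Accum original = new Accum();
    original.line = "line1\nline2";
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    encode(original, buffer);
    Accum copy = decode(new ByteArrayInputStream(buffer.toByteArray()));
    System.out.println(copy.line); // prints line1 and line2 on separate lines
  }
}
```

The length prefix lets decode consume exactly the bytes of one Accum, which matters when values are written back to back into the same stream. DataOutputStream.writeUTF is avoided here because it cannot encode strings longer than 65535 bytes.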
>>>>>
>>>>> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <
>>>>> vangalamahe...@gmail.com> wrote:
>>>>>
>>>>>> Hello Robin -
>>>>>>
>>>>>> Thank you so much for your help.
>>>>>> I added "implements Serializable" to Accum, and I got the following
>>>>>> error. (Sorry for being a pain. I hope it gets easier once I get past the
>>>>>> initial hump.)
>>>>>>
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>>
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>>>
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>>>>>
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>>>>>
>>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>>>>>
>>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>>> Caused by: java.lang.NullPointerException
>>>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>>>>>>
>>>>>>> Hello Mahesh,
>>>>>>>
>>>>>>> You can add "implements Serializable" to the Accum class; then it
>>>>>>> should work.
>>>>>>>
>>>>>>> By the way, Strings in Java are immutable, so to change accum.line, for
>>>>>>> example, you need to write accum.line = accum.line.concat(line);
>>>>>>>
>>>>>>> Best,
>>>>>>> Robin
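A minimal JDK-only illustration of the String point: concat() returns a new String and leaves the receiver untouched, so its result must be assigned. StringConcatDemo and its two helpers are hypothetical names chosen to mirror the original addInput body and the suggested fix.

```java
// Shows why `accum.line.concat(line);` on its own has no visible effect.
class StringConcatDemo {

  // Mirrors the original addInput body: the result of concat() is thrown away.
  static String appendDiscarding(String current, String extra) {
    current.concat(extra); // builds a new String, which is never stored
    return current;
  }

  // Mirrors the suggested fix: keep the String that concat() returns.
  static String appendKeeping(String current, String extra) {
    current = current.concat(extra);
    return current;
  }

  public static void main(String[] args) {
    System.out.println(appendDiscarding("first", ",second")); // prints first
    System.out.println(appendKeeping("first", ",second"));    // prints first,second
  }
}
```

When an accumulator receives many appends, a StringBuilder field avoids copying the whole string on every call, at the cost of a mutable field that has to be handled carefully in mergeAccumulators.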
>>>>>>>
>>>>>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <
>>>>>>> vangalamahe...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello all -
>>>>>>>>
>>>>>>>> I am trying to run a bare-bones Beam pipeline to understand the
>>>>>>>> "combine" logic. I come from the Python world and am learning the Java
>>>>>>>> Beam SDK because my use case is ETL on a Spark cluster, so pardon my
>>>>>>>> grotesque Java code :)
>>>>>>>>
>>>>>>>> I would appreciate it if you could nudge me onto the right path with
>>>>>>>> this error (please see below).
>>>>>>>>
>>>>>>>> Here's my code (it reads lines from an input file and writes the same
>>>>>>>> lines to an output file):
>>>>>>>>
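>>>>>>>> package pipelines.variant_caller;
>>>>>>>>
>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>> import org.apache.beam.sdk.io.TextIO;
>>>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>> import org.apache.beam.sdk.transforms.Combine;
>>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>
>>>>>>>> public class VariantCaller {
>>>>>>>>
>>>>>>>>     public static void main(String[] args) {
>>>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>>>
>>>>>>>>         // Read every line of the input file.
>>>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>>>
>>>>>>>>         // Combine all lines into a single String with the AddLines CombineFn.
>>>>>>>>         PCollection<String> mergedLines = lines
>>>>>>>>             .apply(Combine.globally(new AddLines()));
>>>>>>>>
>>>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>>>
>>>>>>>>         p.run();
>>>>>>>>     }
>>>>>>>> }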
>>>>>>>>
>>>>>>>>
>>>>>>>> AddLines Class:
>>>>>>>>
>>>>>>>>
>>>>>>>> package pipelines.variant_caller;
>>>>>>>>
>>>>>>>> import org.apache.beam.sdk.transforms.Combine.CombineFn;
>>>>>>>>
>>>>>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>>>>>
>>>>>>>>   private static final long serialVersionUID = 1L;
>>>>>>>>
>>>>>>>>   // Accumulator holding the lines combined so far.
>>>>>>>>   public static class Accum {
>>>>>>>>     String line;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Accum createAccumulator() { return new Accum(); }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Accum addInput(Accum accum, String line) {
>>>>>>>>     accum.line.concat(line);
>>>>>>>>     return accum;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>>>>>     Accum merged = createAccumulator();
>>>>>>>>     for (Accum accum : accums) {
>>>>>>>>       merged.line.concat("\n").concat(accum.line);
>>>>>>>>     }
>>>>>>>>     return merged;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public String extractOutput(Accum accum) {
>>>>>>>>     return accum.line;
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>>>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>>   See suppressed exceptions for detailed failures.
>>>>>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>>>>>>>
>>>>>>>>
>>>>>>>
