Hi Mahesh,

I think I hit the same NPE when I explored a self-defined CombineFn. I think
your CombineFn might still need to define a coder so that Beam can run it in
a distributed environment. Beam tries to invoke the coder somewhere and then
throws an NPE when none is defined.

Here is a PR I wrote that defines an AccumulatingCombineFn and implements a
coder for it, for your reference.
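
Below is a minimal sketch of what that could look like. It assumes the Beam 2.x Java SDK. AccumCoder is a hypothetical name and is not the coder from the PR above. It encodes the accumulator's single String field with StringUtf8Coder, and getAccumulatorCoder() hands it to Beam so that no coder has to be inferred. The sketch also stores every line with a trailing newline, so merging is plain concatenation and extractOutput() only has to drop the final newline.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

// Joins all input lines into one newline-separated String.
class AddLines extends CombineFn<String, AddLines.Accum, String> {

  static class Accum {
    // Start from "" so concat() is never called on a null reference.
    String line = "";
  }

  // Hypothetical coder: writes and reads the accumulator's single String field.
  static class AccumCoder extends CustomCoder<Accum> {
    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();

    @Override
    public void encode(Accum accum, OutputStream out) throws IOException {
      STRING_CODER.encode(accum.line, out);
    }

    @Override
    public Accum decode(InputStream in) throws IOException {
      Accum accum = new Accum();
      accum.line = STRING_CODER.decode(in);
      return accum;
    }
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum accum, String input) {
    // Strings are immutable, so keep the String that concat() returns.
    accum.line = accum.line.concat(input).concat("\n");
    return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      // Each partial result already ends with "\n", so plain concatenation works.
      merged.line = merged.line.concat(accum.line);
    }
    return merged;
  }

  @Override
  public String extractOutput(Accum accum) {
    // Drop the trailing newline of the last line, if there is one.
    return accum.line.isEmpty() ? "" : accum.line.substring(0, accum.line.length() - 1);
  }

  // Gives Beam an explicit accumulator coder instead of relying on inference.
  @Override
  public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder)
      throws CannotProvideCoderException {
    return new AccumCoder();
  }
}
```

It plugs into the pipeline unchanged: lines.apply(Combine.globally(new AddLines())). Keep in mind that Combine does not promise any input order, so the merged lines may not come out in file order.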

-Rui

On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <vangalamahe...@gmail.com>
wrote:

> Hello Robin -
>
> Thank you so much for your help.
> I added Serializable to Accum and got the following error. (Sorry for
> being a pain. I hope once I get past the initial hump...)
>
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
> INFO: Filepattern test_in.csv matched 1 files with total size 36
>
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
> Caused by: java.lang.NullPointerException
>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
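
According to the trace, AddLines.java:35 is inside mergeAccumulators. The likely cause is that createAccumulator() returns an Accum whose line field was never assigned, so it is still null when mergeAccumulators() calls concat(). The plain-Java sketch below does not need Beam, and NullFieldDemo and UninitializedAccum are illustrative names. It contrasts the original field with one initialized to "", and it also gives Accum value-based equals()/hashCode(). That second part rests on an unverified guess: the KvCoder #structuralValue warnings above may come from Accum relying on Object's identity-based equals().

```java
import java.io.Serializable;
import java.util.Objects;

class NullFieldDemo {

  // Same shape as the original Accum: no initializer, so the field starts as null.
  static class UninitializedAccum {
    String line;
  }

  // Initialized field plus value-based equality (the equality part is an assumption, see above).
  static class Accum implements Serializable {
    String line = "";

    @Override
    public boolean equals(Object o) {
      return o instanceof Accum && Objects.equals(line, ((Accum) o).line);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(line);
    }
  }

  // Returns true if calling concat() on a fresh UninitializedAccum's field throws an NPE.
  static boolean uninitializedThrows() {
    try {
      new UninitializedAccum().line.concat("x");
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(uninitializedThrows()); // prints true
    Accum a = new Accum();
    a.line = a.line.concat("x"); // safe: line starts as ""
    Accum b = new Accum();
    b.line = "x";
    System.out.println(a.equals(b)); // prints true
  }
}
```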
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>
>> Hello Mahesh,
>>
>> You can add "implements Serializable" to the Accum class; then it should
>> work.
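
As far as I understand, this works because Beam's default CoderRegistry can fall back to SerializableCoder for classes that implement java.io.Serializable, and SerializableCoder relies on standard Java serialization. Here is a plain-Java sketch of that round trip with no Beam involved. SerializableAccumDemo and roundTrip are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableAccumDemo {

  // Implementing Serializable is what allows Java serialization to handle this class.
  static class Accum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";
  }

  // Serializes an Accum to bytes, then deserializes a new copy from those bytes.
  static Accum roundTrip(Accum accum) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(accum);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Accum) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Accum accum = new Accum();
    accum.line = "hello";
    System.out.println(roundTrip(accum).line); // prints hello
  }
}
```

Without "implements Serializable", writeObject() would throw java.io.NotSerializableException.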
>>
>> By the way, in Java, String is immutable, so concat() returns a new String
>> instead of modifying the existing one. To change accum.line, for example,
>> you need to write accum.line = accum.line.concat(line).
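
Here is a small plain-Java illustration. ConcatDemo and its methods are illustrative names. concat() leaves the original String unchanged and returns a new one, so a call whose result is discarded has no effect.

```java
class ConcatDemo {

  // Mirrors the original addInput: the String returned by concat() is thrown away.
  static String discardResult(String line, String input) {
    line.concat(input);
    return line;
  }

  // Assigns the String returned by concat() back to the variable.
  static String keepResult(String line, String input) {
    line = line.concat(input);
    return line;
  }

  public static void main(String[] args) {
    System.out.println(discardResult("a", "b")); // prints a
    System.out.println(keepResult("a", "b"));    // prints ab
  }
}
```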
>>
>> Best,
>> Robin
>>
>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <vangalamahe...@gmail.com>
>> wrote:
>>
>>> Hello all -
>>>
>>> I am trying to run a bare-bones Beam pipeline to understand the "combine"
>>> logic. I come from the Python world and am trying to learn the Java Beam SDK
>>> because my use case is ETL on a Spark cluster. So, pardon my grotesque
>>> Java code :)
>>>
>>> I would appreciate it if you could nudge me in the right direction with this
>>> error (please see below).
>>>
>>> Here's my code (it reads lines from an input file and writes the same lines
>>> to an output file):
>>>
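>>> package pipelines.variant_caller;
>>>
>>> import org.apache.beam.sdk.Pipeline;
>>> import org.apache.beam.sdk.io.TextIO;
>>> import org.apache.beam.sdk.options.PipelineOptions;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>> import org.apache.beam.sdk.transforms.Combine;
>>> import org.apache.beam.sdk.values.PCollection;
>>>
>>> public class VariantCaller {
>>>
>>>     public static void main(String[] args) {
>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>         Pipeline p = Pipeline.create(opts);
>>>
>>>         // Read every line of the input file.
>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>
>>>         // Merge all lines into a single String with the custom CombineFn.
>>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>>
>>>         // Write the merged result.
>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>
>>>         p.run();
>>>     }
>>> }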
>>>
>>>
>>> AddLines Class:
>>>
>>>
>>> package pipelines.variant_caller;
>>>
>>> import org.apache.beam.sdk.transforms.Combine.CombineFn;
>>>
>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   public static class Accum {
>>>     String line; // no initializer, so this starts out as null
>>>   }
>>>
>>>   @Override
>>>   public Accum createAccumulator() { return new Accum(); }
>>>
>>>   @Override
>>>   public Accum addInput(Accum accum, String line) {
>>>     accum.line.concat(line); // concat() returns a new String; it is not assigned here
>>>     return accum;
>>>   }
>>>
>>>   @Override
>>>   public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>     Accum merged = createAccumulator();
>>>     for (Accum accum : accums) {
>>>       merged.line.concat("\n").concat(accum.line); // result is not assigned either
>>>     }
>>>     return merged;
>>>   }
>>>
>>>   @Override
>>>   public String extractOutput(Accum accum) {
>>>     return accum.line;
>>>   }
>>> }
>>>
>>>
>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>   Building a Coder using a registered CoderProvider failed.
>>>   See suppressed exceptions for detailed failures.
>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>>
>>
