Hello Robin -

Thank you so much for your help.
I added Serializable to Accum, and I got the following error. (Sorry for
being a pain. I hope that once I get past the initial hump...)

Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern test_in.csv matched 1 files with total size 36

Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles

Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}

Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}

Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
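The three KvCoder warnings above are most likely a separate issue from the NullPointerException. As far as I understand Beam, once Accum implements Serializable it is encoded with SerializableCoder, which compares values using equals(). Accum still inherits Object.equals(), so two accumulators with identical contents never compare equal. Below is a minimal plain-Java sketch of an accumulator with value equality on `line`. It assumes that is the equality you want, and it leaves out the surrounding AddLines class.

```java
import java.io.Serializable;
import java.util.Objects;

// Plain-Java sketch of the accumulator (in the pipeline this is the nested
// AddLines.Accum class). Assumption: two accumulators are "equal" when their
// accumulated text is equal.
class Accum implements Serializable {
  private static final long serialVersionUID = 1L;

  // Start from an empty string so concat() is never called on null.
  String line = "";

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof Accum)) {
      return false;
    }
    return Objects.equals(line, ((Accum) other).line);
  }

  // Must agree with equals(): equal lines give equal hash codes.
  @Override
  public int hashCode() {
    return Objects.hashCode(line);
  }
}
```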

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
Caused by: java.lang.NullPointerException
    at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
    at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
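The NullPointerException comes from the accumulator's initial state. createAccumulator() returns an Accum whose `line` field is still null, so any `concat()` call on it throws. That includes the one in mergeAccumulators (and the one in addInput). The `concat()` results are also discarded, as Robin points out below.

Below is a minimal plain-Java sketch of the same four methods. It leaves out the Beam base class so the logic can run on its own; `AddLinesSketch` and the `appendLine` helper are hypothetical names. It makes three changes:

- `line` starts as "".
- Every `concat()` result is assigned back.
- Empty accumulators are skipped when merging.

Note that Beam does not guarantee element order in a global combine, so the output lines may come out in a different order than the input.

```java
import java.io.Serializable;

// Plain-Java sketch of the AddLines logic. In the pipeline these would be the
// @Override methods of AddLines extends CombineFn<String, Accum, String>.
class AddLinesSketch {

  static class Accum implements Serializable {
    private static final long serialVersionUID = 1L;
    // Initialized to "" so a fresh accumulator never holds a null line.
    String line = "";
  }

  // Hypothetical helper: appends s to text, with a newline in between
  // when text is non-empty.
  private static String appendLine(String text, String s) {
    return text.isEmpty() ? s : text.concat("\n").concat(s);
  }

  Accum createAccumulator() {
    return new Accum();
  }

  Accum addInput(Accum accum, String line) {
    // String is immutable: keep the String returned by concat().
    accum.line = appendLine(accum.line, line);
    return accum;
  }

  Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      // Skip accumulators that never saw input (e.g. from empty bundles).
      if (!accum.line.isEmpty()) {
        merged.line = appendLine(merged.line, accum.line);
      }
    }
    return merged;
  }

  String extractOutput(Accum accum) {
    return accum.line;
  }
}
```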

*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*


On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:

> Hello Mahesh,
>
> You can add "implements Serializable" to the Accum class; then it should
> work.
>
> By the way, in Java a String is immutable, so to change, for example,
> accum.line, you need to write accum.line = accum.line.concat(line).
>
> Best,
> Robin
>
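Robin's point about String immutability can be checked in isolation with a small plain-Java class (`ConcatDemo` is a hypothetical name). `concat()` returns a new String and leaves the original untouched, so a call whose result is not assigned has no effect.

```java
// Shows that String.concat() returns a new String and never modifies the
// String it is called on, which is why the result must be assigned back.
class ConcatDemo {
  static String[] demo() {
    String line = "first";
    line.concat(" second");        // result discarded: line is still "first"
    String unchanged = line;
    line = line.concat(" second"); // result assigned: line is now "first second"
    return new String[] {unchanged, line};
  }

  public static void main(String[] args) {
    String[] results = demo();
    System.out.println(results[0]); // prints "first"
    System.out.println(results[1]); // prints "first second"
  }
}
```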
> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <vangalamahe...@gmail.com>
> wrote:
>
>> Hello all -
>>
>> I am trying to run a bare-bones Beam pipeline to understand the "combine"
>> logic. I come from the Python world and am learning the Java Beam SDK for
>> an ETL use case on a Spark cluster, so pardon my grotesque Java code :)
>>
>> I'd appreciate it if you could nudge me in the right direction with this
>> error (please see below).
>>
>> Here's my code (it reads lines from an input file and writes the same lines
>> to an output file):
>>
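>> package pipelines.variant_caller;
>>
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.io.TextIO;
>> import org.apache.beam.sdk.options.PipelineOptions;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.transforms.Combine;
>> import org.apache.beam.sdk.values.PCollection;
>>
>> public class VariantCaller {
>>
>>     public static void main(String[] args) {
>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>         Pipeline p = Pipeline.create(opts);
>>
>>         // Read every line of the input file.
>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>
>>         // Combine all lines into a single string using AddLines.
>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>
>>         // Write the combined result.
>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>
>>         p.run();
>>     }
>> }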
>>
>>
>> AddLines class:
>>
>> package pipelines.variant_caller;
>>
>> import org.apache.beam.sdk.transforms.Combine.CombineFn;
>>
>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>
>>   private static final long serialVersionUID = 1L;
>>
>>   // Holds the text combined so far.
>>   public static class Accum {
>>     String line;
>>   }
>>
>>   @Override
>>   public Accum createAccumulator() { return new Accum(); }
>>
>>   @Override
>>   public Accum addInput(Accum accum, String line) {
>>     accum.line.concat(line);
>>     return accum;
>>   }
>>
>>   @Override
>>   public Accum mergeAccumulators(Iterable<Accum> accums) {
>>     Accum merged = createAccumulator();
>>     for (Accum accum : accums) {
>>       merged.line.concat("\n").concat(accum.line);
>>     }
>>     return merged;
>>   }
>>
>>   @Override
>>   public String extractOutput(Accum accum) {
>>     return accum.line;
>>   }
>> }
>>
>>
>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>   No Coder has been manually specified;  you may do so using .setCoder().
>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>   Building a Coder using a registered CoderProvider failed.
>>   See suppressed exceptions for detailed failures.
>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>
>>
>
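The IllegalStateException in the first message is Beam failing to find a Coder for the accumulator. As far as I know, making Accum implement Serializable lets Beam fall back to its SerializableCoder, which encodes values with standard Java serialization. (A CombineFn can instead supply a coder explicitly by overriding getAccumulatorCoder.)

The plain-Java sketch below only checks that an Accum survives a Java serialization round trip; `SerializationCheck` and its `roundTrip` helper are hypothetical, not Beam APIs. Without `implements Serializable`, `writeObject` would throw java.io.NotSerializableException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCheck {

  // Same shape as AddLines.Accum, with the Serializable marker added.
  static class Accum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";
  }

  // Hypothetical helper: encodes an Accum with Java serialization and decodes
  // it again, roughly what a serialization-based coder has to be able to do.
  static Accum roundTrip(Accum accum) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(accum);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Accum) in.readObject();
    }
  }
}
```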
