Hello Robin, thank you so much for your help. I added Serializable to Accum, and now I get the following error. (Sorry for being a pain. I hope once I get past the initial hump ...)
Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern test_in.csv matched 1 files with total size 36
Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
	at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
Caused by: java.lang.NullPointerException
	at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
	at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)

*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*

On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:

> Hello Mahesh,
>
> You can add "implements Serializable" to the Accum class; then it should
> work.
>
> By the way, in Java String is immutable, so in order to change, for
> example, accum.line, you need to write accum.line = accum.line.concat(line).
>
> Best,
> Robin
>
> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <vangalamahe...@gmail.com>
> wrote:
>
>> Hello all -
>>
>> I am trying to run a bare-bones Beam pipeline to understand the "combine"
>> logic. I come from the Python world and am learning the Java Beam SDK
>> because my use case is ETL on a Spark cluster.
>> So pardon my grotesque Java code :)
>>
>> I would appreciate it if you could nudge me in the right direction with
>> this error (please see below).
>>
>> Here's my code (it reads lines from an input file and writes the same
>> lines to an output file):
>>
>> public class VariantCaller
>> {
>>     public static void main(String[] args)
>>     {
>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>         Pipeline p = Pipeline.create(opts);
>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>         p.run();
>>     }
>> }
>>
>> AddLines Class:
>>
>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     public static class Accum {
>>         String line;
>>     }
>>
>>     @Override
>>     public Accum createAccumulator() { return new Accum(); }
>>
>>     @Override
>>     public Accum addInput(Accum accum, String line) {
>>         accum.line.concat(line);
>>         return accum;
>>     }
>>
>>     @Override
>>     public Accum mergeAccumulators(Iterable<Accum> accums) {
>>         Accum merged = createAccumulator();
>>         for (Accum accum : accums) {
>>             merged.line.concat("\n").concat(accum.line);
>>         }
>>         return merged;
>>     }
>>
>>     @Override
>>     public String extractOutput(Accum accum) {
>>         return accum.line;
>>     }
>> }
>>
>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>   No Coder has been manually specified; you may do so using .setCoder().
>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>   Building a Coder using a registered CoderProvider failed.
>>   See suppressed exceptions for detailed failures.
>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>> 	at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>> 	at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>> 	at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>> 	at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>> 	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>> 	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>> 	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>> 	at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>> 	at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>> 	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> 	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>> 	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>> 	at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>
>> *--*
>> *Mahesh Vangala*
>> *(Ph) 443-326-1957*
>> *(web) mvangala.com <http://mvangala.com>*
>>
>
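For reference, here is a minimal plain-Java sketch of AddLines with the problems from this thread addressed. It is an illustration under assumptions, not Beam code. The class name AddLinesSketch and the roundTrip helper are hypothetical. The `extends CombineFn<String, AddLines.Accum, String>` clause and the @Override annotations are dropped so that it compiles with the JDK alone.

The NullPointerException at AddLines.java:35 comes from `merged.line` starting out as null, because an uninitialized String field defaults to null. The sketch therefore initializes `line` to "". As Robin noted, it also keeps the result of every concatenation.

Accum implements Serializable, which is what lets Beam fall back to SerializableCoder (Java serialization). roundTrip imitates that encoding.

The equals/hashCode pair is a guess at what the KvCoder #structuralValue warnings are complaining about. With SerializableCoder, the accumulator object itself is likely what gets compared. Under default identity equality, two Accums with identical encodings then look different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical plain-Java stand-in for AddLines: the same four CombineFn-style
// methods, without the Beam dependency, so the logic can be checked on its own.
public class AddLinesSketch {

    // Serializable so Java serialization (what SerializableCoder relies on) can encode it.
    public static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        // Start from "" instead of null; this is what avoids the NullPointerException.
        String line = "";

        // Value-based equality: accumulators holding the same text compare equal.
        @Override
        public boolean equals(Object o) {
            return o instanceof Accum && Objects.equals(line, ((Accum) o).line);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(line);
        }
    }

    public Accum createAccumulator() {
        return new Accum();
    }

    public Accum addInput(Accum accum, String input) {
        // String is immutable: assign the concatenation back to the field.
        // A newline separator goes between lines, not before the first one.
        accum.line = accum.line.isEmpty() ? input : accum.line + "\n" + input;
        return accum;
    }

    public Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            if (accum.line.isEmpty()) {
                continue; // skip empty partial results so no blank lines are introduced
            }
            merged.line = merged.line.isEmpty() ? accum.line : merged.line + "\n" + accum.line;
        }
        return merged;
    }

    public String extractOutput(Accum accum) {
        return accum.line;
    }

    // Encodes and decodes an accumulator with Java serialization.
    public static Accum roundTrip(Accum accum) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(accum);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Accum) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        AddLinesSketch fn = new AddLinesSketch();
        // Two simulated bundles combined separately, plus an empty one, then merged.
        Accum a = fn.addInput(fn.addInput(fn.createAccumulator(), "a,1"), "b,2");
        Accum b = fn.addInput(fn.createAccumulator(), "c,3");
        Accum merged = fn.mergeAccumulators(Arrays.asList(a, fn.createAccumulator(), b));
        System.out.println(fn.extractOutput(merged));
        System.out.println(roundTrip(merged).equals(merged));
    }
}
```

Running main prints the three lines a,1, b,2 and c,3, followed by true. You can move the same method bodies back into a class that extends CombineFn. In a real pipeline, though, Combine does not guarantee the order in which bundles are merged, so the lines in test_out.csv may not come out in input order.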