Hello Mahesh,

You can add "implements Serializable" to the Accum class; then it should work.

By the way, in Java String is immutable, so concat() returns a new String rather than modifying the one it is called on. To change, for example, accum.line, you need to write accum.line = accum.line.concat(line).
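
Putting the two fixes together, the CombineFn could look roughly like the sketch below. It is untested and makes a couple of small choices your original code didn't (initializing line to "" so concat() is never called on null, and only inserting the "\n" separator between non-empty pieces), so treat it as a starting point rather than a drop-in replacement:

import java.io.Serializable;

import org.apache.beam.sdk.transforms.Combine.CombineFn;

public class AddLines extends CombineFn<String, AddLines.Accum, String> {

    private static final long serialVersionUID = 1L;

    // Serializable so Beam's default coder inference can handle the accumulator.
    public static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        String line = "";  // start with "" so concat() is never called on null
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accum, String input) {
        // concat() returns a new String, so assign the result back to the field.
        accum.line = accum.line.isEmpty() ? input : accum.line.concat("\n").concat(input);
        return accum;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            if (accum.line.isEmpty()) {
                continue;  // skip empty accumulators so we don't add stray newlines
            }
            merged.line = merged.line.isEmpty() ? accum.line : merged.line.concat("\n").concat(accum.line);
        }
        return merged;
    }

    @Override
    public String extractOutput(Accum accum) {
        return accum.line;
    }
}

One thing to keep in mind: Combine makes no ordering guarantees, so the lines in the merged output may not come out in file order.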
Best,
Robin

On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <vangalamahe...@gmail.com> wrote:

> Hello all -
>
> I am trying to run a barebones Beam pipeline to understand the "combine"
> logic. I come from the Python world and am trying to learn the Java Beam SDK
> for my use case of ETL on a Spark cluster. So, pardon me for my grotesque
> Java code :)
>
> I would appreciate it if you could nudge me onto the right path with this
> error (please see below).
>
> Here's my code (read lines from an input file and write the same lines to
> an output file):
>
> public class VariantCaller
> {
>     public static void main( String[] args )
>     {
>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>         Pipeline p = Pipeline.create(opts);
>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()))
>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>         p.run();
>     }
> }
>
> AddLines class:
>
> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>
>     private static final long serialVersionUID = 1L;
>
>     public static class Accum {
>         String line;
>     }
>
>     @Override
>     public Accum createAccumulator() { return new Accum(); }
>
>     @Override
>     public Accum addInput(Accum accum, String line) {
>         accum.line.concat(line);
>         return accum;
>     }
>
>     @Override
>     public Accum mergeAccumulators(Iterable<Accum> accums) {
>         Accum merged = createAccumulator();
>         for (Accum accum : accums) {
>             merged.line.concat("\n").concat(accum.line);
>         }
>         return merged;
>     }
>
>     @Override
>     public String extractOutput(Accum accum) {
>         return accum.line;
>     }
> }
>
> Exception in thread "main" java.lang.IllegalStateException: Unable to
> return a default Coder for
> Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output
> [PCollection]. Correct one of the following root causes:
>   No Coder has been manually specified; you may do so using .setCoder().
>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder
>   for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable
>   to provide a Coder for K.
>   Building a Coder using a registered CoderProvider failed.
>   See suppressed exceptions for detailed failures.
>   Using the default output Coder from the producing PTransform failed:
>   PTransform.getOutputCoder called.
>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>
> --
> Mahesh Vangala
> (Ph) 443-326-1957
> (web) mvangala.com <http://mvangala.com>