Hello all -

I am trying to run a bare-bones Beam pipeline to understand the "combine"
logic. I come from the Python world and am learning the Java Beam SDK because
my use case is ETL on a Spark cluster, so pardon my grotesque Java code :)

I would appreciate it if you could nudge me in the right direction with this
error (please see below).

Here's my code: (read lines from input file and output the same lines to

public class VariantCaller
{
    public static void main( String[] args )
    {
        PipelineOptions opts = PipelineOptionsFactory.fromArgs(args

        Pipeline p = Pipeline.create(opts);

        PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"

        PCollection<String> mergedLines = lines.apply(Combine.globally(new

    }
}





AddLines Class:

public class AddLines extends CombineFn<String, AddLines.Accum, String> {

  private static final long serialVersionUID = 1L;

  public static class Accum {
    String line;
  }

  public Accum createAccumulator() { return new Accum(); }

  public Accum addInput(Accum accum, String line) {

      return accum;
  }

  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {

    }
    return merged;
  }

  public String extractOutput(Accum accum) {
    return accum.line;
  }
}
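
For reference, the four steps a CombineFn goes through can be sketched in plain Java with no Beam dependency. This is only an illustration under assumptions: the class name CombineSketch, the combine helper, and the choice to join lines with a newline are all hypothetical and not taken from the code above. Each inner list stands in for one bundle of input that a runner might hand to a separate accumulator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Beam-free sketch of the four CombineFn phases. All names are hypothetical.
class CombineSketch {

  // Mutable accumulator holding the partial result for one bundle of inputs.
  static class Accum {
    StringBuilder lines = new StringBuilder();
  }

  // Phase 1: start from an empty accumulator.
  static Accum createAccumulator() {
    return new Accum();
  }

  // Phase 2: fold one input element into an accumulator.
  static Accum addInput(Accum accum, String line) {
    accum.lines.append(line).append('\n');
    return accum;
  }

  // Phase 3: merge the partial accumulators into a single one.
  static Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.lines.append(accum.lines);
    }
    return merged;
  }

  // Phase 4: turn the final accumulator into the output value.
  static String extractOutput(Accum accum) {
    return accum.lines.toString();
  }

  // Stand-in for a runner: one accumulator per bundle, then a single merge.
  static String combine(List<List<String>> bundles) {
    List<Accum> partials = new ArrayList<>();
    for (List<String> bundle : bundles) {
      Accum accum = createAccumulator();
      for (String line : bundle) {
        accum = addInput(accum, line);
      }
      partials.add(accum);
    }
    return extractOutput(mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<List<String>> bundles =
        Arrays.asList(Arrays.asList("a,1", "b,2"), Arrays.asList("c,3"));
    System.out.print(combine(bundles));
  }
}
```

The sketch processes bundles in list order, so its output order is deterministic. A distributed runner gives no such ordering guarantee, so a real combine of this kind should not rely on the order of the merged lines.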



Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for
[PCollection]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.

	at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
	at org.apache.beam.sdk.values.PCollection.finishSpecifying(
	at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
	at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
	at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
	at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
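
The trace says that no Coder could be inferred for an intermediate PCollection. One thing that may be worth checking (it is a guess, not a confirmed diagnosis of this trace) is whether the accumulator type can be encoded at all: the Accum class above is a plain class that does not implement Serializable. Beam's SerializableCoder relies on standard Java serialization, so the plain-Java sketch below shows what a serializable accumulator and its round trip through bytes could look like. The class name AccumRoundTrip and its helper methods are hypothetical and are not part of Beam.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: an accumulator that survives Java serialization.
class AccumRoundTrip {

  // Same shape as the Accum above, but Serializable (an assumed change).
  static class Accum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";
  }

  // Encode the accumulator to bytes with standard Java serialization.
  static byte[] encode(Accum accum) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(accum);
    }
    return bytes.toByteArray();
  }

  // Decode the bytes back into an accumulator.
  static Accum decode(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (Accum) in.readObject();
    }
  }

  // Convenience wrapper: encode then decode, returning the recovered line.
  static String roundTrip(String line) {
    try {
      Accum accum = new Accum();
      accum.line = line;
      return decode(encode(accum)).line;
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip("chr1,12345,A,T"));
  }
}
```

If the accumulator does turn out to be the problem, the Beam documentation for CombineFn.getAccumulatorCoder and SerializableCoder would be the place to confirm how a coder is chosen for it.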

*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*
