Hello all -

I am trying to run a bare-bones Beam pipeline to understand the "combine"
logic. I come from the Python world and am learning the Java Beam SDK
because my use case is ETL on a Spark cluster. So, pardon my grotesque
Java code :)

I would appreciate a nudge in the right direction with the error shown
after the code below.

Here's my code (it reads lines from an input file and should write the
same lines to an output file):

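import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.PCollection;

public class VariantCaller {

    public static void main(String[] args) {
        // Build the pipeline options from the command-line arguments.
        PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(opts);

        // Read every line of the input file.
        PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));

        // Combine all lines into a single string with the custom CombineFn.
        PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));

        // Write the combined result.
        mergedLines.apply(TextIO.write().to("test_out.csv"));

        p.run();
    }
}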


AddLines Class:


import org.apache.beam.sdk.transforms.Combine.CombineFn;

public class AddLines extends CombineFn<String, AddLines.Accum, String> {

  private static final long serialVersionUID = 1L;

  // Accumulator holding the text concatenated so far.
  public static class Accum {
    // Start with an empty string; a null field would make concat() throw
    // a NullPointerException.
    String line = "";
  }

  @Override
  public Accum createAccumulator() { return new Accum(); }

  @Override
  public Accum addInput(Accum accum, String line) {
    // String.concat returns a new String, so assign the result back.
    accum.line = accum.line.concat(line);
    return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.line = merged.line.concat("\n").concat(accum.line);
    }
    return merged;
  }

  @Override
  public String extractOutput(Accum accum) {
    return accum.line;
  }
}
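
To make the "combine" logic concrete, here is a minimal plain-Java sketch
(no Beam dependency) of how the four CombineFn steps could fit together.
It assumes that elements are folded into one accumulator per bundle and
that the partial accumulators are merged afterwards. The class
CombineLifecycleSketch and the helpers combine and concatWithoutAssign are
hypothetical names for illustration, not Beam APIs, and a real runner
decides the bundling itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombineLifecycleSketch {

    // Hypothetical stand-in for AddLines.Accum; not a Beam class.
    static class Accum {
        String line = "";
    }

    static Accum createAccumulator() { return new Accum(); }

    static Accum addInput(Accum accum, String input) {
        // Strings are immutable: concat() returns a new String.
        accum.line = accum.line.concat(input);
        return accum;
    }

    static Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum a : accums) {
            merged.line = merged.line.concat("\n").concat(a.line);
        }
        return merged;
    }

    static String extractOutput(Accum accum) { return accum.line; }

    // Assumed lifecycle: fold each bundle separately, then merge the partials.
    static String combine(List<List<String>> bundles) {
        List<Accum> partials = new ArrayList<>();
        for (List<String> bundle : bundles) {
            Accum acc = createAccumulator();
            for (String s : bundle) {
                acc = addInput(acc, s);
            }
            partials.add(acc);
        }
        return extractOutput(mergeAccumulators(partials));
    }

    // Shows the pitfall: the result of concat() is discarded here,
    // so the original string is returned unchanged.
    static String concatWithoutAssign(String base, String extra) {
        base.concat(extra);
        return base;
    }

    public static void main(String[] args) {
        List<List<String>> bundles =
            Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(combine(bundles).equals("\nab\nc"));
        System.out.println(concatWithoutAssign("a", "b").equals("a"));
    }
}
```

Note that with this logic the inputs inside one bundle are joined without
a separator, and each merged accumulator is prefixed with a newline.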


Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
    at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
    at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
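
One possible reading of this trace, offered as an assumption and not as a
confirmed diagnosis, is that no coder can be inferred for the custom
accumulator type AddLines.Accum, which is a plain class. A commonly
suggested route is to make the accumulator implement java.io.Serializable
so that a Java-serialization-based coder can handle it. The sketch below
uses only the JDK (no Beam) to show what such a coder would need: the
accumulator has to survive a serialization round trip. The names
SerializableAccumSketch and roundTrip are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableAccumSketch {

    // Hypothetical accumulator, same shape as AddLines.Accum but Serializable.
    static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        String line = "";
    }

    // Encode the accumulator to bytes and decode it again, roughly what a
    // serialization-based coder would do between workers.
    static Accum roundTrip(Accum in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream input =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Accum) input.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Accum acc = new Accum();
        acc.line = "first line\nsecond line";
        Accum copy = roundTrip(acc);
        // The decoded copy is a distinct object with the same content.
        System.out.println(copy != acc && copy.line.equals(acc.line));
    }
}
```

In Beam itself, CombineFn also has a getAccumulatorCoder method that can be
overridden to return a coder explicitly; whether either route resolves this
particular error is untested here.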

*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*
