Hello all,

I am trying to run a bare-bones Beam pipeline to understand the "combine"
logic. I come from the Python world and am learning the Java Beam SDK because
my use case is ETL on a Spark cluster, so please pardon my grotesque Java
code :) I would appreciate it if you could nudge me in the right direction
with the error shown below.

Here's my code (it reads lines from an input file and writes the same lines
to an output file):
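
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.values.PCollection;

    public class VariantCaller
    {
        public static void main( String[] args )
        {
            PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
            Pipeline p = Pipeline.create(opts);
            // Read every line of the input file.
            PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
            // Combine all lines into a single string.
            PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
            mergedLines.apply(TextIO.write().to("test_out.csv"));
            p.run();
        }
    }
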
AddLines class:

    import org.apache.beam.sdk.transforms.Combine.CombineFn;

    public class AddLines extends CombineFn<String, AddLines.Accum, String> {

        private static final long serialVersionUID = 1L;

        public static class Accum {
            String line;
        }

        @Override
        public Accum createAccumulator() { return new Accum(); }

        @Override
        public Accum addInput(Accum accum, String line) {
            accum.line.concat(line);
            return accum;
        }

        @Override
        public Accum mergeAccumulators(Iterable<Accum> accums) {
            Accum merged = createAccumulator();
            for (Accum accum : accums) {
                merged.line.concat("\n").concat(accum.line);
            }
            return merged;
        }

        @Override
        public String extractOutput(Accum accum) {
            return accum.line;
        }
    }

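A side note on the accumulator logic, separate from the Coder exception: Java strings are immutable, so accum.line.concat(line) returns a new String and leaves accum.line unchanged, and the line field starts out as null, so calling concat on it would throw a NullPointerException. Here is a minimal plain-Java sketch (no Beam dependency) of the same four steps with the result assigned back and the field initialized to an empty string. The class name LineAccumulatorSketch and the static-method layout are only for illustration; they are not Beam's API, and this sketch is not claimed to resolve the Coder error.

```java
import java.util.Arrays;

// Hypothetical stand-in for the CombineFn lifecycle, written without Beam
// so that it can be compiled and run on its own.
class LineAccumulatorSketch {

    static class Accum {
        // Start from an empty string; an uninitialized String field is null.
        String line = "";
    }

    static Accum createAccumulator() {
        return new Accum();
    }

    static Accum addInput(Accum accum, String line) {
        // Strings are immutable, so the concatenated value must be assigned back.
        // An empty accumulator takes the line as-is to avoid a leading newline.
        accum.line = accum.line.isEmpty() ? line : accum.line + "\n" + line;
        return accum;
    }

    static Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            if (accum.line.isEmpty()) {
                continue; // nothing to merge from an empty accumulator
            }
            merged = addInput(merged, accum.line);
        }
        return merged;
    }

    static String extractOutput(Accum accum) {
        return accum.line;
    }

    public static void main(String[] args) {
        Accum a = addInput(addInput(createAccumulator(), "a,1"), "b,2");
        Accum b = addInput(createAccumulator(), "c,3");
        System.out.println(extractOutput(mergeAccumulators(Arrays.asList(a, b))));
    }
}
```

Running main prints the three sample lines a,1, b,2 and c,3, one per line. In a real pipeline the order in which accumulators are merged is up to the runner, so the order of the combined lines should not be relied on.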
The error:

    Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
      No Coder has been manually specified; you may do so using .setCoder().
      Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
      Building a Coder using a registered CoderProvider failed.
      See suppressed exceptions for detailed failures.
      Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
        at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
        at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
        at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
        at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
        at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
        at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
        at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)

--
Mahesh Vangala
(Ph) 443-326-1957
(web) http://mvangala.com