Yes, the pipeline is quite small: pipeline.apply("source", Read.from(new CustomSource())).setCoder(CustomSource.coder) .apply("GlobalCombine", Combine.globally(new CustomCombineFn())).setCoder(CustomTuple.coder);
The InputT is not the same as OutputT, so the input coder can't be used. On 2017-04-07 08:58 (-0500), Aviem Zur <aviem...@gmail.com> wrote: > Have you set the coder for your input PCollection? The one on which you > perform the Combine? > > On Fri, Apr 7, 2017 at 4:24 PM Paul Gerver <pfger...@gmail.com> wrote: > > > Hello All, > > > > I'm trying to test out a Combine.Globally transform which takes in a small > > custom class (CustomA) and outputs a secondary custom class (CustomB). I > > have set the coder for the resulting PCollection<CustomB>, but Beam is > > arguing that a coder for a KV type is missing (see output at bottom). > > > > Since this a global combine, the input nor the output is of KV type, so I > > decided to take a look at the Combine code. Since Combine.Globally.expand() > > performs a perKeys and groupedValues underneath the covers, but requires > > making an intermediate PCollection KV<Void, OutputT> which--according to > > the docs--is inferred from the CombineFn. > > > > I believe I could workaround this by registering a KvCoder with the > > CoderRegistry, but that's not intuitive. Is there a better way to address > > this currently, or should something be added to the CombineFn area for > > setting an output coder similar to PCollection. > > > > > > Output: > > Exception in thread "main" java.lang.IllegalStateException: Unable to > > return a default Coder for > > > > GlobalCombine/Combine.perKey(CustomTuple)/Combine.GroupedValues/ParDo(Anonymous).out > > [Class]. Correct one of the following root causes: > > No Coder has been manually specified; you may do so using .setCoder(). > > Inferring a Coder from the CoderRegistry failed: Unable to provide a > > default Coder for org.apache.beam.sdk.values.KV<K, OutputT>. Correct one of > > the following root causes: > > Building a Coder using a registered CoderFactory failed: Cannot provide > > coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: > > Unable to provide a default Coder for java.lang.Object. Correct one of the > > following root causes: > > > > > > Stack: > > at > > > > org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:174) > > at > > org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51) > > at > > org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:130) > > at > > > > org.apache.beam.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:90) > > at > > > > org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:95) > > at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:386) > > at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:302) > > at > > org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154) > > at > > org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1460) > > at > > org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1337) > > at > > org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76) > > at > > org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:296) > > at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:388) > > at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:318) > > at > > org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167) > > at > > org.iastate.edu.CombineTestPipeline.main(CombineTestPipeline.java:110) > > > > > > Let me know. Thanks! > > -Paul G > > > > -- > > *Paul Gerver* > > pfger...@gmail.com > > >