I wasn't able to reproduce the issue you're experiencing.
I've created a gist with an example that works and is similar to what you
have described.
Please help us make tweaks to the gist reproduce your problem:
https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0

On Fri, Apr 7, 2017 at 7:25 PM Paul Gerver <[email protected]> wrote:

> Yes, the pipeline is quite small:
>
>         pipeline.apply("source",
>                 Read.from(new CustomSource())).setCoder(CustomSource.coder)
>         .apply("GlobalCombine", Combine.globally(new
> CustomCombineFn())).setCoder(CustomTuple.coder);
>
>
> The InputT is not the same as OutputT, so the input coder can't be used.
>
> On 2017-04-07 08:58 (-0500), Aviem Zur <[email protected]> wrote:
> > Have you set the coder for your input PCollection? The one on which you
> > perform the Combine?
> >
> > On Fri, Apr 7, 2017 at 4:24 PM Paul Gerver <[email protected]> wrote:
> >
> > > Hello All,
> > >
> > > I'm trying to test out a Combine.Globally transform which takes in a
> small
> > > custom class (CustomA) and outputs a secondary custom class (CustomB).
> I
> > > have set the coder for the resulting PCollection<CustomB>, but Beam is
> > > arguing that a coder for a KV type is missing (see output at bottom).
> > >
> > > Since this a global combine, the input nor the output is of KV type,
> so I
> > > decided to take a look at the Combine code. Since
> Combine.Globally.expand()
> > > performs a perKeys and groupedValues underneath the covers, but
> requires
> > > making an intermediate PCollection KV<Void, OutputT> which--according
> to
> > > the docs--is inferred from the CombineFn.
> > >
> > > I believe I could workaround this by registering a KvCoder with the
> > > CoderRegistry, but that's not intuitive. Is there a better way to
> address
> > > this currently, or should something be added to the CombineFn area for
> > > setting an output coder similar to PCollection.
> > >
> > >
> > > Output:
> > > Exception in thread "main" java.lang.IllegalStateException: Unable to
> > > return a default Coder for
> > >
> > >
> GlobalCombine/Combine.perKey(CustomTuple)/Combine.GroupedValues/ParDo(Anonymous).out
> > > [Class]. Correct one of the following root causes:
> > >   No Coder has been manually specified;  you may do so using
> .setCoder().
> > >   Inferring a Coder from the CoderRegistry failed: Unable to provide a
> > > default Coder for org.apache.beam.sdk.values.KV<K, OutputT>. Correct
> one of
> > > the following root causes:
> > >   Building a Coder using a registered CoderFactory failed: Cannot
> provide
> > > coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>:
> > > Unable to provide a default Coder for java.lang.Object. Correct one of
> the
> > > following root causes:
> > >
> > >
> > > Stack:
> > >         at
> > >
> > >
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:174)
> > >         at
> > > org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51)
> > >         at
> > > org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:130)
> > >         at
> > >
> > >
> org.apache.beam.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:90)
> > >         at
> > >
> > >
> org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:95)
> > >         at
> org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:386)
> > >         at
> org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:302)
> > >         at
> > > org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
> > >         at
> > >
> org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1460)
> > >         at
> > >
> org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1337)
> > >         at
> > >
> org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> > >         at
> > >
> org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:296)
> > >         at
> org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:388)
> > >         at
> org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:318)
> > >         at
> > > org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
> > >         at
> > > org.iastate.edu.CombineTestPipeline.main(CombineTestPipeline.java:110)
> > >
> > >
> > > Let me know. Thanks!
> > > -Paul G
> > >
> > > --
> > > *Paul Gerver*
> > > [email protected]
> > >
> >
>

Reply via email to