There are various strategies you can use depending on what you know, the
worst case being that you have to ask the person using the PTransform to
give you a K and V coder, or concrete type descriptors for K and V, which
would let you look up the coders in the coder registry.
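A minimal sketch of the "ask for a type descriptor" option, assuming the Beam Java SDK (beam-sdks-java-core) is on the classpath and that the caller passes a TypeDescriptor<K> and TypeDescriptor<V> (for example TypeDescriptors.strings()) when constructing the transform. The class name KvCoderResolver is hypothetical; CoderRegistry.getCoder and TypeDescriptors.kvs are existing SDK calls.

```java
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

/** Hypothetical helper: turns caller-supplied type descriptors into a KV coder. */
public final class KvCoderResolver {
  private KvCoderResolver() {}

  public static <K, V> Coder<KV<K, V>> resolve(
      CoderRegistry registry, TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) {
    try {
      // TypeDescriptors.kvs substitutes the concrete types for K and V, so the registry
      // is asked about e.g. KV<String, Integer> instead of the erased KV<K, V>.
      return registry.getCoder(TypeDescriptors.kvs(keyType, valueType));
    } catch (CannotProvideCoderException e) {
      throw new IllegalArgumentException(
          "No coder registered for KV<" + keyType + ", " + valueType + ">", e);
    }
  }
}
```

Inside expand, the result would be applied with something like `.setCoder(KvCoderResolver.resolve(input.getPipeline().getCoderRegistry(), keyType, valueType))`, where keyType and valueType are (hypothetical) fields set by the transform's constructor.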

The Apache Beam SDK has used the following strategies to solve this problem
internally (on a case-by-case basis):
* Get the coder from the PCollection that is supplied as input: since it can
encode CustomType<SimpleImmutableEntry<K,V>>, some part of it must know how
to encode the K and V types
* If you know that K and V will always come from one class of types (Java
Serializable, Proto message subclass, ...), then you can use a generic
coder for that class of types (SerializableCoder, ProtoCoder, ...)
* Ask the user to explicitly provide a coder (see getDefaultOutputCoder,
getAccumulatorCoder).
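A sketch of the first strategy, assuming the service controls how its own key-value type is encoded and that beam-sdks-java-core is on the classpath. To keep it short it drops the CustomType wrapper and uses a hypothetical EntryCoder for SimpleImmutableEntry<K,V> that keeps its key and value coders accessible; a real CustomType coder would have to expose its component coders in the same way.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;

/** Hypothetical coder for SimpleImmutableEntry<K, V> that keeps its component coders visible. */
public class EntryCoder<K, V> extends StructuredCoder<SimpleImmutableEntry<K, V>> {
  private final Coder<K> keyCoder;
  private final Coder<V> valueCoder;

  private EntryCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
    this.keyCoder = keyCoder;
    this.valueCoder = valueCoder;
  }

  public static <K, V> EntryCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
    return new EntryCoder<>(keyCoder, valueCoder);
  }

  /** Reuses the component coders to build the coder for the "MapToKV" output. */
  public KvCoder<K, V> toKvCoder() {
    return KvCoder.of(keyCoder, valueCoder);
  }

  @Override
  public void encode(SimpleImmutableEntry<K, V> entry, OutputStream out) throws IOException {
    // Both components use the nested encoding, so decode knows where the key ends.
    keyCoder.encode(entry.getKey(), out);
    valueCoder.encode(entry.getValue(), out);
  }

  @Override
  public SimpleImmutableEntry<K, V> decode(InputStream in) throws IOException {
    K key = keyCoder.decode(in);
    V value = valueCoder.decode(in);
    return new SimpleImmutableEntry<>(key, value);
  }

  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Arrays.asList(keyCoder, valueCoder);
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    verifyDeterministic(this, "Key coder must be deterministic", keyCoder);
    verifyDeterministic(this, "Value coder must be deterministic", valueCoder);
  }
}
```

In expand, the KV coder could then be recovered with an unchecked cast such as `((EntryCoder<K, V>) input.getCoder()).toKvCoder()`; this only works if every caller's input really carries this coder.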


On Tue, Dec 4, 2018 at 2:07 AM Eran Twili <eran.tw...@niceactimize.com>
wrote:

> Thanks Lukasz,
>
>
>
> Please tell me, how can I set a coder on the *PCollection* created after
> the "MapToKV" apply?
>
> I mean, all I know is that it will be a *PCollection<KV<K,V>>*, and I
> don't know what the actual runtime types of K and V will be.
>
> So, what coder should I set? Can you please give a code example of how to
> do that?
>
>
>
> Really appreciate your help,
>
> Eran
>
>
>
> *From:* Lukasz Cwik [mailto:lc...@google.com]
> *Sent:* Monday, December 03, 2018 7:10 PM
> *To:* user@beam.apache.org
> *Subject:* Re: Generic Type PTransform
>
>
>
> Apache Beam attempts to propagate coders by looking at whatever type
> information is available, but because Java erases generic types, there
> are many scenarios where coders can't be propagated forward from a
> previous transform.
>
>
>
> Take the following two examples (note that there are many subtle
> variations that can give different results):
>
> List<String> erasedType = new ArrayList<String>();  // type is lost
>
> List<String> keptType = new ArrayList<String>() {};  // type is kept because an anonymous inner class is declared
>
> In the first, the type is erased; in the second, the type information is
> available. I would suggest
>
>
>
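The difference between those two lines can be observed with plain Java reflection, no Beam involved. This sketch (class and method names are hypothetical) reads the type argument recorded on an object's generic superclass. It also shows why the anonymous SimpleFunction inside PlatformGroupByKey<K,V> does not help: an anonymous class declared inside generic code can only record the type variable itself, not the caller's concrete type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
  /** Returns the first type argument recorded on the object's generic superclass, or null. */
  static Type recordedTypeArgument(Object instance) {
    Type superType = instance.getClass().getGenericSuperclass();
    if (superType instanceof ParameterizedType) {
      return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
    return null;
  }

  /** Anonymous subclass created inside generic code: it can only record the variable T. */
  static <T> List<T> anonymousListInGenericCode() {
    return new ArrayList<T>() {};
  }

  public static void main(String[] args) {
    List<String> erasedType = new ArrayList<String>();
    List<String> keptType = new ArrayList<String>() {};
    List<String> stillErased = ErasureDemo.<String>anonymousListInGenericCode();

    // ArrayList's superclass is AbstractList<E>, so only the type variable E is visible.
    System.out.println(recordedTypeArgument(erasedType));
    // The anonymous subclass extends ArrayList<String>, so java.lang.String is recorded.
    System.out.println(recordedTypeArgument(keptType));
    // The same trick inside generic code records only T, just like K and V in the transform.
    System.out.println(recordedTypeArgument(stillErased));
  }
}
```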
> In your case, Beam can't infer what K and V are because the types have
> been erased once the code is compiled, hence the error message. To fix the
> problem immediately, set the coder on the PCollection created after you
> apply the "MapToKV" transform (you might need to do the same for the
> "MapToSimpleImmutableEntry" transform as well).
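A minimal sketch of that fix, assuming beam-sdks-java-core is on the classpath and that the service can obtain a Coder<K> and Coder<V> (passed in by the user, or resolved from type descriptors). To keep it short it works on SimpleImmutableEntry directly instead of the CustomType wrapper and stops after GroupByKey; the class name GroupEntries is hypothetical. Once the "MapToKV" output carries a KvCoder, GroupByKey derives its own output coder from it.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical helper: groups SimpleImmutableEntry<K, V> elements using explicit coders. */
public final class GroupEntries {
  private GroupEntries() {}

  public static <K, V> PCollection<KV<K, Iterable<V>>> byKey(
      PCollection<SimpleImmutableEntry<K, V>> entries, Coder<K> keyCoder, Coder<V> valueCoder) {
    PCollection<KV<K, V>> kvs =
        entries
            .apply(
                "MapToKV",
                MapElements.via(
                    new SimpleFunction<SimpleImmutableEntry<K, V>, KV<K, V>>() {
                      @Override
                      public KV<K, V> apply(SimpleImmutableEntry<K, V> entry) {
                        return KV.of(entry.getKey(), entry.getValue());
                      }
                    }))
            // K and V are only type variables here, so inference cannot work: set the coder.
            .setCoder(KvCoder.of(keyCoder, valueCoder));
    // GroupByKey builds KvCoder<K, Iterable<V>> from the input's KvCoder.
    return kvs.apply("GroupByKey", GroupByKey.<K, V>create());
  }
}
```

The anonymous SimpleFunction lives in a static method, so it does not capture an enclosing instance that would also have to be serialized.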
>
>
>
> If you want to get into the details, take a look at the CoderRegistry[1],
> as it contains the type inference / propagation code.
>
>
>
> Finally, this is not an uncommon problem for users, and it seems the
> error message doesn't make the cause clear, so feel free to propose
> changes to the error messages to help others such as yourself.
>
>
>
> 1:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
>
>
>
> On Sun, Dec 2, 2018 at 10:54 PM Matt Casters <mattcast...@gmail.com>
> wrote:
>
> There are probably smarter people than me on this list, but since I have
> recently been through a similar thought exercise...
>
>
>
> For the generic use in Kettle, I have a PCollection<KettleRow> going
> through the pipeline.
> KettleRow is just an Object[] wrapper for which I can implement a Coder.
>
> The "group by" that I implemented does the following:
> * Split PCollection<KettleRow> into PCollection<KV<KettleRow, KettleRow>>
> * Apply the standard GroupByKey.create(), giving us
> PCollection<KV<KettleRow, Iterable<KettleRow>>>
> This means that we can simply aggregate all the elements in
> Iterable<KettleRow> to aggregate a group.
>
> Well, at least that works for me. The code is open, so take a look here:
>
> https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/transform/GroupByTransform.java
>
> Like you, I had trouble with the Coder for my KettleRows, so I hacked up
> this to make it work:
>
>
> https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/coder/KettleRowCoder.java
>
>
>
> It's registered on the pipeline:
> pipeline.getCoderRegistry().registerCoderForClass(KettleRow.class, new KettleRowCoder());
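When a registered coder like this ends up as the key coder of a GroupByKey, it matters that GroupByKey checks its key coder is deterministic, and CustomCoder's default verifyDeterministic() throws. A hedged sketch of a registerable, deterministic coder, assuming beam-sdks-java-core on the classpath; TextRow (a list of strings standing in for a generic row) and TextRowCoder are hypothetical and are not Matt's KettleRow or KettleRowCoder.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

/** Hypothetical row type: a list of string fields standing in for a generic record. */
final class TextRow {
  final List<String> fields;

  TextRow(List<String> fields) {
    this.fields = fields;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof TextRow && ((TextRow) o).fields.equals(fields);
  }

  @Override
  public int hashCode() {
    return fields.hashCode();
  }
}

/** Hypothetical coder for TextRow that delegates to Beam's built-in coders. */
public class TextRowCoder extends CustomCoder<TextRow> {
  private static final Coder<List<String>> FIELDS = ListCoder.of(StringUtf8Coder.of());

  public static TextRowCoder of() {
    return new TextRowCoder();
  }

  @Override
  public void encode(TextRow row, OutputStream out) throws IOException {
    FIELDS.encode(row.fields, out);
  }

  @Override
  public TextRow decode(InputStream in) throws IOException {
    return new TextRow(FIELDS.decode(in));
  }

  // CustomCoder's default verifyDeterministic() throws, which GroupByKey rejects for keys.
  // A list of UTF-8 strings always encodes the same way, so delegate the check instead.
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    FIELDS.verifyDeterministic();
  }
}
```

Registration would then follow the same pattern: `pipeline.getCoderRegistry().registerCoderForClass(TextRow.class, TextRowCoder.of());`.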
>
>
> Good luck!
>
> Matt
>
>
>
> On Sun, Dec 2, 2018 at 20:57, Eran Twili <eran.tw...@niceactimize.com>
> wrote:
>
> Hi,
>
>
>
> We are considering using Beam in our software.
>
> We want to create a service that operates Beam on behalf of a user,
> and the user code obviously has no visibility of the Beam API.
>
> For that, we need to wrap part of the Beam API generically.
>
> The user supplies functions, and we embed them in a generic *PTransform*
> and run them in a Beam pipeline.
>
> We are having difficulty understanding how we can give the user the
> option to perform a *GroupByKey* operation.
>
> The problem is that *GroupByKey* takes *KV*, while our *PCollections* hold
> only user data types, which should not be Beam data types.
>
> So we thought about having this *PTransform*:
>
> public class PlatformGroupByKey<K, V> extends
>     PTransform<PCollection<CustomType<SimpleImmutableEntry<K, V>>>,
>         PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>> {
>
>   @Override
>   public PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>> expand(
>       PCollection<CustomType<SimpleImmutableEntry<K, V>>> input) {
>
>     return input
>         .apply("MapToKV",
>             MapElements.via(
>                 new SimpleFunction<CustomType<SimpleImmutableEntry<K, V>>, KV<K, V>>() {
>                   @Override
>                   public KV<K, V> apply(CustomType<SimpleImmutableEntry<K, V>> kv) {
>                     return KV.of(kv.field.getKey(), kv.field.getValue());
>                   }
>                 }))
>         .apply("GroupByKey", GroupByKey.create())
>         .apply("MapToSimpleImmutableEntry",
>             MapElements.via(
>                 new SimpleFunction<KV<K, Iterable<V>>,
>                     CustomType<SimpleImmutableEntry<K, Iterable<V>>>>() {
>                   @Override
>                   public CustomType<SimpleImmutableEntry<K, Iterable<V>>> apply(
>                       KV<K, Iterable<V>> kv) {
>                     return new CustomType<>(new SimpleImmutableEntry<>(kv.getKey(), kv.getValue()));
>                   }
>                 }));
>   }
> }
>
> In it, we take a *PCollection* of our key-value type (Java's
> *SimpleImmutableEntry*), convert it to *KV*, perform the *GroupByKey*,
> and convert it back to *SimpleImmutableEntry*.
>
>
>
> But we get this error at runtime:
>
>
>
> java.lang.IllegalStateException: Unable to return a default Coder for GroupByKey/MapToKV/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>   No Coder has been manually specified;  you may do so using .setCoder().
>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, V>: Unable to provide a Coder for K.
>   Building a Coder using a registered CoderProvider failed.
>   See suppressed exceptions for detailed failures.
>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>     at org.apache.beam.examples.platform.PlatformGroupByKey.expand(PlatformGroupByKey.java:27)
>
>
>
> We don't understand why the generic type *K* makes it into runtime.
>
> At runtime, it should be known from the concrete *PCollection* input
> parameter that is passed to the *expand* method.
>
> What are we doing wrong? Is there a way to achieve what we want using Beam?
>
> Appreciate any help.
>
>
>
> Regards,
>
> Eran
>
>
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>
