Re: Accumulator with Map field in CombineFn not serializing correctly
Interesting, thanks for following up with the fix. Were you able to find a way to reproduce this locally, or did it only occur on Dataflow? Did you have to make a similar change for the HashMap in Accum, or just the ExpiringLinkedHashMap?

Brian

On Fri, Aug 7, 2020 at 9:58 AM Josh wrote:
> I have resolved this issue now. In case anyone else runs into this problem
> in future, the resolution was simply to use the concrete type for the field
> in the accumulator, rather than Map:
>
>     ExpiringLinkedHashMap recentEvents = new ExpiringLinkedHashMap<>();
>
> On Thu, Aug 6, 2020 at 3:16 PM Josh wrote:
>> Hi all,
>>
>> In my Beam job I have defined my own CombineFn with an accumulator.
>> [snip]
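Brian's question about the HashMap in Accum comes down to the declared field type: a reflection-based coder such as AvroCoder works from the static type of each field, not the runtime class of the value stored in it. A small stdlib-only sketch (the class and field names here are made up for illustration, not from the actual job) shows the distinction that reflection sees:

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class DeclaredTypeDemo {
    // Hypothetical accumulator mirroring the thread's Accum class: one field
    // declared as the Map interface, one as the concrete HashMap.
    static class Accum {
        Map<String, String> asInterface = new HashMap<>();
        HashMap<String, String> asConcrete = new HashMap<>();
    }

    public static void main(String[] args) throws Exception {
        // Reflection reports the *declared* type of each field, even though
        // both fields hold a HashMap at runtime.
        Field f1 = Accum.class.getDeclaredField("asInterface");
        Field f2 = Accum.class.getDeclaredField("asConcrete");
        System.out.println(f1.getType()); // interface java.util.Map
        System.out.println(f2.getType()); // class java.util.HashMap
    }
}
```

If the coder instantiates whatever type the field declares, a field declared as `Map` would presumably hit the same constructor lookup problem as in the original post, while one declared as `HashMap` would not.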
Re: Accumulator with Map field in CombineFn not serializing correctly
I have resolved this issue now. In case anyone else runs into this problem in future, the resolution was simply to use the concrete type for the field in the accumulator, rather than Map:

    ExpiringLinkedHashMap recentEvents = new ExpiringLinkedHashMap<>();

On Thu, Aug 6, 2020 at 3:16 PM Josh wrote:
> Hi all,
>
> In my Beam job I have defined my own CombineFn with an accumulator.
> Running locally is no problem, but when I run the job on Dataflow I hit an
> Avro serialization exception:
>
> java.lang.NoSuchMethodException: java.util.Map.<init>()
>
> [snip]
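As an illustration of why declaring the concrete type helps, here is a stdlib-only sketch: a concrete class like the ExpiringLinkedHashMap in the thread exposes a no-arg constructor that a reflection-based coder can find and invoke, whereas the Map interface has none. The class below is a stand-in mirroring the one in the thread, not the actual job code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConcreteFieldDemo {
    // Stand-in for the thread's ExpiringLinkedHashMap: a LinkedHashMap that
    // evicts its eldest entry once it holds more than 10 entries.
    static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > 10;
        }
    }

    public static void main(String[] args) throws Exception {
        // With a concrete type, the reflective no-arg-constructor lookup that
        // failed for java.util.Map succeeds:
        ExpiringLinkedHashMap<String, String> m =
                ExpiringLinkedHashMap.class.getDeclaredConstructor().newInstance();

        // The eviction behavior still works as before: 20 puts, capped at 10.
        for (int i = 0; i < 20; i++) {
            m.put("k" + i, "v" + i);
        }
        System.out.println(m.size()); // prints 10
    }
}
```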
Accumulator with Map field in CombineFn not serializing correctly
Hi all,

In my Beam job I have defined my own CombineFn with an accumulator. Running locally is no problem, but when I run the job on Dataflow I hit an Avro serialization exception:

java.lang.NoSuchMethodException: java.util.Map.<init>()
        java.lang.Class.getConstructor0(Class.java:3082)
        java.lang.Class.getDeclaredConstructor(Class.java:2178)
        org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)

I am using the `@DefaultCoder(AvroCoder.class)` annotation for my accumulator class. Is there anything special I need to do because one of the fields in my accumulator class is a Map? I have pasted an outline of my CombineFn below.

Thanks for any help with this!

Josh

private static class MyCombineFn extends CombineFn<In, MyCombineFn.Accum, Out> {

    private static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return this.size() > 10;
        }
    }

    @DefaultCoder(AvroCoder.class)
    private static class PartialEventUpdate implements Serializable {
        Long incrementCountBy = 0L;
        Map recentEvents = new ExpiringLinkedHashMap<>();
        Long lastSeenMillis = 0L;

        PartialEventUpdate() {}
    }

    @DefaultCoder(AvroCoder.class)
    private static class Accum implements Serializable {
        Map eventIdToUpdate = new HashMap<>();

        Accum() {}
    }

    @Override
    public MyCombineFn.Accum createAccumulator() {
        return new MyCombineFn.Accum();
    }

    ...
}
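The root cause behind the NoSuchMethodException above can be reproduced without Beam or Avro at all: the reflective path through SpecificData.newInstance looks up a no-arg constructor on the field's declared type, and java.util.Map, being an interface, has no constructors to find. A minimal stdlib-only sketch (the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class MapCtorDemo {
    public static void main(String[] args) {
        // Interfaces have no constructors at all, so a reflective lookup of
        // Map.<init>() fails -- the same lookup Avro performs when a field is
        // declared as java.util.Map.
        try {
            Map.class.getDeclaredConstructor();
            System.out.println("unexpected: found a Map constructor");
        } catch (NoSuchMethodException e) {
            System.out.println("Map: " + e.getClass().getSimpleName());
        }

        // A concrete class such as HashMap does expose a no-arg constructor,
        // which is why declaring the field with a concrete type avoids the
        // exception.
        try {
            HashMap.class.getDeclaredConstructor();
            System.out.println("HashMap: constructor found");
        } catch (NoSuchMethodException e) {
            System.out.println("unexpected: " + e);
        }
    }
}
```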