Hi all,

In my Beam job I have defined my own CombineFn with a custom accumulator type. The job runs fine locally, but when I run it on Dataflow I hit an Avro
serialization exception:
java.lang.NoSuchMethodException: java.util.Map.<init>()
        at java.lang.Class.getConstructor0(Class.java:3082)
        at java.lang.Class.getDeclaredConstructor(Class.java:2178)
        at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
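
If I'm reading the top frames right, Avro is asking the `Map` interface itself for a no-arg constructor, which doesn't exist. This standalone snippet (just a sketch to illustrate, not my job code) fails with the same message:

public class MapCtorRepro {
    public static void main(String[] args) throws Exception {
        // Interfaces have no constructors, so this line throws
        // java.lang.NoSuchMethodException: java.util.Map.<init>()
        java.util.Map.class.getDeclaredConstructor();
    }
}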

I am using the `@DefaultCoder(AvroCoder.class)` annotation on my
accumulator class. Is there anything special I need to do because one of
its fields is a `Map`? I have pasted an outline of my CombineFn below.

Thanks for any help with this!

Josh

private static class MyCombineFn extends CombineFn<Event, MyCombineFn.Accum, Out> {

    // Bounded map: evicts the eldest entry once it holds more than 10.
    private static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return this.size() > 10;
        }
    }

    @DefaultCoder(AvroCoder.class)
    private static class PartialEventUpdate implements Serializable {
        Long incrementCountBy = 0L;
        Map<String, Event> recentEvents = new ExpiringLinkedHashMap<>();
        Long lastSeenMillis = 0L;

        PartialEventUpdate() {}
    }

    @DefaultCoder(AvroCoder.class)
    private static class Accum implements Serializable {
        Map<UUID, PartialEventUpdate> eventIdToUpdate = new HashMap<>();

        Accum() {}
    }

    @Override
    public MyCombineFn.Accum createAccumulator() {
        return new MyCombineFn.Accum();
    }

    ...

}
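
For context, the fn is applied along these lines (a simplified sketch rather than the exact pipeline code; `events` is a placeholder for the real input PCollection):

// Sketch: `events` stands in for the job's actual input.
PCollection<Event> events = ...;
PCollection<Out> out = events.apply(Combine.globally(new MyCombineFn()));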
