Re: Accumulator with Map field in CombineFn not serializing correctly

2020-08-07 Thread Brian Hulette
Interesting, thanks for following up with the fix. Were you able to find a
way to reproduce this locally, or did it only occur on Dataflow?

Did you have to make a similar change for the HashMap in Accum, or just the
ExpiringLinkedHashMap?

Brian

On Fri, Aug 7, 2020 at 9:58 AM Josh wrote:

> I have resolved this issue now. In case anyone else runs into this problem
> in the future, the resolution was simply to use the concrete type for the
> field in the accumulator, rather than the Map interface:
>
> ExpiringLinkedHashMap<String, Event> recentEvents = new ExpiringLinkedHashMap<>();


Re: Accumulator with Map field in CombineFn not serializing correctly

2020-08-07 Thread Josh
I have resolved this issue now. In case anyone else runs into this problem
in the future, the resolution was simply to use the concrete type for the
field in the accumulator, rather than the Map interface:

ExpiringLinkedHashMap<String, Event> recentEvents = new ExpiringLinkedHashMap<>();
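A minimal sketch (outside of Beam) of what appears to go wrong here: per the stack trace, Avro's SpecificData.newInstance reflectively looks up a no-arg constructor on the field's *declared* type, and the Map interface has no constructors at all. The class and method names below (ReflectionDemo, hasNoArgConstructor) are illustrative only, not Beam or Avro API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReflectionDemo {
    // Mirrors the reflective lookup that fails in the stack trace above:
    // getDeclaredConstructor() with no arguments asks for a no-arg constructor.
    static boolean hasNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            // For an interface this is exactly the
            // "java.util.Map.<init>()" NoSuchMethodException seen on Dataflow.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgConstructor(Map.class));           // false: interfaces have no constructors
        System.out.println(hasNoArgConstructor(LinkedHashMap.class)); // true: public no-arg constructor exists
    }
}
```

This is why declaring the field with the concrete type fixes the problem: the coder then sees a class it can actually instantiate.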




Accumulator with Map field in CombineFn not serializing correctly

2020-08-06 Thread Josh
Hi all,

In my Beam job I have defined my own CombineFn with an accumulator. Running
locally is no problem, but when I run the job on Dataflow I hit an Avro
serialization exception:
java.lang.NoSuchMethodException: java.util.Map.<init>()
java.lang.Class.getConstructor0(Class.java:3082)
java.lang.Class.getDeclaredConstructor(Class.java:2178)
org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)

I am using the `@DefaultCoder(AvroCoder.class)` annotation for my
accumulator class. Is there anything special I need to do because one of
the fields in my accumulator class is a Map? I have pasted an outline of my
CombineFn below.

Thanks for any help with this!

Josh

private static class MyCombineFn extends CombineFn<Event, MyCombineFn.Accum, Out> {

    private static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // Evict the eldest entry once the map holds more than 10 entries.
            return this.size() > 10;
        }
    }

    @DefaultCoder(AvroCoder.class)
    private static class PartialEventUpdate implements Serializable {
        Long incrementCountBy = 0L;
        Map<String, Event> recentEvents = new ExpiringLinkedHashMap<>();
        Long lastSeenMillis = 0L;

        PartialEventUpdate() {}
    }

    @DefaultCoder(AvroCoder.class)
    private static class Accum implements Serializable {
        Map<String, PartialEventUpdate> eventIdToUpdate = new HashMap<>();

        Accum() {}
    }

    @Override
    public MyCombineFn.Accum createAccumulator() {
        return new MyCombineFn.Accum();
    }

    ...

}
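The eviction behavior in the outline above can be demonstrated standalone. This is a sketch, not the poster's exact code: the generic parameters and the fill helper are assumptions, since the original post's type parameters were lost in the archive:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A LinkedHashMap subclass that drops its eldest (insertion-order first)
// entry once the map grows beyond 10 entries, as in the CombineFn outline.
class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return this.size() > 10;
    }
}

public class ExpiryDemo {
    // Hypothetical helper: insert n entries keyed "event-0" .. "event-(n-1)".
    static ExpiringLinkedHashMap<String, Long> fill(int n) {
        ExpiringLinkedHashMap<String, Long> m = new ExpiringLinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            m.put("event-" + i, (long) i);
        }
        return m;
    }

    public static void main(String[] args) {
        ExpiringLinkedHashMap<String, Long> m = fill(25);
        System.out.println(m.size());                  // 10: capped by removeEldestEntry
        System.out.println(m.containsKey("event-24")); // true: newest entries survive
        System.out.println(m.containsKey("event-0"));  // false: eldest entries evicted
    }
}
```

removeEldestEntry is invoked by LinkedHashMap after each put, so the map self-trims and never holds more than 10 entries.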