Hi Ido,

at the first glance, I could not find any problem in your code. So it might be a bug. The "environment.registerType()" is not needed in your case, because you have no generic types.

I will have a closer look at it tomorrow.

Regards,
Timo

Am 14.08.17 um 16:35 schrieb Ido Bar Av:

Hi,

We’re using flink 1.3.1, and we’re trying to pass through the pipeline a POJO object that has a generic field )see details in the complete example below):

We have the class Foo<SomeKey extends BarKey>, and when sending a subclass with a specific SomeKey, we get the following exception:

java.lang.RuntimeException: Cannot instantiate class.

at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201)

at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395)

at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)

at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)

at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)

at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)

at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)

at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)

              at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.IllegalArgumentException: Can not set ….BarKey field …Foo.someKey to java.lang.Object

              at java.lang.reflect.Field.set(Field.java:764)

at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)

at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197)

              ... 10 more

If I understand correctly, for some reason, the deserializer used for SomeKey returns Object (before filling it), ignoring the fact that SomeKey extends a BarKey, and then fails when trying to assign it to the parent class.

What is the correct approach for this situation?

Thanks,

Ido

Complete code example:

*public class BarKey implements Serializable {*

*    public List<Long> valueList;*

**

*    public BarKey() {*

*    }*

**

*    public BarKey(long value) {*

*        super();*

*        valueList = new ArrayList<>();*

*        valueList.add(value);*

*    }*

**

*    @Override*

*    public boolean equals(Object o) {*

*        if (this == o) {*

*            return true;*

*        }*

*        if (o == null || getClass() != o.getClass()) {*

*            return false;*

*        }*

*        BarKey barKey = (BarKey) o;*

*        return Objects.equals(valueList, barKey.valueList);*

*    }*

**

*    @Override*

*    public int hashCode() {*

*        return Objects.hash(valueList);*

*    }*

*}*

**

**

*public class SomeKey extends BarKey implements Serializable {*

*public Integer banana=1;*

**

*public SomeKey() {*

*    }*

**

*public SomeKey(long value) {*

*super(value);*

*    }*

*}*

*public class Foo<SomeKey extends BarKey> implements Serializable {*

**

*    public Foo() {}*

*    public SomeKey someKey;*

**

*    public Foo(SomeKey someKey) {*

*        this.someKey = someKey;*

*    }*

**

*}*

**

*public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> implements Serializable {*

*    public FooFoo() {*

*    }*

**

*    public Integer grill = 12;*

*    public FooFoo(SomeKey someKey) {*

*        super(someKey);*

*    }*

**

*}*

class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>> implements Serializable {

    @Override

public void processElement(Integer value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {

        out.collect(new FooFoo<>(new SomeKey((long) value)));

    }

}

class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>> implements Serializable {

    @Override

public void processElement(Foo<BarKey> value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {

        value.someKey.valueList.add(1L);

        out.collect(value);

    }

}

class FooBarSelector<SomeKey extends BarKey> implements KeySelector<Foo<SomeKey>, BarKey>, Serializable {

    @Override

    public BarKey getKey(Foo<SomeKey> value) throws Exception {

        return value.someKey;

    }

}

class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> {

private static final Logger logger = LoggerFactory.getLogger(FooBarSink.class);

    public long dosomething = 0;

    @Override

    public void invoke(Foo<BarKey> value) throws Exception {

        dosomething += value.someKey.valueList.size();

        logger.warn("Sink {}", dosomething);

    }

}

Test code:

environment.registerType(FooFoo.class); // Not certain if this is needed

        List<Integer> intlist = new ArrayList<>();

        intlist.add(3);

        intlist.add(5);

DataStreamSource<Integer> streamSource = environment.fromCollection(intlist);

        streamSource.process(new MakeFoo())

             .keyBy(new FooBarSelector<>())

             .process(new FooProcessor())

             .addSink(new FooBarSink());

        environment.execute(“Jobname-UT");


Reply via email to