Inheritance and FlatMap with custom POJO
Hi guys,

I'm trying to create a base class that is extended by classes implementing the flatMap method for specific POJO types.

Inheritance doesn't seem to work: I can access this.PropertyName or super.PropertyName from the flatMap method, but the values are always null.

Here is the derived class, using RainfallPOJO:

    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    public class CullTimeRainfall extends CullTimeBase
            implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {

        public CullTimeRainfall(int num, int den, String time_data_name, String start_time,
                                String end_time, int interval, String time_unit) {
            super(num, den, time_data_name, start_time, end_time, interval, time_unit);
        }

        public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
            DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
            try {
                // Drop the colon in the UTC offset, e.g. "+02:00" -> "+0200"
                Date time = formatter.parse(obj.getTime().replaceAll(
                        "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
                // Keep only records strictly inside the configured time window
                if (time.after(this.startTime) && time.before(this.endTime)) {
                    coll.collect(obj);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

My base class is:

    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class CullTimeBase {

        protected int numerator;
        protected int denominator;
        protected String timeDataName;
        protected Date startTime;
        protected Date endTime;
        protected int interval;
        protected String timeUnit;

        public CullTimeBase(int num, int den, String time_data_name, String start_time,
                            String end_time, int interv, String time_unit) {
            numerator = num;
            denominator = den;
            timeDataName = time_data_name;
            interval = interv;
            timeUnit = time_unit;
            DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
            try {
                startTime = formatter.parse(start_time);
                endTime = formatter.parse(end_time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

It only works if I declare all variables and methods in a single class, but then I would have to repeat the same properties in several classes. I would like each derived class to differ only in its custom flatMap method, which works on a specific POJO type.

Thanks a lot,
Giacomo
Re: Inheritance and FlatMap with custom POJO
Hi Giacomo,

I ran into the same issue. It seems to be coupled to the serialization mechanism of UDFs. I solved it by letting the base class implement the UDF interface (e.g. FlatMapFunction) and, in addition, making it generic (which should be necessary in your example):

    public [abstract] class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
        // ...
    }

    public class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {
        // ...
    }

This should work.

Best,
Martin

On 16.09.2015 10:41, Giacomo Licari wrote:
> Hi guys,
> I'm trying to create a base class which is inherited from classes implementing FlatMap method on specific POJO types.
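Martin's pattern can be sketched without a Flink dependency. This is a minimal sketch under stated assumptions:

- `SerializableFlatMap` is a hypothetical stand-in for Flink's `FlatMapFunction<IN, OUT>`. Flink's function interfaces extend `java.io.Serializable`.
- `Consumer<OUT>` stands in for `Collector<OUT>`.
- `Rainfall` is a hypothetical record type.
- `java.time.Instant` replaces `Date` for brevity.

The generic base class itself implements the serializable interface, so its window bounds survive a Java serialization round trip.

```java
import java.io.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Flink's FlatMapFunction<IN, OUT> (which is Serializable).
interface SerializableFlatMap<IN, OUT> extends Serializable {
    void flatMap(IN value, Consumer<OUT> out) throws Exception;
}

// Shared state lives in the generic base class; because the base class implements the
// serializable interface, its fields are part of the serialized form.
abstract class CullTimeBase<IN, OUT> implements SerializableFlatMap<IN, OUT> {
    protected final Instant startTime;
    protected final Instant endTime;

    protected CullTimeBase(Instant startTime, Instant endTime) {
        this.startTime = startTime;
        this.endTime = endTime;
    }

    // Exclusive on both ends, like time.after(start) && time.before(end).
    protected boolean inWindow(Instant t) {
        return t.isAfter(startTime) && t.isBefore(endTime);
    }
}

// Hypothetical record type standing in for RainfallPOJO.
class Rainfall implements Serializable {
    final Instant time;
    Rainfall(Instant time) { this.time = time; }
}

// The derived class only specializes flatMap for its record type.
class CullTimeRainfall extends CullTimeBase<Rainfall, Rainfall> {
    CullTimeRainfall(Instant start, Instant end) { super(start, end); }

    @Override
    public void flatMap(Rainfall value, Consumer<Rainfall> out) {
        if (inWindow(value.time)) {
            out.accept(value);
        }
    }
}

class GenericBaseDemo {
    // Serializes and deserializes an object, roughly what happens when a UDF is shipped.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(obj);
            }
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CullTimeRainfall f = roundTrip(new CullTimeRainfall(
                Instant.parse("2015-09-16T00:00:00Z"), Instant.parse("2015-09-17T00:00:00Z")));
        List<Rainfall> kept = new ArrayList<>();
        f.flatMap(new Rainfall(Instant.parse("2015-09-16T10:41:00Z")), kept::add); // inside
        f.flatMap(new Rainfall(Instant.parse("2015-09-18T10:41:00Z")), kept::add); // outside
        System.out.println(f.startTime + " " + kept.size()); // prints "2015-09-16T00:00:00Z 1"
    }
}
```

Because every concrete function inherits `Serializable` through the base class, no subclass has to remember to declare it.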
Re: Inheritance and FlatMap with custom POJO
Hi!

Interesting case. We use plain Java serialization to distribute UDFs, and we also perform additional "cleaning" of scopes, which may be causing the issue.

Can you try the following to see whether either of them resolves the problem?

1) On the environment, disable the closure cleaner (in the execution config).

2) Let the CullTimeBase class implement java.io.Serializable.

Please let us know how it turns out!

Greetings,
Stephan

On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns wrote:
> Hi Giacomo,
>
> I ran into the same issue. Seems to be coupled to the serialization mechanism of UDFs.
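Stephan's second suggestion is consistent with a rule of plain Java serialization. When a `Serializable` class extends a superclass that is not `Serializable`, that superclass's fields are not written to the stream. On deserialization, the no-argument constructor of the closest non-serializable superclass runs instead, so its fields come back with default values such as `null`. If no accessible no-argument constructor exists, deserialization fails with `InvalidClassException`.

The sketch below uses hypothetical class names and no Flink dependency. It contrasts the original setup (only the subclass is serializable) with the fix (the base class implements `Serializable`).

```java
import java.io.*;

// Base class that does NOT implement Serializable. Java serialization needs an
// accessible no-arg constructor here; without one, reading a subclass back fails
// with InvalidClassException ("no valid constructor").
class PlainBase {
    protected String timeDataName;
    PlainBase() { }                                   // runs during deserialization
    PlainBase(String name) { timeDataName = name; }
}

// Mirrors the original setup: only the subclass is Serializable.
class SubOfPlainBase extends PlainBase implements Serializable {
    SubOfPlainBase(String name) { super(name); }
}

// Mirrors the fix: the base class itself implements Serializable.
class SerializableBase implements Serializable {
    protected String timeDataName;
    SerializableBase(String name) { timeDataName = name; }
}

class SubOfSerializableBase extends SerializableBase {
    SubOfSerializableBase(String name) { super(name); }
}

class SuperclassSerializationDemo {
    // Writes the object to a byte array and reads it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(obj);
            }
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SubOfPlainBase a = roundTrip(new SubOfPlainBase("rainfall"));
        SubOfSerializableBase b = roundTrip(new SubOfSerializableBase("rainfall"));
        System.out.println(a.timeDataName); // prints "null": PlainBase's field was not serialized
        System.out.println(b.timeDataName); // prints "rainfall"
    }
}
```

The sketch does not tell us whether Flink's closure cleaner also plays a role. It only shows that plain Java serialization alone is enough to produce `null` base-class fields.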
Re: Inheritance and FlatMap with custom POJO
Thank you Martin and Stephan for your help.
I tried implementing java.io.Serializable directly in the base class, and it worked perfectly!

Now I can write more flexible and maintainable code. Thanks a lot, guys.

Greetings,
Giacomo

On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen wrote:
> Hi!
>
> Interesting case. We use plain Java Serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue.
Re: Inheritance and FlatMap with custom POJO
Could you also try the other variant (disabling the closure cleaner)? I would be curious whether this behavior is expected Java serialization behavior, or whether our pre-processing code is causing it.

Greetings,
Stephan

On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari wrote:
> Thank you Martin and Stephan for your help.
> I tried directly to implement java.io.Serializable in Base class and it worked perfectly!
Re: Inheritance and FlatMap with custom POJO
I ran it only with java.io.Serializable implemented, without disabling the closure cleaner.

Another question I have is about POJO classes. I would also like to create a base POJO class with some common properties, and then extend it in new classes. These classes are used to convert a CSV file into a DataSet of POJO objects (of the derived class type).

In the following example, I create a DataSet of TwitterPOJO, which extends a base class by adding the new property "tweet":

    DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://" + path_twitter)
            .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I obtain this error:

    [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
    Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo

Greetings,
G.L.

On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen wrote:
> Could you also try the other variant?
Re: Inheritance and FlatMap with custom POJO
Hi Giacomo,

You should declare your fields as public. If a field is private or protected, the class must provide a getter and a setter for it in order to be treated as a POJO.

Maybe the documentation on the homepage [1] would be helpful.

Regards,
Chiwan Park

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos

> On Sep 16, 2015, at 11:25 PM, Giacomo Licari wrote:
>
> I run it only implementing java.io.Serializable without disabling the closure cleaner.
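The POJO rules Chiwan refers to can be approximated with reflection. The following is a simplified, hypothetical checker, not Flink's TypeExtractor. It requires:

- a public class, which must be static if nested;
- a public no-argument constructor;
- for every non-static, non-transient field, including fields inherited from superclasses, either public visibility or a public `getFoo`/`setFoo` pair.

It ignores field-type support and boolean `isFoo` getters. `BasePOJO` and `TwitterPOJO` are hypothetical versions of the classes described in the thread.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class PojoRulesCheck {

    // Hypothetical base POJO: public fields, public no-arg constructor.
    public static class BasePOJO {
        public String table;
        public String time;
        public BasePOJO() { }
    }

    // Hypothetical derived POJO: a private field exposed through getter/setter.
    public static class TwitterPOJO extends BasePOJO {
        private String tweet;
        public TwitterPOJO() { }
        public String getTweet() { return tweet; }
        public void setTweet(String tweet) { this.tweet = tweet; }
    }

    // Breaks the rules: no public no-argument constructor.
    public static class NoDefaultConstructor {
        public String table;
        public NoDefaultConstructor(String table) { this.table = table; }
    }

    // Simplified approximation of the documented POJO rules (not Flink's actual check).
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) problems.add("class is not public");
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) problems.add("non-static inner class");
        try {
            clazz.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Walk up the hierarchy so inherited fields are checked as well.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm) || Modifier.isPublic(fm)) {
                    continue;
                }
                String name = f.getName();
                String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
                if (!hasPublicMethod(clazz, "get" + suffix) || !hasPublicMethod(clazz, "set" + suffix, f.getType())) {
                    problems.add("field '" + name + "' is not public and lacks getter/setter");
                }
            }
        }
        return problems;
    }

    // getMethod only returns public methods, including inherited ones.
    static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(violations(TwitterPOJO.class));          // prints "[]"
        System.out.println(violations(NoDefaultConstructor.class)); // prints "[no public no-argument constructor]"
    }
}
```

By these simplified rules, a derived class with public inherited fields and public no-argument constructors qualifies. That matches Chiwan's expectation that such a class should be accepted.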
Re: Inheritance and FlatMap with custom POJO
Hi Chiwan,
I followed the instructions in the documentation.
I have a simple base class with some properties (all public).
Then I extend that class with a new public property ("tweet" in my case), and I also provide a getter and setter for that property.

Now when I execute:

    DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://" + path_twitter)
            .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I receive:

    There is no field called "table" in com.Flink.POJO.TwitterPOJO

"table" is a field of the base class, declared as public, with a getter and setter as well.

Thank you for your help.

Giacomo

On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park wrote:
> Hi Giacomo,
>
> You should set your field as public.
Re: Inheritance and FlatMap with custom POJO
Hi Giacomo,

Did you create constructors without arguments in both the base class and the derived class? If you did, it seems like a bug.

Regards,
Chiwan Park

> On Sep 17, 2015, at 12:04 AM, Giacomo Licari wrote:
>
> Hi Chiwan,
> I followed the instructions in the documentation.
> I have a simple base class with some properties (all public).
> Then I extend that class with a new public property (tweet in my case), and I also provide a getter and setter for that property.
>
> Now when I execute:
> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>     .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>
> I receive:
> There is no field called "table" in com.Flink.POJO.TwitterPOJO
>
> table is a field of the Base class, declared as public, with a getter and setter as well.
>
> Thank you for your help.
>
> Giacomo
>
> On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park wrote:
> Hi Giacomo,
>
> You should make your fields public. If you declare your fields as private or protected, the class must provide getters and setters to be treated as a POJO.
>
> Maybe the documentation on the homepage [1] would be helpful.
>
> Regards,
> Chiwan Park
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
>
> > On Sep 16, 2015, at 11:25 PM, Giacomo Licari wrote:
> >
> > I ran it by only implementing java.io.Serializable, without disabling the closure cleaner.
> >
> > Another question I have is about POJO classes.
> > I would also like to create a base POJO class with some common properties, and then extend it in new classes. These classes are used to convert a CSV into a dataset of POJO objects (of the derived class type).
> >
> > In the following example, I create a DataSet of TwitterPOJO, which extends a Base class, adding the new property "tweet".
> >
> > DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
> >     .pojoType(TwitterPOJO.class, "table", "time", "tweet");
> >
> > I obtain this error:
> > [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
> > Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo
> >
> > Greetings,
> > G.L.
> >
> > On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen wrote:
> > Could you also try the other variant (disabling the closure cleaner)? I would be curious whether this behavior is expected Java Serialization behavior, or whether our pre-processing code is causing it.
> >
> > Greetings,
> > Stephan
> >
> > On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari wrote:
> > Thank you Martin and Stephan for your help.
> > I tried implementing java.io.Serializable directly in the Base class and it worked perfectly!
> >
> > Now I can develop more flexible and maintainable code. Thanks a lot, guys.
> >
> > Greetings,
> > Giacomo
> >
> > On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen wrote:
> > Hi!
> >
> > Interesting case. We use plain Java Serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue.
> >
> > Can you try the following to see if either of these resolves the problem?
> >
> > 1) On the environment, disable the closure cleaner (in the execution config).
> >
> > 2) Let the CullTimeBase class implement java.io.Serializable.
> >
> > Please let us know how it turns out!
> >
> > Greetings,
> > Stephan
> >
> > On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns wrote:
> > Hi Giacomo,
> >
> > I ran into the same issue. It seems to be coupled to the serialization mechanism of UDFs. I solved it by letting the base class implement the UDF interface (e.g. FlatMapFunction) and, in addition, making it generic (which should be necessary in your example).
> >
> > public [abstract] class CullTimeBase implements FlatMapFunction {
> >     // ...
> > }
> >
> > public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, RainFallPOJO> {
> >     // ...
> > }
> >
> > This should work.
> >
> > Best,
> > Martin
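Stephan points out that UDFs are distributed with plain Java serialization, and Giacomo reports that making the base class implement java.io.Serializable fixed the null fields. The plain-Java sketch below (no Flink dependency) illustrates the standard Java rule that would explain this: fields declared in a non-serializable superclass are not written out, and on deserialization that superclass's no-arg constructor runs instead, leaving those fields at their defaults. It then shows Martin's layout, a generic base class that itself implements the function interface. This is one plausible reading; the thread does not confirm whether serialization or the closure cleaner was responsible. `Collector`, `FlatMapFunction`, `PlainBase`, `PlainCull`, `CullBase` and `TimestampCull` are hypothetical stand-ins shaped after the classes in the thread, not Flink's own types, and `roundTrip` only simulates shipping a UDF to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class UdfInheritanceSketch {

    // Hypothetical stand-ins shaped like Flink's Collector and FlatMapFunction;
    // like Flink's function interfaces, the function type extends Serializable.
    interface Collector<O> { void collect(O record); }
    interface FlatMapFunction<I, O> extends Serializable {
        void flatMap(I value, Collector<O> out) throws Exception;
    }

    // Original layout: the base class holding the shared settings is NOT Serializable.
    static class PlainBase {
        protected Long startMillis;
        protected Long endMillis;
        // Required by Java serialization; it runs on deserialization, leaving the fields null.
        protected PlainBase() { }
        protected PlainBase(long start, long end) { startMillis = start; endMillis = end; }
    }

    static class PlainCull extends PlainBase implements FlatMapFunction<Long, Long> {
        private static final long serialVersionUID = 1L;
        PlainCull(long start, long end) { super(start, end); }
        @Override
        public void flatMap(Long t, Collector<Long> out) {
            // The null check avoids an NPE once deserialization has reset the bounds.
            if (startMillis != null && t > startMillis && t < endMillis) out.collect(t);
        }
    }

    // Martin's layout: a generic base class that itself implements the UDF interface,
    // so its fields are part of the serialized state.
    abstract static class CullBase<I, O> implements FlatMapFunction<I, O> {
        private static final long serialVersionUID = 1L;
        protected final long startMillis;
        protected final long endMillis;
        protected CullBase(long start, long end) { startMillis = start; endMillis = end; }
        protected boolean inWindow(long t) { return t > startMillis && t < endMillis; }
    }

    static class TimestampCull extends CullBase<Long, Long> {
        private static final long serialVersionUID = 1L;
        TimestampCull(long start, long end) { super(start, end); }
        @Override
        public void flatMap(Long t, Collector<Long> out) {
            if (inWindow(t)) out.collect(t);
        }
    }

    // Simulates shipping a UDF to a worker: serialize it, then deserialize a copy.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Feeds the inputs through the function and returns what it collected.
    static List<Long> run(FlatMapFunction<Long, Long> fn, long... inputs) throws Exception {
        List<Long> collected = new ArrayList<>();
        for (long t : inputs) fn.flatMap(t, collected::add);
        return collected;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(roundTrip(new PlainCull(10, 20)), 5, 15, 25));     // []
        System.out.println(run(roundTrip(new TimestampCull(10, 20)), 5, 15, 25)); // [15]
    }
}
```

Note that Java serialization requires the first non-serializable superclass to have an accessible no-arg constructor; without one, deserialization fails with an InvalidClassException ("no valid constructor") rather than silently producing nulls.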
Re: Inheritance and FlatMap with custom POJO
Yes, I did.

If anyone has a workaround, let us know.

Regards,
Giacomo Licari
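One possible workaround, not proposed in the thread and so only an assumption, is to bypass the name-based field mapping entirely: read each record as plain fields and populate the derived POJO, including its inherited fields, by hand. In Flink this would presumably mean reading the file with readCsvFile(...).types(String.class, String.class, String.class) and applying a MapFunction to the resulting tuples; the sketch below shows only the mapping step in plain Java. `BasePOJO` and the sample records are hypothetical, the column order (table, time, tweet) is taken from the pojoType call in the thread, and the naive comma split ignores CSV quoting.

```java
import java.util.ArrayList;
import java.util.List;

public class CsvToDerivedPojo {

    // Hypothetical base POJO holding the columns shared by all record types.
    public static class BasePOJO {
        public String table;
        public String time;

        public BasePOJO() { }
    }

    // Derived POJO adding one column, mirroring TwitterPOJO from the thread.
    public static class TwitterPOJO extends BasePOJO {
        public String tweet;

        public TwitterPOJO() { }
    }

    // Maps one already-split record (table, time, tweet) onto the derived POJO,
    // assigning the inherited fields explicitly instead of looking them up by name.
    public static TwitterPOJO toTwitterPOJO(String[] fields) {
        if (fields.length != 3) {
            throw new IllegalArgumentException("expected 3 fields, got " + fields.length);
        }
        TwitterPOJO pojo = new TwitterPOJO();
        pojo.table = fields[0];
        pojo.time = fields[1];
        pojo.tweet = fields[2];
        return pojo;
    }

    public static void main(String[] args) {
        // Sample input; the naive split below does not handle quoted commas.
        String[] lines = {
            "twitter,2015-09-16T10:00:00.000,hello",
            "twitter,2015-09-16T11:00:00.000,world"
        };
        List<TwitterPOJO> pojos = new ArrayList<>();
        for (String line : lines) {
            pojos.add(toTwitterPOJO(line.split(",", -1)));
        }
        System.out.println(pojos.get(1).table + " " + pojos.get(1).tweet); // twitter world
    }
}
```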
Re: Inheritance and FlatMap with custom POJO
It seems like a bug in CsvInputFormat. I succeeded in reproducing it on my local machine.
I will create a JIRA issue for this and submit a patch to fix it.

Which version of Flink are you using?

Regards,
Chiwan Park
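The thread does not say how CsvInputFormat resolves the field names passed to pojoType, so the following is only an illustration of how a name-based lookup can miss inherited fields, producing an error like "There is no field called "table"": Class.getDeclaredField sees only fields declared directly on the given class, so a field declared in a base class is found only if the lookup walks up the superclass chain. `BasePOJO`, `TwitterPOJO`, `declaredOnly` and `inHierarchy` are hypothetical names for this sketch, not Flink code.

```java
import java.lang.reflect.Field;

public class FieldLookupSketch {

    public static class BasePOJO { public String table; public String time; }
    public static class TwitterPOJO extends BasePOJO { public String tweet; }

    // Looks only at fields declared directly on clazz: inherited fields are not found.
    static Field declaredOnly(Class<?> clazz, String name) {
        try {
            return clazz.getDeclaredField(name);
        } catch (NoSuchFieldException e) {
            return null;
        }
    }

    // Walks up the superclass chain, so fields declared in a base class are found too.
    static Field inHierarchy(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // Not declared on this class; keep searching the superclass.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(declaredOnly(TwitterPOJO.class, "table")); // null
        Field f = inHierarchy(TwitterPOJO.class, "table");
        System.out.println(f.getDeclaringClass().getSimpleName() + "." + f.getName()); // BasePOJO.table
    }
}
```

Class.getField would also find inherited fields, but only public ones; walking getDeclaredField up the chain additionally covers private fields exposed through getters and setters, which Chiwan notes is allowed for POJOs.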
Re: Inheritance and FlatMap with custom POJO
I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I'll submit a patch to solve this. Currently, there is no way to use a derived POJO class with CSV input.

Thank you for reporting.

[1] https://issues.apache.org/jira/browse/FLINK-2690
[2] https://issues.apache.org/jira/browse/FLINK-2637
[3] https://github.com/apache/flink/pull/1134

Regards,
Chiwan Park
Re: Inheritance and FlatMap with custom POJO
Hi Chiwan,

I'm using Flink 0.9.1.

Cheers,
Giacomo