Inheritance and FlatMap with custom POJO

2015-09-16 Thread Giacomo Licari
Hi guys,
I'm trying to create a base class that is extended by classes
implementing the flatMap method for specific POJO types.

Inheritance doesn't seem to work: I can access this.PropertyName or
super.PropertyName from the flatMap method, but the values are always null.

Here is the derived class, using RainfallPOJO:

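import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class CullTimeRainfall extends CullTimeBase implements
        FlatMapFunction<RainfallPOJO, RainfallPOJO> {

    public CullTimeRainfall(int num, int den, String time_data_name, String
            start_time, String end_time, int interval, String time_unit) {
        super(num, den, time_data_name, start_time, end_time, interval, time_unit);
    }

    @Override
    public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws
            Exception {
        // "HH" is the 0-23 hour of day, matching the base class; "hh" is the 12-hour clock
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        try {
            // remove the colon from a trailing UTC offset, e.g. "+01:00" -> "+0100"
            Date time = formatter.parse(obj.getTime().replaceAll(
                    "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
            // keep only records strictly inside (startTime, endTime)
            if (time.after(this.startTime) && time.before(this.endTime)) {
                coll.collect(obj);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}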

My base class is:

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class CullTimeBase {

    protected int numerator;
    protected int denominator;
    protected String timeDataName;
    protected Date startTime;
    protected Date endTime;
    protected int interval;
    protected String timeUnit;

    public CullTimeBase(int num, int den, String time_data_name, String
            start_time, String end_time, int interv, String time_unit) {
        numerator = num;
        denominator = den;
        timeDataName = time_data_name;
        interval = interv;
        timeUnit = time_unit;
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        try {
            startTime = formatter.parse(start_time);
            endTime = formatter.parse(end_time);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}

It only works if I declare all fields and methods in a single class, but
then I would have to repeat the same properties in several classes. I would
like each derived class to only specialize the flatMap method, which uses a
custom POJO type.

Thanks a lot,
Giacomo


Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Martin Junghanns

Hi Giacomo,

I ran into the same issue. It seems to be related to the serialization
mechanism for UDFs. I solved it by letting the base class implement the
UDF interface (e.g. FlatMapFunction) and also making it generic
(which should be necessary in your example).


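import org.apache.flink.api.common.functions.FlatMapFunction;

// abstract, so each derived class supplies its own flatMap
public abstract class CullTimeBase<IN, OUT> implements
        FlatMapFunction<IN, OUT> {

    // ...
}

public class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {

    // ...
}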

This should work.

Best,
Martin
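A minimal plain-Java sketch of Martin's generic-base pattern, runnable without Flink. Assumptions: `SerializableFlatMap` and a `List` are hypothetical stand-ins for Flink's `FlatMapFunction` and `Collector`, `RainfallRecord` stands in for RainfallPOJO, and times are plain epoch-millisecond longs. Flink's `FlatMapFunction` itself extends `java.io.Serializable`, so a base class that implements it becomes serializable as well; that is a plausible reason this approach avoids the null fields. The `roundTrip` helper only loosely mimics how a UDF is shipped to the workers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class GenericBaseSketch {

    // Hypothetical stand-in for Flink's FlatMapFunction<IN, OUT>; like the real
    // interface, it extends Serializable. A List stands in for Collector<OUT>.
    interface SerializableFlatMap<IN, OUT> extends Serializable {
        void flatMap(IN value, List<OUT> out) throws Exception;
    }

    // Hypothetical record type standing in for RainfallPOJO.
    public static class RainfallRecord implements Serializable {
        public long time; // epoch millis, for illustration only
        public double rainfall;

        public RainfallRecord() {}

        public RainfallRecord(long time, double rainfall) {
            this.time = time;
            this.rainfall = rainfall;
        }
    }

    // Generic base: the shared state lives here and is serialized together with
    // subclasses, because the base implements the serializable UDF interface.
    public abstract static class CullTimeBase<IN, OUT> implements SerializableFlatMap<IN, OUT> {
        protected long startTime;
        protected long endTime;

        protected CullTimeBase(long startTime, long endTime) {
            this.startTime = startTime;
            this.endTime = endTime;
        }

        // Shared helper: true if t lies strictly inside (startTime, endTime).
        protected boolean inWindow(long t) {
            return t > startTime && t < endTime;
        }
    }

    // The derived class only specializes flatMap for its record type.
    public static class CullTimeRainfall extends CullTimeBase<RainfallRecord, RainfallRecord> {
        public CullTimeRainfall(long startTime, long endTime) {
            super(startTime, endTime);
        }

        @Override
        public void flatMap(RainfallRecord value, List<RainfallRecord> out) {
            if (inWindow(value.time)) {
                out.add(value);
            }
        }
    }

    // Serialize and deserialize an object, loosely mimicking how a UDF is shipped.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(obj);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CullTimeRainfall fn = roundTrip(new CullTimeRainfall(100L, 200L));
        List<RainfallRecord> out = new ArrayList<>();
        fn.flatMap(new RainfallRecord(150L, 1.5), out); // inside the window: kept
        fn.flatMap(new RainfallRecord(250L, 2.0), out); // outside the window: dropped
        System.out.println(fn.startTime + " " + fn.endTime + " " + out.size()); // 100 200 1
    }
}
```

Each new record type then only needs a small subclass that overrides flatMap, while the window fields and helper stay in one place.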


Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Stephan Ewen
Hi!

Interesting case. We use plain Java Serialization to distribute UDFs, and
perform additional "cleaning" of scopes, which may be causing the issue.

Can you try the following to see whether either of these resolves the problem?

1) On the environment, disable the closure cleaner (in the execution
config).

2) Let the CullTimeBase class implement java.io.Serializable.

Please let us know how it turns out!

Greetings,
Stephan


Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Giacomo Licari
Thank you Martin and Stephan for your help.
I went straight to implementing java.io.Serializable in the base class, and it
worked perfectly!

Now I can write more flexible and maintainable code. Thanks a lot, guys.

Greetings,
Giacomo


Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Stephan Ewen
Could you also try the other variant (disabling the closure cleaner)? I
would be curious whether this is expected Java Serialization behavior,
or whether our pre-processing code is causing it.

Greetings,
Stephan
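On Stephan's question: the null values are consistent with plain Java Serialization rules, independent of the closure cleaner. When a serializable class extends a non-serializable one, the superclass's fields are not written to the stream; on deserialization they are initialized by the superclass's no-arg constructor instead (and if there is no accessible no-arg constructor, deserialization fails with an `InvalidClassException`). The plain-Java sketch below, using hypothetical class names, round-trips both variants; it does not involve Flink, so it cannot rule out an additional effect from the closure cleaner.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SuperclassSerializationDemo {

    // Hypothetical base that does NOT implement Serializable (as CullTimeBase originally did not).
    public static class PlainBase {
        protected String timeUnit;

        // Java serialization calls this no-arg constructor to initialize PlainBase's
        // fields on deserialization; without it, readObject throws InvalidClassException.
        public PlainBase() {}

        public PlainBase(String timeUnit) {
            this.timeUnit = timeUnit;
        }
    }

    public static class DerivedFromPlain extends PlainBase implements Serializable {
        public DerivedFromPlain(String timeUnit) {
            super(timeUnit);
        }
    }

    // The fix from this thread: the base class itself implements Serializable.
    public static class SerializableBase implements Serializable {
        protected String timeUnit;

        public SerializableBase(String timeUnit) {
            this.timeUnit = timeUnit;
        }
    }

    public static class DerivedFromSerializable extends SerializableBase {
        public DerivedFromSerializable(String timeUnit) {
            super(timeUnit);
        }
    }

    // Write the object to a byte array and read it back with standard Java serialization.
    static Object roundTrip(Serializable obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(obj);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DerivedFromPlain a = (DerivedFromPlain) roundTrip(new DerivedFromPlain("hours"));
        DerivedFromSerializable b =
                (DerivedFromSerializable) roundTrip(new DerivedFromSerializable("hours"));
        System.out.println(a.timeUnit); // null: PlainBase's field was not serialized
        System.out.println(b.timeUnit); // hours
    }
}
```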


Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Giacomo Licari
I ran it only implementing java.io.Serializable, without disabling the
closure cleaner.

Another question I have is about POJO classes.
I would also like to create a base POJO class with some common properties and
then extend it in new classes. These classes are used to convert a CSV file
into a DataSet of POJO objects (of the derived class type).

In the following example, I create a DataSet of TwitterPOJO, which extends
a base class by adding the new property "tweet".

DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
   .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I obtain this error:
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo

Greetings,
G.L.


Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Chiwan Park
Hi Giacomo,

You should make your fields public. If a field is private or protected,
the class must provide a getter and setter for it to be treated as a POJO.

The documentation on the homepage [1] may be helpful.

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
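A rough plain-Java approximation of the POJO rules from the documentation linked above, which can help check a class before handing it to Flink: the class is public (and static if nested), it has a public no-argument constructor, and every non-static, non-transient field, including inherited ones, is either public or has a public getter and setter. `looksLikeFlinkPojo` and the example classes are hypothetical; this is not Flink's TypeExtractor, which applies further rules.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRuleCheck {

    // Hypothetical, simplified approximation of Flink's POJO rules.
    // It is NOT Flink's TypeExtractor and may disagree with it in edge cases.
    static boolean looksLikeFlinkPojo(Class<?> cls) {
        int mods = cls.getModifiers();
        if (!Modifier.isPublic(mods)) {
            return false; // class must be public
        }
        if (cls.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            return false; // nested classes must be static
        }
        try {
            cls.getConstructor(); // only finds a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Walk up the hierarchy so that inherited fields are checked too.
        for (Class<?> c = cls; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                    continue; // not part of the object's state
                }
                if (!Modifier.isPublic(fm) && !hasGetterAndSetter(cls, f)) {
                    return false;
                }
            }
        }
        return true;
    }

    // Simplified: looks only for public getX() and setX(T) with the field's exact type.
    private static boolean hasGetterAndSetter(Class<?> cls, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        try {
            cls.getMethod("set" + suffix, f.getType());
            return cls.getMethod("get" + suffix).getReturnType() == f.getType();
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Hypothetical example types.
    public static class GoodPojo {
        public String table;  // public field: fine
        private String tweet; // private field: needs a getter and setter

        public GoodPojo() {}

        public String getTweet() { return tweet; }

        public void setTweet(String tweet) { this.tweet = tweet; }
    }

    public static class NoDefaultCtor {
        public String table;

        public NoDefaultCtor(String table) { this.table = table; }
    }

    public static class ProtectedFieldNoAccessors {
        protected String time;

        public ProtectedFieldNoAccessors() {}
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(GoodPojo.class));                  // true
        System.out.println(looksLikeFlinkPojo(NoDefaultCtor.class));             // false
        System.out.println(looksLikeFlinkPojo(ProtectedFieldNoAccessors.class)); // false
    }
}
```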


Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Giacomo Licari
Hi Chiwan,
I followed the instructions in the documentation.
I have a simple base class with some properties (all public).
Then I extend that class with a new public property (tweet in my case), and I
also provide a getter and setter for that property.

Now when I execute:
DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
   .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I receive:
There is no field called "table" in com.Flink.POJO.TwitterPOJO

table is a field of the base class, declared public, and it also has a getter
and setter.

Thank you for your help.

Giacomo


Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Chiwan Park
Hi Giacomo,

Did you create no-argument constructors in both the base class and the
derived class?
If you did, it seems like a bug.

Regards,
Chiwan Park
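A sketch of the class shape Chiwan describes, assuming a hypothetical `BasePOJO` holding the shared `table` and `time` columns and the `TwitterPOJO` from the thread, with all fields typed as `String`: each class is public, declares a public no-argument constructor, and exposes only public fields (the classes are nested and static here only to keep the example in one file). The `main` method only uses plain Java reflection to confirm that the inherited fields are visible on `TwitterPOJO` next to `tweet`; it does not run Flink's CSV reader, so it cannot confirm or rule out the suspected bug.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

public class InheritedPojoSketch {

    // Hypothetical base POJO with the columns shared by several CSV files.
    public static class BasePOJO {
        public String table;
        public String time;

        public BasePOJO() {} // public no-argument constructor
    }

    // Derived POJO adding one column; it needs its own public no-arg constructor too.
    public static class TwitterPOJO extends BasePOJO {
        public String tweet;

        public TwitterPOJO() {}
    }

    // Sorted names of all public instance fields, including inherited ones.
    static Set<String> publicFieldNames(Class<?> cls) {
        Set<String> names = new TreeSet<>();
        for (Field f : cls.getFields()) { // getFields() includes inherited public fields
            if (!Modifier.isStatic(f.getModifiers())) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Instantiate through the no-arg constructor, as reflection-based frameworks typically do.
        TwitterPOJO p = TwitterPOJO.class.getConstructor().newInstance();
        p.table = "twitter";
        p.tweet = "hello";
        System.out.println(publicFieldNames(TwitterPOJO.class)); // [table, time, tweet]
    }
}
```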

> On Sep 17, 2015, at 12:04 AM, Giacomo Licari  wrote:
> 
> Hi Chiwan,
> I followed instructions in documentation.
> I have a simple base class with some properties (all public).
> Then I extend that class with a new public property (tweet in my case), I 
> provide also getter and setter for that property.
> 
> Now when I execute:
> DataSet ds_twitter = env.readCsvFile("file://"+path_twitter)
>.pojoType(TwitterPOJO.class, "table", "time", "tweet");
> 
> I receive:
> There is no field called "table" in com.Flink.POJO.TwitterPOJO
> 
> table is a field of the Base class, declared as public with also getter and 
> setter.
> 
> Thank you for your help.
> 
> Giacomo
> 
> On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park  wrote:
> Hi Giacomo,
> 
> You should set your field as public. If you are set your field as private or 
> protected, the class must provide getter and setter to be treated as POJO.
> 
> Maybe the documentation in homepage [1] would be helpful.
> 
> Regards,
> Chiwan Park
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
> 
> > On Sep 16, 2015, at 11:25 PM, Giacomo Licari  
> > wrote:
> >
> > I run it only implementing java.io.Serializable without disabling the 
> > closure cleaner.
> >
> > Another question I have is about POJO classes.
> > I would also create a base POJO class with some common proprerties, and 
> > then extend it in new classes. These classes are used to convert a CSV into 
> > a dataset of POJO objects (of derived class type).
> >
> > In the following example, I create a DataSet of TwitterPOJO, which extends 
> > a Base class, adding the new property "tweet".
> >
> > DataSet ds_twitter = env.readCsvFile("file://"+path_twitter)
> >.pojoType(TwitterPOJO.class, "table", "time", "tweet");
> >
> > I obtain this error:
> > [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
> > com.Flink.POJO.TwitterPOJO is not a valid POJO type
> > Exception in thread "main" java.lang.ClassCastException: 
> > org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> > org.apache.flink.api.java.typeutils.PojoTypeInfo
> >
> > Greetings,
> > G.L.
> >
> > On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen  wrote:
> > Could you also try the other variant (disabling the closure cleaner)? I am
> > curious whether this is expected Java serialization behavior or whether our
> > pre-processing code is causing it.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari  
> > wrote:
> > Thank you Martin and Stephan for your help.
> > I tried implementing java.io.Serializable directly in the base class, and it
> > worked perfectly!
> >
> > Now I can write more flexible and maintainable code. Thanks a lot, guys.
> >
> > Greetings,
> > Giacomo
> >
> > On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen  wrote:
> > Hi!
> >
> > Interesting case. We use plain Java serialization to distribute UDFs and
> > perform additional "cleaning" of scopes, which may be causing the issue.
> >
> > Can you try the following to see whether either of them resolves the problem?
> >
> > 1) On the environment, disable the closure cleaner (in the execution 
> > config).
> >
> > 2) Let the CullTimeBase class implement java.io.Serializable.
> >
> > Please let us know how it turns out!
> >
> > Greetings,
> > Stephan
> >
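A plain-Java sketch of why option 2 can matter, independent of Flink. Standard Java serialization only writes the fields of classes that implement Serializable. When it reads the object back, it calls the no-argument constructor of the first non-serializable superclass, so fields that the other constructor set in that class come back as null. The class names are hypothetical stand-ins for CullTimeBase and CullTimeRainfall. (If the non-serializable base had no accessible no-argument constructor, deserialization would fail with InvalidClassException instead.) For option 1, the closure cleaner is switched off through the execution config, e.g. env.getConfig().disableClosureCleaner().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Like the original CullTimeBase: does NOT implement Serializable.
    // Java serialization calls this no-arg constructor on deserialization,
    // so whatever the other constructor set is lost.
    static class PlainBase {
        protected String timeUnit;
        PlainBase() { }
        PlainBase(String timeUnit) { this.timeUnit = timeUnit; }
    }

    // Only the subclass is Serializable, so only its own fields are written.
    static class PlainDerived extends PlainBase implements Serializable {
        PlainDerived(String timeUnit) { super(timeUnit); }
    }

    // The fix from the thread: the base class itself implements Serializable.
    static class SerializableBase implements Serializable {
        protected String timeUnit;
        SerializableBase(String timeUnit) { this.timeUnit = timeUnit; }
    }

    static class SerializableDerived extends SerializableBase {
        SerializableDerived(String timeUnit) { super(timeUnit); }
    }

    // Serialize to bytes and read back with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PlainDerived a = roundTrip(new PlainDerived("hours"));
        SerializableDerived b = roundTrip(new SerializableDerived("hours"));
        System.out.println("non-serializable base: " + a.timeUnit); // null
        System.out.println("serializable base:     " + b.timeUnit); // hours
    }
}
```

The sketch only shows that plain Java serialization by itself yields null base-class fields in this setup; whether Flink's closure cleaning also plays a part is the open question Stephan raises.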
> >
> > On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns  
> > wrote:
> > Hi Giacomo,
> >
> > I ran into the same issue. It seems to be tied to the serialization
> > mechanism of UDFs. I solved it by letting the base class implement the UDF
> > interface (e.g. FlatMapFunction) and also making it generic (which
> > should be necessary in your example).
> >
> > public [abstract] class CullTimeBase<IN, OUT> implements
> > FlatMapFunction<IN, OUT> {
> > // ...
> > }
> >
> > public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, RainFallPOJO> {
> > // ...
> > }
> >
> > This should work.
> >
> > Best,
> > Martin
> >
> >
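A compilable sketch of Martin's pattern, under stated assumptions: FlatMapFunction and Collector below are minimal stand-ins so the example runs without Flink (in a real job they would come from org.apache.flink.api.common.functions and org.apache.flink.util), and RainfallPOJO with an already-parsed Date field is hypothetical. Because Flink's FlatMapFunction extends Function, which extends java.io.Serializable, letting the base class implement it also makes the base class serializable, which is presumably why this pattern and Stephan's suggestion 2 fix the same problem.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class GenericBaseSketch {

    // Stand-in for org.apache.flink.util.Collector (only collect() is modeled).
    interface Collector<T> {
        void collect(T record);
    }

    // Stand-in for Flink's FlatMapFunction, which is Serializable via Function.
    interface FlatMapFunction<IN, OUT> extends Serializable {
        void flatMap(IN value, Collector<OUT> out) throws Exception;
    }

    // Hypothetical POJO with an already-parsed timestamp.
    public static class RainfallPOJO {
        public Date time;
        public double mm;
        public RainfallPOJO() { }
        public RainfallPOJO(Date time, double mm) { this.time = time; this.mm = mm; }
    }

    // Generic base: implements the UDF interface and holds the shared state.
    public abstract static class CullTimeBase<T> implements FlatMapFunction<T, T> {
        protected final Date startTime;
        protected final Date endTime;

        protected CullTimeBase(Date startTime, Date endTime) {
            this.startTime = startTime;
            this.endTime = endTime;
        }

        // Each subclass only says where the timestamp lives in its POJO type.
        protected abstract Date timeOf(T value);

        @Override
        public void flatMap(T value, Collector<T> out) {
            Date t = timeOf(value);
            if (t.after(startTime) && t.before(endTime)) {
                out.collect(value); // keep records strictly inside the window
            }
        }
    }

    public static class CullTimeRainfall extends CullTimeBase<RainfallPOJO> {
        public CullTimeRainfall(Date startTime, Date endTime) { super(startTime, endTime); }

        @Override
        protected Date timeOf(RainfallPOJO value) { return value.time; }
    }

    public static void main(String[] args) {
        CullTimeRainfall cull = new CullTimeRainfall(new Date(1000L), new Date(5000L));
        List<RainfallPOJO> kept = new ArrayList<>();
        for (long ms : new long[] {500L, 2000L, 4000L, 6000L}) {
            cull.flatMap(new RainfallPOJO(new Date(ms), 1.0), kept::add);
        }
        System.out.println(kept.size() + " records kept"); // 2 records kept
    }
}
```

This uses a single type parameter T with FlatMapFunction<T, T>, since a time filter emits the same type it receives; a base with separate input and output type parameters is the more general choice when a subclass changes the output type.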
> > On 16.09.2015 10:41, Giacomo Licari wrote:
> >> Hi guys,
> > >> I'm trying to create a base class that is extended by classes
> > >> implementing the flatMap method on specific POJO types.
> > >>
> > >> It seems inheritance doesn't work: I can access this.PropertyName or
> > >> super.PropertyName from the flatMap method, but the values are always null.
> >>
> >> Here the derived class, using RainfallPOJO:
> >>
> > >> public class CullTimeRainfall extends CullTimeBase implements
> > >> FlatMapFunction<RainfallPOJO, RainfallPOJO> {
> >>
> >>  public CullTimeRainfall(int num, int den, String time_data_name, 
> >> String start_time, String end_time, int interval, String time_unit){
> >>  super(num, den, time_data_name, start_time, end_time, 
> >> interval, time_unit);
> >>  }
> >>
> > >>  public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll)
> >> th

Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Giacomo Licari
Yes, I did.

If anyone has a workaround, let us know.

Regards,
Giacomo Licari

On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park  wrote:

> Hi Giacomo,
>
> Did you create no-argument constructors in both the base class and the
> derived class?
> If you did, it seems like a bug.
>
> Regards,
> Chiwan Park

Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Chiwan Park
It seems like a bug in CsvInputFormat. I succeeded in reproducing it on my
local machine.
I will create a JIRA issue for this and submit a patch to fix it.

Which version of Flink are you using?

Regards,
Chiwan Park
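The thread does not show how CsvInputFormat in Flink 0.9.1 resolves the names passed to pojoType(...), so the following is an assumption about one plausible mechanism, not a reading of Flink's code. Class.getDeclaredField only sees fields declared directly on a class, so a lookup that never walks the superclass chain would report that TwitterPOJO has no field called "table" even though its base class declares it publicly. The classes below are hypothetical.

```java
import java.lang.reflect.Field;

public class FieldLookupSketch {

    public static class BasePOJO {
        public String table;
        public String time;
        public BasePOJO() { }
    }

    public static class TwitterPOJO extends BasePOJO {
        public String tweet;
        public TwitterPOJO() { }
    }

    // Looks only at fields declared directly on the given class.
    static Field declaredOnly(Class<?> clazz, String name) {
        try {
            return clazz.getDeclaredField(name);
        } catch (NoSuchFieldException e) {
            return null;
        }
    }

    // Walks up the superclass chain, so inherited fields are found as well.
    static Field inHierarchy(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // not declared on this class; try its superclass
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(declaredOnly(TwitterPOJO.class, "table")); // null
        System.out.println(inHierarchy(TwitterPOJO.class, "table"));
    }
}
```

Until the bug is fixed, one possible workaround (not tested against 0.9.1) is to read the columns as a tuple, e.g. readCsvFile(...).types(String.class, String.class, String.class), and then map each Tuple3 into a TwitterPOJO by hand.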

Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Chiwan Park
I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I’ll submit a
patch to solve this.
Currently, there is no way to use a derived class with CSV input.

Thank you for reporting.

[1] https://issues.apache.org/jira/browse/FLINK-2690
[2] https://issues.apache.org/jira/browse/FLINK-2637
[3] https://github.com/apache/flink/pull/1134

Regards,
Chiwan Park

Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Giacomo Licari
Hi Chiwan,
I'm using Flink 0.9.1.

Cheers,
Giacomo