[ 
https://issues.apache.org/jira/browse/FLINK-1978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14530197#comment-14530197
 ] 

Flavio Pompermaier edited comment on FLINK-1978 at 5/6/15 9:10 AM:
-------------------------------------------------------------------

The error can be reproduced with the following code:

public class PojoDateError {

                @SuppressWarnings("unused")
                private static final Logger LOG = 
LoggerFactory.getLogger(PojoDateError.class);
                

                // 
*************************************************************************
                // PROGRAM
                // 
*************************************************************************

                public static void main(String[] args) throws Exception {
                        
                        // set up the execution environment
                        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                        Collection<Tuple2<String,Person>> peopleColl = new 
ArrayList<>();
                        peopleColl.add(new Tuple2<String,Person> ("1",new 
Person("1","test-person1",new Date())));
                        peopleColl.add(new Tuple2<String,Person> ("2",new 
Person("2","test-person2",null)));
                        peopleColl.add(new Tuple2<String,Person> ("3",new 
Person("3","test-person3",new Date())));
                        DataSet<Tuple2<String,Person>> people =  
env.fromCollection(peopleColl);
                        DataSet<Tuple2<String, Vehicle>> vehicles =  
env.fromCollection(Arrays.asList(new Tuple2<String,Vehicle> ("AA",new 
Vehicle("1","test-vehicle",new Date()))));
                        
                        DataSet<Tuple2<String,String>> vehiclesOwnersRel =  
env.fromCollection(Arrays.asList(
                                        new Tuple2<String,String>("AA","1"), 
                                        new Tuple2<String,String>("AA","2"),
                                        new Tuple2<String,String>("AA","3")
                                        ));
                        
                        DataSet<Tuple2<String,Person>> vehiclesOwners = 
vehiclesOwnersRel
                                        
.join(people).where(1).equalTo(0).with(new 
JoinFunction<Tuple2<String,String>,Tuple2<String, 
Person>,Tuple2<String,Person>>(){
                                                
                                                private Tuple2<String, Person> 
reuse = new Tuple2<>();
                                                @Override
                                                public Tuple2<String, Person> 
join(
                                                                Tuple2<String, 
String> rel, Tuple2<String, Person> p)
                                                                throws 
Exception {
                                                        reuse.f0 = rel.f0;
                                                        reuse.f1 = p.f1;
                                                        return reuse;
                                                }
                                                
                                        });
                        
vehicles.coGroup(vehiclesOwners).where(0).equalTo(0).with(new 
CoGroupFunction<Tuple2<String, 
Vehicle>,Tuple2<String,Person>,Tuple2<String,Vehicle>>(){
                                private Tuple2<String,Vehicle> reuse = new 
Tuple2<>();
                                @Override
                                public void coGroup(Iterable<Tuple2<String, 
Vehicle>> vIt,
                                                Iterable<Tuple2<String, 
Person>> pIt,
                                                
Collector<Tuple2<String,Vehicle>> out) throws Exception {
                                        for (Tuple2<String, Vehicle> off : vIt) 
{
                                                List<Person> owners = 
off.f1.owners;
                                                for (Tuple2<String, Person> e : 
pIt) {
                                                        if(owners == null){
                                                                owners = new 
ArrayList<>();
                                                                off.f1.owners = 
owners;
                                                        }
                                                        owners.add(e.f1);
                                                }
                                                reuse.f0 = off.f0;
                                                reuse.f1 = off.f1;
                                                out.collect(reuse);
                                                break;
                                        }
                                }

                                
                                
                        }).print();
                        
                        env.execute();
                }
                
      static class Person {
        
        public String id;
        public String name;
        public Date birthDate;
        
        public Person(){}
        public Person(String id, String name, Date birthDate) {
                this.id = id;
                this.name = name;
                this.birthDate = birthDate;
        }
        public Person(String id) {
                this.id = id;
        }
        @Override
        public String toString() {
                return this.id;
        }
        
}

static class Vehicle {

            public String id;
            public String plate;
                public List<Person> owners;
            public Date registrationDate;

            public Vehicle(){}
            public Vehicle(String id, String plate, Date registrationDate) {
                        this.id = id;
                        this.plate = plate;
                        this.registrationDate = registrationDate;
                }
            @Override
                public String toString() {
                        return this.plate;
                }
}

}






was (Author: f.pompermaier):
The error can be reproduced with the following code:

/**
 * Previous revision of the reproducer program: builds in-memory data sets of
 * people, vehicles and vehicle-to-owner relations, joins the relations with
 * the people, co-groups the vehicles with the join result so each vehicle
 * collects its owners, and prints the resulting vehicles.
 */
public class PojoDateError {

                // NOTE(review): the logger is created for AciEsIndexer, a class that is
                // not defined in this snippet -- presumably a leftover from the project
                // this reproducer was extracted from; confirm.
                @SuppressWarnings("unused")
                private static final Logger LOG = 
LoggerFactory.getLogger(AciEsIndexer.class);
                

                // 
*************************************************************************
                // PROGRAM
                // 
*************************************************************************

                /**
                 * Assembles and runs the job.
                 *
                 * @param args command-line arguments; not read by this program
                 * @throws Exception whatever the Flink calls below propagate
                 */
                public static void main(String[] args) throws Exception {
                        
                        // set up the execution environment
                        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                        // People keyed by their id; the second person has a null birth date.
                        Collection<Tuple2<String,Person>> peopleColl = new 
ArrayList<>();
                        peopleColl.add(new Tuple2<String,Person> ("1",new 
Person("1","test-person1",new Date())));
                        peopleColl.add(new Tuple2<String,Person> ("2",new 
Person("2","test-person2",null)));
                        peopleColl.add(new Tuple2<String,Person> ("3",new 
Person("3","test-person3",new Date())));
                        DataSet<Tuple2<String,Person>> people =  
env.fromCollection(peopleColl);
                        // A single vehicle keyed by "AA".
                        DataSet<Tuple2<String, Vehicle>> vehicles =  
env.fromCollection(Arrays.asList(new Tuple2<String,Vehicle> ("AA",new 
Vehicle("1","test-vehicle",new Date()))));
                        
                        // (vehicle key, person id) pairs: vehicle "AA" is related to all three people.
                        DataSet<Tuple2<String,String>> vehiclesOwnersRel =  
env.fromCollection(Arrays.asList(
                                        new Tuple2<String,String>("AA","1"), 
                                        new Tuple2<String,String>("AA","2"),
                                        new Tuple2<String,String>("AA","3")
                                        ));
                        
                        // Resolve each relation's person id (field 1) against the people's key
                        // (field 0) and emit (vehicle key, person).
                        DataSet<Tuple2<String,Person>> vehiclesOwners = 
vehiclesOwnersRel
                                        
.join(people).where(1).equalTo(0).with(new 
JoinFunction<Tuple2<String,String>,Tuple2<String, 
Person>,Tuple2<String,Person>>(){
                                                
                                                // One output tuple instance, overwritten and returned for every match.
                                                private Tuple2<String, Person> 
reuse = new Tuple2<>();
                                                @Override
                                                public Tuple2<String, Person> 
join(
                                                                Tuple2<String, 
String> rel, Tuple2<String, Person> p)
                                                                throws 
Exception {
                                                        reuse.f0 = rel.f0;
                                                        reuse.f1 = p.f1;
                                                        return reuse;
                                                }
                                                
                                        });
                        
                        // Group vehicles and (vehicle key, person) pairs on field 0 of both
                        // sides, attach the owners to the vehicle and print the result.
vehicles.coGroup(vehiclesOwners).where(0).equalTo(0).with(new 
CoGroupFunction<Tuple2<String, 
Vehicle>,Tuple2<String,Person>,Tuple2<String,Vehicle>>(){
                                // One output tuple instance, overwritten for the emitted record.
                                private Tuple2<String,Vehicle> reuse = new 
Tuple2<>();
                                @Override
                                public void coGroup(Iterable<Tuple2<String, 
Vehicle>> vIt,
                                                Iterable<Tuple2<String, 
Person>> pIt,
                                                
Collector<Tuple2<String,Vehicle>> out) throws Exception {
                                        for (Tuple2<String, Vehicle> off : vIt) 
{
                                                List<Person> owners = 
off.f1.owners;
                                                // The owners list is created only once a first owner
                                                // shows up, so a vehicle without owners keeps a null list.
                                                for (Tuple2<String, Person> e : 
pIt) {
                                                        if(owners == null){
                                                                owners = new 
ArrayList<>();
                                                                off.f1.owners = 
owners;
                                                        }
                                                        owners.add(e.f1);
                                                }
                                                reuse.f0 = off.f0;
                                                reuse.f1 = off.f1;
                                                out.collect(reuse);
                                                // Only the first vehicle record of the group is handled.
                                                break;
                                        }
                                }

                                
                                
                        }).print();
                        
                        env.execute();
                }
                
      /**
       * Person record with public fields and a no-argument constructor
       * (presumably so that Flink handles it as a POJO -- confirm).
       * birthDate may be null.
       */
      static class Person {
        
        public String id;
        public String name;
        public Date birthDate;
        
        public Person(){}
        public Person(String id, String name, Date birthDate) {
                this.id = id;
                this.name = name;
                this.birthDate = birthDate;
        }
        // Sets only the id; name and birthDate keep their null defaults.
        public Person(String id) {
                this.id = id;
        }
        // Returns the person's id.
        @Override
        public String toString() {
                return this.id;
        }
        
}

/**
 * Vehicle record with public fields and a no-argument constructor
 * (presumably so that Flink handles it as a POJO -- confirm).
 * owners is null until the co-group step above attaches an owner.
 */
static class Vehicle {

            public String id;
            public String plate;
                public List<Person> owners;
            public Date registrationDate;

            public Vehicle(){}
            public Vehicle(String id, String plate, Date registrationDate) {
                        this.id = id;
                        this.plate = plate;
                        this.registrationDate = registrationDate;
                }
            // Returns the vehicle's plate.
            @Override
                public String toString() {
                        return this.plate;
                }
}

}





> POJO serialization NPE
> ----------------------
>
>                 Key: FLINK-1978
>                 URL: https://issues.apache.org/jira/browse/FLINK-1978
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.9
>            Reporter: Flavio Pompermaier
>
> NullPointer on serialization of a Date field



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to