Hi all,
I am using Flink 1.3.1 and I have found some strange behavior when running the following logic:

1. Read data from a CSV file into a DataSet<POJO>.
2. Split the dataset in two by checking whether "field1" of each POJO is
   empty, so that the first dataset contains only elements with a
   non-empty "field1" and the second dataset contains the remaining
   elements.
3. Group each dataset, the first by "field1" and the second by another
   field ("field2"), and then reduce each of them.
4. Merge the two datasets with union.
5. Write the final dataset as JSON.

What I expected in the output was to find only one element for each specific value of "field1", because:

1. Reducing the first dataset, grouped by "field1", should generate only
   one element per distinct value of "field1".
2. The second dataset should contain only elements with an empty
   "field1".
3. Taking the union of the two should not duplicate any record.
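
To make the expectation concrete, here is a minimal self-contained sketch of the same logic, with Tuple2<String, String> standing in for my POJO (f0 plays the role of "field1", f1 of "field2"; the input records are made up):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnionExpectation {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (field1, field2) records: "A" occurs twice, the empty field1 twice.
        DataSet<Tuple2<String, String>> subjects = env.fromElements(
                Tuple2.of("A", "x"), Tuple2.of("A", "y"),
                Tuple2.of("", "x"), Tuple2.of("", "x"));

        // Non-empty field1: group by field1, keep one element per value.
        DataSet<Tuple2<String, String>> withField1 = subjects
                .filter(new FilterFunction<Tuple2<String, String>>() {
                    @Override
                    public boolean filter(Tuple2<String, String> t) {
                        return !t.f0.isEmpty();
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(
                            Tuple2<String, String> a, Tuple2<String, String> b) {
                        return a;
                    }
                });

        // Empty field1: group by field2, keep one element per value.
        DataSet<Tuple2<String, String>> withoutField1 = subjects
                .filter(new FilterFunction<Tuple2<String, String>>() {
                    @Override
                    public boolean filter(Tuple2<String, String> t) {
                        return t.f0.isEmpty();
                    }
                })
                .groupBy(1)
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(
                            Tuple2<String, String> a, Tuple2<String, String> b) {
                        return a;
                    }
                });

        // Expected: exactly one record with f0 == "A" and one with f0 == "".
        withField1.union(withoutField1).print();
    }
}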

This does not happen. When I read the generated JSON files I see some duplicate (non-empty) values of "field1". Strangely, this does not happen when the union of the two datasets is not computed: in that case the first dataset produces only elements with distinct values of "field1", while the second dataset produces only records with an empty "field1".

Debugging the code, it seems that the map function used to convert the final merged dataset into JSON strings starts before the reduce functions terminate. This seems to produce the duplicates.
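
One way to see this interleaving (using the names from the pseudocode below) is to log from the map task's open() method, which with pipelined execution can run while the upstream reduce tasks are still emitting records. This is only a sketch; it assumes slf4j logging, and the exact ordering depends on the parallelism:

// Variant of the JSON map with logging in open(); assumes
// org.slf4j.Logger/LoggerFactory and
// org.apache.flink.configuration.Configuration are imported.
DataSet<String> jsonSubjects = allSubjects.map(new RichMapFunction<POJO, String>() {
    private static final long serialVersionUID = 1L;
    private final ObjectMapper mapper = new ObjectMapper();
    private transient Logger log;

    @Override
    public void open(Configuration parameters) {
        log = LoggerFactory.getLogger("json-map");
        // Logged as soon as this map task starts, which can happen
        // before the reduces have finished all their groups.
        log.info("map subtask {} opened", getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public String map(POJO subject) throws Exception {
        return mapper.writeValueAsString(subject);
    }
});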
Here is my pseudocode:

DataSet<POJO> subjects = read from csv...

DataSet<POJO> subjectsWithCondition = subjects.filter(new FilterFunction<POJO>() {
    @Override
    public boolean filter(POJO subject) throws Exception {
        // keep only elements with a non-empty "field1"
        return !subject.getField("field1").isEmpty();
    }
}).groupBy("field1").reduce(new ReduceFunction<POJO>() {
    @Override
    public POJO reduce(POJO subject1, POJO subject2) {
        // keep one element per distinct "field1"
        return subject1;
    }
});

DataSet<POJO> subjectsWithoutCondition = subjects.filter(new FilterFunction<POJO>() {
    @Override
    public boolean filter(POJO subject) throws Exception {
        // keep only elements with an empty "field1"
        return subject.getField("field1").isEmpty();
    }
}).groupBy("field2").reduce(new ReduceFunction<POJO>() {
    @Override
    public POJO reduce(POJO subject1, POJO subject2) {
        // keep one element per distinct "field2"
        return subject1;
    }
});

DataSet<POJO> allSubjects = subjectsWithCondition.union(subjectsWithoutCondition);

DataSet<String> jsonSubjects = allSubjects.map(new RichMapFunction<POJO, String>() {
    private static final long serialVersionUID = 1L;
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public String map(POJO subject) throws Exception {
        return mapper.writeValueAsString(subject);
    }
});

jsonSubjects.writeAsText("/tmp/subjects/", WriteMode.OVERWRITE);
env.execute("JSON generation");
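
For comparison, the variant without the union, which behaves as expected, just writes the two reduced datasets separately. A sketch (the two output paths here are made up):

// No-union variant: map each reduced dataset to JSON and write it to its
// own output path (assumes org.apache.flink.api.common.functions.MapFunction).
MapFunction<POJO, String> toJson = new MapFunction<POJO, String>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public String map(POJO subject) throws Exception {
        return mapper.writeValueAsString(subject);
    }
};

subjectsWithCondition.map(toJson)
        .writeAsText("/tmp/subjectsWithCondition/", WriteMode.OVERWRITE);
subjectsWithoutCondition.map(toJson)
        .writeAsText("/tmp/subjectsWithoutCondition/", WriteMode.OVERWRITE);
env.execute("JSON generation without union");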

What is the problem? Did I make some mistake in the filtering, grouping, or reducing logic?
Thanks in advance,
Simone.
