Hi all,
I am using Flink 1.3.1 and I have found some strange behavior when
running the following logic:
1. Read data from a file and store it into a DataSet<POJO>.
2. Split the dataset in two by checking whether "field1" of each POJO
is empty, so that the first dataset contains only elements with a
non-empty "field1" and the second dataset contains the remaining
elements.
3. Group each dataset, the first by "field1" and the second by
another field, and then reduce each one.
4. Merge the two reduced datasets with union.
5. Write the final dataset as JSON.
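To make step 2 concrete, here is a plain-Java sketch of the split (a
hypothetical minimal Pojo class standing in for my real POJO, not
Flink code):

```java
import java.util.*;
import java.util.stream.*;

public class SplitSketch {
    // Hypothetical minimal POJO: only field1 matters for the split.
    static class Pojo {
        final String field1;
        Pojo(String field1) { this.field1 = field1; }
    }

    // Step 2: partition into (field1 non-empty) and (field1 empty).
    static Map<Boolean, List<Pojo>> splitByEmptyField1(List<Pojo> subjects) {
        return subjects.stream()
                .collect(Collectors.partitioningBy(p -> p.field1.isEmpty()));
    }

    public static void main(String[] args) {
        List<Pojo> subjects = Arrays.asList(
                new Pojo("a"), new Pojo(""), new Pojo("a"), new Pojo("b"));
        Map<Boolean, List<Pojo>> split = splitByEmptyField1(subjects);
        System.out.println(split.get(false).size()); // 3 elements with non-empty field1
        System.out.println(split.get(true).size());  // 1 element with empty field1
    }
}
```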
What I expected in the output was at most one element for each
specific value of "field1", because:
1. Reducing the first dataset, grouped by "field1", should produce
only one element per distinct value of "field1".
2. The second dataset should contain only elements with an empty "field1".
3. Taking the union of the two should not duplicate any record.
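The deduplication I expect from groupBy + reduce can be sketched in
plain Java (a hypothetical stand-in for Flink's grouped reduce, keyed
on the "field1" values):

```java
import java.util.*;

public class ReduceByKeySketch {
    // groupBy(key).reduce(keep one side) collapses every key to a single
    // element, which is what I expect from groupBy("field1").reduce(...).
    static Map<String, String> reduceByKey(List<String> field1Values) {
        Map<String, String> reduced = new LinkedHashMap<>();
        for (String v : field1Values) {
            // like my ReduceFunction: keep the first element seen per key
            reduced.merge(v, v, (first, second) -> first);
        }
        return reduced;
    }

    public static void main(String[] args) {
        // the duplicate key "a" collapses to a single element
        System.out.println(reduceByKey(Arrays.asList("a", "b", "a")).size()); // 2
    }
}
```

Unioning this with a dataset whose elements all have an empty
"field1" should therefore never repeat a non-empty "field1" value.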
This is not what happens: when I read the generated JSON files I see
duplicate (non-empty) values of "field1".
Strangely, this does not happen when the union of the two datasets is
not computed. In that case the first dataset produces only elements
with distinct values of "field1", while the second dataset produces
only records with an empty "field1".
Debugging the code, it seems that the map function used to convert the
final merged dataset into JSON strings starts before the reduce
functions terminate. This seems to produce the duplicates.
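To illustrate the kind of interleaving I mean: java.util.stream also
runs its stages per element, so a downstream map can process element 1
before the upstream stage has seen element 2 (a plain-Java sketch, not
Flink; I had assumed a grouped reduce would act as a barrier):

```java
import java.util.*;
import java.util.stream.*;

public class PipeliningSketch {
    // Records the order in which the filter and map stages fire.
    static List<String> interleavedLog() {
        List<String> log = new ArrayList<>();
        Stream.of(1, 2)
                .filter(x -> { log.add("filter " + x); return true; })
                .map(x -> { log.add("map " + x); return x; })
                .forEach(x -> { });
        return log;
    }

    public static void main(String[] args) {
        // stages interleave per element instead of running stage by stage
        System.out.println(interleavedLog()); // [filter 1, map 1, filter 2, map 2]
    }
}
```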
Here is my pseudocode:
DataSet<POJO> subjects = read from csv...

DataSet<POJO> subjectsWithCondition = subjects.filter(new FilterFunction<POJO>() {
    @Override
    public boolean filter(POJO subject) throws Exception {
        // keep only elements with a non-empty field1
        return !subject.getField("field1").isEmpty();
    }
}).groupBy("field1").reduce(new ReduceFunction<POJO>() {
    @Override
    public POJO reduce(POJO subject1, POJO subject2) {
        // keep one representative element per key
        return subject1;
    }
});

DataSet<POJO> subjectsWithoutCondition = subjects.filter(new FilterFunction<POJO>() {
    @Override
    public boolean filter(POJO subject) throws Exception {
        // keep only elements with an empty field1
        return subject.getField("field1").isEmpty();
    }
}).groupBy("field2").reduce(new ReduceFunction<POJO>() {
    @Override
    public POJO reduce(POJO subject1, POJO subject2) {
        return subject1;
    }
});

DataSet<POJO> allSubjects = subjectsWithCondition.union(subjectsWithoutCondition);

DataSet<String> jsonSubjects = allSubjects.map(new RichMapFunction<POJO, String>() {
    private static final long serialVersionUID = 1L;
    ObjectMapper mapper = new ObjectMapper();

    @Override
    public String map(POJO subject) throws Exception {
        return mapper.writeValueAsString(subject);
    }
});

jsonSubjects.writeAsText("/tmp/subjects/", WriteMode.OVERWRITE);
env.execute("JSON generation");
What is the problem? Did I make some mistake in the filtering,
grouping, or reducing logic?
Thanks in advance,
Simone.