Hi,
I am using a JSON file as the source for the streaming (in the ascending
order of the field Umlaufsekunde)which has events as follows:
{"event":[{"*Umlaufsekunde*":115}]}
{"event":[{"*Umlaufsekunde*":135}]}
{"event":[{"*Umlaufsekunde*":135}]}
{"event":[{"*Umlaufsekunde*":145}]}
{"event":[{"*Umlaufsekunde*":155}]}
{"event":[{"*Umlaufsekunde*":155}]}
{"event":[{"*Umlaufsekunde*":185}]}
{"event":[{"*Umlaufsekunde*":195}]}
{"event":[{"*Umlaufsekunde*":195}]}
{"event":[{"*Umlaufsekunde*":205}]}
{"event":[{"*Umlaufsekunde*":245}]}
However, when I try to print the stream, it is unordered as given below:
1> (*115*,null,1483517983252,1190) -- The first value indicating
Umlaufsekunde
2> (135,null,1483517984877,1190)
2> (155,null,1483517986861,1190)
4> (145,null,1483517985752,1190)
3> (135,null,1483517985424,1190)
4> (195,null,1483517990736,1190)
4> (255,null,1483517997424,1190)
2> (205,null,1483517991518,1190)
2> (275,null,1483517999330,1190)
2> (385,null,1483518865371,1190)
2> (395,null,1483518866840,1190)
1> (155,null,1483517986533,1190)
4> (285,null,1483518000189,1190)
4> (395,null,1483518866231,1190)
I have also tried using the Timestamps and Watermarks but no luck as
follows:
public class TimestampExtractor implements
AssignerWithPeriodicWatermarks<Tuple5<String, Long, List<Lane>, Long,
Long>>{
private long currentMaxTimestamp;
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp);
}
@Override
public long extractTimestamp(Tuple5<String, Long> element, long
previousElementTimestamp) {
long timestamp = element.getField(1);
currentMaxTimestamp = timestamp;
return currentMaxTimestamp;
}
}
Could anyone suggest how do I handle this problem for the arrival of events
in order ?
Thanks!