Hi all,
The following code works under Flink 1.0.3, but under 1.1.1 it just
switches to FINISHED and doesn't output any result.
stream.map(new RichMapFunction<String, Request>() {
private ObjectMapper objectMapper;
@Override
public void open(Configuration parameters) {
objectMapper = new ObjectMapper();
}
@Override
public Request map(String value) throws Exception {
return objectMapper.readValue(value, Request.class);
}
})
.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<Request>() {
@Override
public long extractAscendingTimestamp(Request req) {
return req.ts;
}
})
.map((Request req) -> new Tuple3<String, String, Integer>(req.userId,
req.location, 1))
.keyBy(0)
.timeWindow(Time.minutes(10))
.apply(
(Tuple3<String, String, Integer> x, Tuple3<String, String,
Integer> y) -> y,
(Tuple key, TimeWindow w, Iterable<Tuple3<String, String,
Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
Tuple3<String, String, Integer> res =
itrbl.iterator().next();
clctr.collect(new Tuple2<>(res.f1, res.f2));
})
.print();
The problem is with the window operator because I could print results
before it.
Best,
Yassine