Hi Kostas,

Yes, that's the case. I will revert back to 1.0.3 until the bug is fixed.
Thank you.

Best,
Yassine

On Fri, Aug 12, 2016 at 10:34 AM, Kostas Kloudas <
k.klou...@data-artisans.com> wrote:

> Hi Yassine,
>
> Are you reading from a file and use ingestion time?
> If yes, then the problem can be related to this:
>
> https://issues.apache.org/jira/browse/FLINK-4329
>
> Is this the case?
>
> Best,
> Kostas
>
> On Aug 12, 2016, at 10:30 AM, Yassine Marzougui <yassmar...@gmail.com>
> wrote:
>
> Hi all,
>
> The following code works under Flink 1.0.3, but under 1.1.1 it just
> switches to FINISHED and doesn't output any result.
>
> stream.map(new RichMapFunction<String, Request>() {
>
>         private ObjectMapper objectMapper;
>
>         @Override
>         public void open(Configuration parameters) {
>             objectMapper = new ObjectMapper();
>         }
>
>         @Override
>         public Request map(String value) throws Exception {
>             return objectMapper.readValue(value, Request.class);
>         }
>
>     })
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>()
> {
>         @Override
>         public long extractAscendingTimestamp(Request req) {
>             return req.ts;
>         }
>     })
>     .map((Request req) -> new Tuple3<String, String, Integer>(req.userId,
> req.location, 1))
>     .keyBy(0)
>     .timeWindow(Time.minutes(10))
>     .apply(
>             (Tuple3<String, String, Integer> x, Tuple3<String, String,
> Integer> y) -> y,
>             (Tuple key, TimeWindow w, Iterable<Tuple3<String, String,
> Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
>                 Tuple3<String, String, Integer> res =
> itrbl.iterator().next();
>                 clctr.collect(new Tuple2<>(res.f1, res.f2));
>             })
>     .print();
>
> The problem is with the window operator because I could print results
> before it.
>
> Best,
> Yassine
>
>
>

Reply via email to