Hi everyone, I have the following issue with Flink (0.10) and Kafka.
I am using a very simple TimestampExtractor like [1], which just extracts a millis timestamp from a POJO. In my streaming job, I read in these POJOs from Kafka using the FlinkKafkaConsumer082 like this: stream = env.addSource(new FlinkKafkaConsumer082< (parameterTool.getRequired("topic"), new AvroPojoDeserializationSchema(), parameterTool.getProperties())) I have timestampEnabled() and the TimeCharacteristics are EventTime, AutoWatermarkIntervall is 500. The problem is, when I do something like: stream.assignTimestamps(new PojoTimestampExtractor(6000)) .timeWindowAll(Time.of(1, TimeUnit.SECONDS) .sum(..) .print() env.execute(); the windows never get triggered. If I use ProcessingTime it works. If I use env.fromCollection(...) instead of the KafkaSource it works with EventTime, too. Any ideas what I could be doing wrong are highly appreciated. Cheers, Konstantin [1]: public class PojoTimestampExtractor implements TimestampExtractor<Pojo> { final private long maxDelay; public PojoTimestampExtractor(long maxDelay) { this.maxDelay = maxDelay; } @Override public long extractTimestamp(Pojo fightEvent, long l) { return pojo.getTime(); } @Override public long extractWatermark(Pojo pojo, long l) { return pojo.getTime() - maxDelay; } @Override public long getCurrentWatermark() { return Long.MIN_VALUE; }