Hi AndreaKinn,

The AscendingTimestampExtractor does not work the way you think. It should only be applied to streams whose timestamps are naturally monotonically ascending.
Flink uses watermarks to deal with out-of-order data. When a watermark *t* is received, it means there should be no more records whose timestamps are less than or equal to *t*. However, you must implement your own watermark generation policy. There are two basic watermark assigners: AssignerWithPeriodicWatermarks, which generates watermarks periodically, and AssignerWithPunctuatedWatermarks, which generates watermarks when certain records are encountered.

For more information, please refer to [1] and [2].

Best,
Xingcan

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html

On Fri, Sep 8, 2017 at 4:24 AM, AndreaKinn <kinn6...@hotmail.it> wrote:

> Hi,
> I'm getting sensor data from a Kafka source and I absolutely need it to be
> ordered by the time the data was generated. I've implemented a custom
> deserialiser and employed an AscendingTimestampExtractor to handle event
> time.
> Obviously I set EventTime as streamTimeCharacteristics.
> Unfortunately, when I print the stream I see that many records are out of
> order. Am I doing something wrong?
> I've attached proof of that:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(CHECKPOINT_TIME);
> env.setParallelism(1);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", KAFKA_ADDRESS);
> properties.setProperty("group.id", GROUP_ID);
>
> DataStream<Tuple6<String, String, Date, String, String, Double>> stream = env
>     .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
>     .assignTimestampsAndWatermarks(
>         new AscendingTimestampExtractor<Tuple6<String, String, Date, String, String, Double>>() {
>             @Override
>             public long extractAscendingTimestamp(
>                     Tuple6<String, String, Date, String, String, Double> element) {
>                 return element.f2.getTime();
>             }
>         })
>     .keyBy(0);
>
> stream.print();
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/Screen_Shot_2017-09-07_at_21.png>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
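
P.S. One point that may help with the ordering question: as far as I know, a watermark by itself does not reorder anything. It only tells downstream operators that no record with a timestamp at or below *t* should still arrive, so print() keeps showing records in arrival order. Below is a minimal sketch in plain Java (it does not use the Flink API; the Event class, sortByEventTime and the 1500 ms bound are made up for illustration) of how a watermark that trails the highest timestamp seen so far can be used to release buffered records in event-time order. It assumes no record arrives later than the chosen bound; a record that does would still come out of order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkSortSketch {

    /** Hypothetical record type: an id plus an event-time timestamp in milliseconds. */
    static final class Event {
        final String id;
        final long timestamp;

        Event(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    /**
     * Replays events in arrival order and returns their ids in event-time order.
     * The watermark trails the highest timestamp seen so far by maxOutOfOrderness,
     * which assumes no record arrives later than that bound.
     */
    static List<String> sortByEventTime(List<Event> arrivals, long maxOutOfOrderness) {
        PriorityQueue<Event> buffer =
                new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
        List<String> output = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;

        for (Event event : arrivals) {
            buffer.add(event);
            maxSeen = Math.max(maxSeen, event.timestamp);
            long watermark = maxSeen - maxOutOfOrderness;
            // Under the bound assumption, nothing at or below the watermark can still
            // arrive, so buffered records up to the watermark are safe to release.
            while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
                output.add(buffer.poll().id);
            }
        }
        // End of input: flush whatever is still buffered, in timestamp order.
        while (!buffer.isEmpty()) {
            output.add(buffer.poll().id);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Event> arrivals = new ArrayList<>();
        arrivals.add(new Event("a", 1000L));
        arrivals.add(new Event("c", 3000L)); // arrives before "b" although it is newer
        arrivals.add(new Event("b", 2000L));
        arrivals.add(new Event("d", 4000L));
        System.out.println(sortByEventTime(arrivals, 1500L)); // prints [a, b, c, d]
    }
}
```

In Flink itself, BoundedOutOfOrdernessTimestampExtractor is a periodic assigner that applies the same "highest timestamp minus a fixed bound" watermark policy. The buffering step would still have to happen in an event-time operator downstream of the assigner; the loop above only illustrates the idea and is not how Flink implements it.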