Hi AndreaKinn,

The AscendingTimestampExtractor does not work the way you expect. It is
meant for streams whose timestamps are already monotonically ascending;
it does not reorder records.

Flink uses watermarks to deal with out-of-order data. A watermark *t*
signals that no more records with timestamps less than or equal to *t*
are expected. However, you must implement your own watermark generation
policy. There are two basic watermark assigners:
AssignerWithPeriodicWatermarks, which generates watermarks periodically,
and AssignerWithPunctuatedWatermarks, which generates watermarks when
certain records are encountered.
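
To make the watermark idea concrete, here is a minimal plain-Java sketch
(no Flink dependency) of a bounded-out-of-orderness policy: the watermark
trails the highest timestamp seen so far by a fixed bound, and buffered
records are released in timestamp order once the watermark passes them.
The class and method names (WatermarkSketch, BoundedLatenessWatermark,
emitInEventTimeOrder) and the 2000 ms bound are made up for illustration;
this is not how Flink implements it internally. In a real job you would
put this kind of policy into an AssignerWithPeriodicWatermarks, or use the
BoundedOutOfOrdernessTimestampExtractor that ships with Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkSketch {

    /** Hypothetical helper: watermark = highest timestamp seen - allowed lateness. */
    static final class BoundedLatenessWatermark {
        private final long maxOutOfOrderness;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedLatenessWatermark(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        void observe(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long current() {
            // Before any record arrives there is no meaningful watermark yet.
            return maxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxTimestamp - maxOutOfOrderness;
        }
    }

    /**
     * Replays timestamps in arrival order and returns them in the order they
     * would be released when records are buffered until the watermark passes.
     * Records at or below the current watermark are treated as late and dropped.
     */
    static List<Long> emitInEventTimeOrder(long[] arrivals, long maxOutOfOrderness) {
        BoundedLatenessWatermark watermark = new BoundedLatenessWatermark(maxOutOfOrderness);
        PriorityQueue<Long> buffer = new PriorityQueue<>();
        List<Long> emitted = new ArrayList<>();

        for (long timestamp : arrivals) {
            if (timestamp <= watermark.current()) {
                continue; // late record: the watermark already promised none would come
            }
            watermark.observe(timestamp);
            buffer.add(timestamp);

            // Release everything the watermark has passed, smallest first.
            long w = watermark.current();
            while (!buffer.isEmpty() && buffer.peek() <= w) {
                emitted.add(buffer.poll());
            }
        }

        // End of input: flush the rest, as if a final maximal watermark arrived.
        while (!buffer.isEmpty()) {
            emitted.add(buffer.poll());
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000, 3000, 2000, 5000, 4000, 6000};
        System.out.println(emitInEventTimeOrder(arrivals, 2000));
    }
}
```

With a bound of 2000 ms, the arrival order 1000, 3000, 2000, 5000, 4000,
6000 is released in ascending order. A record that arrives after the
watermark has already passed its timestamp is dropped in this sketch, so
the bound has to cover the worst disorder you expect from the source.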

For more information, please refer to [1] and [2].

Best,
Xingcan

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html

On Fri, Sep 8, 2017 at 4:24 AM, AndreaKinn <kinn6...@hotmail.it> wrote:

> Hi,
> I'm getting sensor data from a kafka source and I absolutely need they are
> ordered on time data generation basis. I've implemented a custom
> deserialiser and employed an AscendingTimestampExtractor to handle event
> time.
> Obviously I set EventTime as streamTimeCharacteristics.
> Unfortunately when I print the stream I see there are many records
> unordered. Am I doing something wrong?
> I've attached a prove of that:
>
> *env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>                 env.enableCheckpointing(CHECKPOINT_TIME);
>                 env.setParallelism(1);
>         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
>
>                 Properties properties = new Properties();
>                 properties.setProperty("bootstrap.servers",
> KAFKA_ADDRESS);
>                 properties.setProperty("group.id", GROUP_ID);
>
>                 DataStream<Tuple6<String, String, Date, String, String,
> Double>> stream
> = env
>                                 .addSource(new
> FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(),
> properties))
>                                 .assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor<Tuple6<String, String, Date, String,
> String,
> Double>>() {
>
>                                 @Override
>                                 public long 
> extractAscendingTimestamp(Tuple6<String,
> String,
> Date, String, String, Double> element) {
>                                     return element.f2.getTime();
>                                 }
>                                 })
>                                 .keyBy(0);
>
> stream.print()*
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/Screen_Shot_2017-09-07_at_21.png>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
