Hi AndreaKinn,

Reordering records in a streaming environment is quite costly. AFAIK, Flink doesn't provide a built-in function for it.
A watermark is just one of the approaches to dealing with the out-of-order problem. IMO, it is just like a coarse-grained reordering. Late records should be dropped *manually*. Maybe you can try changing your function so that it can be applied to streams with such "coarse-grained" ordering. However, if your application really needs a fully ordered stream, I'm afraid you must cache the records and re-emit them in a user-defined ProcessFunction.

Best,
Xingcan

On Tue, Sep 12, 2017 at 1:48 AM, Eron Wright <eronwri...@gmail.com> wrote:

> As mentioned earlier, the watermark is the basis for reasoning about the
> overall progression of time. Many operators use the watermark to
> correctly organize records, e.g. into the correct time-based window.
> Within that window the records may still be unordered. That said, some
> operators do take pains to reorder the records, notably the Flink CEP
> operator to correctly detect temporal patterns. Basically, the operator
> buffers records until a watermark arrives; all buffered records older than
> the watermark may then be sorted and processed.
>
> It is tempting to write a standalone operator that simply reorders records
> as described, but subsequent repartitioning to downstream operators would
> reintroduce disorder. Therefore one must ensure that subsequent processing
> is done with a 'forward' partitioning strategy.
>
> Hope this helps!
> Eron
>
> On Fri, Sep 8, 2017 at 3:50 AM, AndreaKinn <kinn6...@hotmail.it> wrote:
>
>> Thank you, effectively I developed also a simple custom solution for
>> watermark looking at flink doc but anyway I see unordered printed streams.
>> I have a doubt about flink behaviour: if I understand, flink doesn't
>> perform automatically reordering of records in a stream, so if for
>> instance a record arrives in late what is the behaviour of flink?
>> In the doc it's described that elements arrive after in late are dropped
>> (allowed lateness default value is 0) but also using this watermark emitter:
>>
>> public class CustomTimestampExtractor implements
>>         AssignerWithPeriodicWatermarks<Tuple6<String, String, Date, String, String, Double>> {
>>
>>     private static final long serialVersionUID = 5448621759931440489L;
>>     private final long maxOutOfOrderness = 0;
>>     private long currentMaxTimestamp;
>>
>>     @Override
>>     public long extractTimestamp(
>>             Tuple6<String, String, Date, String, String, Double> element,
>>             long previousElementTimestamp) {
>>         long timestamp = element.f2.getTime();
>>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>         return timestamp;
>>     }
>>
>>     @Override
>>     public Watermark getCurrentWatermark() {
>>         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>     }
>> }
>>
>> with maxOutOfOrderness = 0 I see unordered record in the stream.
>>
>> What I want to obtain is a fully ordered stream, is there a way to
>> implement it?
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
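
To make the "cache and re-emit" idea from the reply above a bit more concrete, here is a minimal plain-Java sketch of the buffering logic Eron describes: hold records until a watermark arrives, then release everything at or before the watermark in timestamp order. It is not Flink API code. `ReorderBuffer`, `add` and `advanceWatermark` are hypothetical names made up for illustration. In a real job I would assume this logic sits inside a user-defined ProcessFunction, with the buffer kept in Flink state and the flush driven by the operator's watermark, but that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical helper (not a Flink class): buffers records and releases them
 * in ascending timestamp order once a watermark has passed them.
 */
final class ReorderBuffer<T> {

    /** A record paired with its event timestamp. */
    private static final class Entry<T> {
        final long timestamp;
        final T value;

        Entry(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Min-heap on timestamp; records with equal timestamps come out in no particular order.
    private final PriorityQueue<Entry<T>> heap =
            new PriorityQueue<>(Comparator.comparingLong((Entry<T> e) -> e.timestamp));

    private long currentWatermark = Long.MIN_VALUE;

    /**
     * Buffers a record. Returns false, and drops the record, if its timestamp is
     * at or before the current watermark (emitting it now would break the order).
     */
    boolean add(long timestamp, T value) {
        if (timestamp <= currentWatermark) {
            return false; // late record: dropped "manually"
        }
        heap.add(new Entry<>(timestamp, value));
        return true;
    }

    /**
     * Advances the watermark (it never moves backwards) and returns all buffered
     * records with timestamp <= watermark, sorted by timestamp.
     */
    List<T> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<T> ready = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek().timestamp <= currentWatermark) {
            ready.add(heap.poll().value);
        }
        return ready;
    }
}
```

For example, after `add(30, "c")`, `add(10, "a")` and `add(20, "b")`, calling `advanceWatermark(20)` returns `[a, b]`, while `c` stays buffered until a later watermark. A record with timestamp 15 added after that point is rejected as late. As Eron points out, the order only survives downstream if the following operators are connected with a 'forward' partitioning strategy.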