Hi AndreaKinn,

Reordering in a stream environment is quite costly. AFAIK, Flink doesn't
provide such functions internally.

Watermark is just one of the approaches to deal with the out-of-order
problem. IMO, it just like a coarse-grained
reordering. The late records should be dropped *manually*. Maybe you can
try changing your function to be applied
on streams with such "coarse-grained" ordering. However, if the fully
ordered stream is necessary in your
application, I'm afraid you must cache and re-emit them in a user-defined
processFunction.

Best,
Xingcan


On Tue, Sep 12, 2017 at 1:48 AM, Eron Wright <eronwri...@gmail.com> wrote:

> As mentioned earlier, the watermark is the basis for reasoning about the
> overall progression of time.   Many operators use the watermark to
> correctly organize records, e.g. into the correct time-based window.
> Within that window the records may still be unordered.   That said, some
> operators do take pains to reorder the records, notably the Flink CEP
> operator to correctly detect temporal patterns.  Basically, the operator
> buffers records until a watermark arrives; all buffered records older than
> the watermark may then be sorted and processed.
>
> It is tempting to write a standalone operator that simply reorders records
> as described, but subsequent repartitioning to downstream operators would
> reintroduce disorder.  Therefore one must ensure that subsequent processing
> is done with a 'forward' partitioning strategy.
>
> Hope this helps!
> Eron
>
> On Fri, Sep 8, 2017 at 3:50 AM, AndreaKinn <kinn6...@hotmail.it> wrote:
>
>> Thank you, effectively I developed also a simple custom solution for
>> watermark looking at flink doc but anyway I see unordered printed streams.
>> I have a doubt about flink behaviour: if I understand, flink doesn't
>> perform
>> automatically reordering of records in a stream, so if for instance a
>> record
>> arrives in late what is the behaviour of flink? In the doc it's described
>> that elements arrive after in late are dropped (allowed lateness default
>> value is 0) but also using this watermark emitter:
>>
>> *public class CustomTimestampExtractor implements
>> AssignerWithPeriodicWatermarks<Tuple6&lt;String, String, Date, String,
>> String, Double>>{
>>
>>         private static final long serialVersionUID = 5448621759931440489L;
>>         private final long maxOutOfOrderness = 0;
>>     private long currentMaxTimestamp;
>>
>>         @Override
>>         public long extractTimestamp(Tuple6<String, String, Date,
>> String, String,
>> Double> element, long previousElementTimestamp) {
>>                 long timestamp = element.f2.getTime();
>>                 currentMaxTimestamp = Math.max(timestamp,
>> currentMaxTimestamp);
>>                 return timestamp;
>>         }
>>
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>                 return new Watermark(currentMaxTimestamp -
>> maxOutOfOrderness);
>>         }
>> }*
>>
>> with maxOutOfOrderness = 0 I see unordered record in the stream.
>>
>> What I want to obtain is a fully ordered stream, is there a way to
>> implement
>> it?
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>

Reply via email to