Hi Theo,

The general idea is interesting. I'd probably start with a fixed initial
out-of-orderness bound and switch to the histogram after collecting X
elements. Snapshotting the histogram sounds perfectly valid. I'd probably
use union state so that rescaling is also supported in a meaningful way.
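
A minimal sketch of that warm-up-then-histogram logic, in plain Java without
any Flink dependency. The class name, its parameters, and the ring buffer of
recent lateness samples are assumptions made for illustration; none of this
is a Flink API. The three mutable fields are what a stateful assigner would
need to snapshot.

```java
import java.util.Arrays;

/** Hypothetical helper (not a Flink class): derives an out-of-orderness bound from observed lateness. */
final class AdaptiveOutOfOrderness {
    private final long initialBoundMs; // fixed bound used until enough samples exist
    private final int warmupCount;     // the "X elements" to collect before switching
    private final double quantile;     // e.g. 0.999 for "99.9% of elements in time"

    // Mutable fields: these are what would have to go into checkpointed state.
    private final long[] samples;      // ring buffer of the most recent lateness values
    private long seen = 0;
    private long maxTimestampMs = Long.MIN_VALUE;

    private long lastWatermarkMs = Long.MIN_VALUE;

    AdaptiveOutOfOrderness(long initialBoundMs, int warmupCount, int windowSize, double quantile) {
        if (windowSize < 1 || quantile <= 0.0 || quantile > 1.0) {
            throw new IllegalArgumentException("windowSize >= 1 and 0 < quantile <= 1 required");
        }
        this.initialBoundMs = initialBoundMs;
        this.warmupCount = warmupCount;
        this.quantile = quantile;
        this.samples = new long[windowSize];
    }

    /** Records one event; lateness is how far the event lags behind the highest timestamp seen. */
    void observe(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        samples[(int) (seen % samples.length)] = maxTimestampMs - eventTimestampMs;
        seen++;
    }

    /** Returns the initial bound during warm-up, afterwards the configured quantile of recent lateness. */
    long currentBoundMs() {
        if (seen < warmupCount) {
            return initialBoundMs;
        }
        int n = (int) Math.min(seen, samples.length);
        long[] sorted = Arrays.copyOf(samples, n);
        Arrays.sort(sorted);
        int idx = (int) Math.ceil(quantile * n) - 1;
        return sorted[Math.max(0, Math.min(n - 1, idx))];
    }

    /** Highest timestamp minus the current bound, never moving backwards when the bound grows. */
    long currentWatermarkMs() {
        if (maxTimestampMs != Long.MIN_VALUE) {
            lastWatermarkMs = Math.max(lastWatermarkMs, maxTimestampMs - currentBoundMs());
        }
        return lastWatermarkMs;
    }
}
```

Sorting a copy of the buffer on every call is only acceptable because
watermarks are typically computed periodically rather than per element; a
bucketed histogram would be cheaper for large windows.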

However, to be honest, for a production use case I'd probably go with a
more deterministic (= simpler) approach and tune it manually. If late
events are something to worry about, I'd have a metric on them anyway,
with alerts, and adjust the job accordingly.

Another dimension to look at is whether you can recalculate results based
on the late events, so that you get the best of both worlds (low initial
latency, but precise end results). I'd also recommend having a look at the
retract streams of the Table API.
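
To illustrate the retraction idea, here is a plain-Java model of the change
messages such a stream carries: each update to a previously emitted result
is sent as a retraction of the old value followed by the new value. The
class and its `Change` type are hypothetical and only mimic the add/retract
flag; they are not Table API classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of retract semantics: a per-key count that corrects earlier results. */
final class RetractingCounter {

    /** One change message: add == true emits a result, add == false withdraws an earlier one. */
    static final class Change {
        final boolean add;
        final String key;
        final long count;

        Change(boolean add, String key, long count) {
            this.add = add;
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + key + "=" + count;
        }
    }

    private final Map<String, Long> counts = new HashMap<>();

    /** Processes one event (on time or late) and returns the change messages it causes. */
    List<Change> onEvent(String key) {
        List<Change> out = new ArrayList<>();
        Long previous = counts.get(key);
        long updated = (previous == null ? 0L : previous) + 1L;
        if (previous != null) {
            out.add(new Change(false, key, previous)); // withdraw the result emitted earlier
        }
        out.add(new Change(true, key, updated));       // emit the corrected result
        counts.put(key, updated);
        return out;
    }
}
```

A downstream consumer that applies these messages in order sees an early
result first and the corrected one once a late event arrives.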

On Sat, May 30, 2020 at 10:04 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Congxian,
>
> Thanks for your feedback. You raised a point I had already thought
> about. As "assignTimestampsAndWatermarks" creates an operator extending the
> standard AbstractUdfStreamOperator, I can also implement a RichFunction
> watermark assigner with full state access. In my case, I was also wondering
> whether it's a good idea to have a stateful watermark assigner or whether
> it's more practical to have no state on start and build my histogram over
> time, anew with each job restart... That's also why I asked on the mailing
> list, so I can get feedback from other people customizing watermark assigners.
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Congxian Qiu" <qcx978132...@gmail.com>
> *To: *"Theo Diefenthal" <theo.diefent...@scoop-software.de>
> *CC: *"user" <user@flink.apache.org>
> *Sent: *Saturday, May 30, 2020 05:06:12
> *Subject: *Re: Auto adjusting watermarks?
>
> Hi,
> Would it work for your case to store histogram data in a custom
> `BoundedOutOfOrdernessTimestampExtractor`
> and adjust `maxOutOfOrderness` according to that histogram data?
> (Be careful: such histogram data would not be snapshotted when
> checkpointing.)
>
> Best,
> Congxian
>
>
> Theo Diefenthal <theo.diefent...@scoop-software.de> wrote on Sat, May 30,
> 2020 at 4:35 AM:
>
>> Hi there,
>>
>> Currently I have a job pipeline reading data from more than 10 different
>> kinds of sources, each with different out-of-orderness characteristics. I am
>> currently working on adjusting the watermarks for each source "properly". I
>> work with BoundedOutOfOrdernessTimestampExtractor and, as usual, I want the
>> maxOutOfOrderness as low as possible while still keeping as many elements
>> as possible in time, as late arrivals trigger rather expensive computations.
>>
>> Now I thought that what I probably want is something like "I want to have
>> about 99.9% of my elements within the allowed lateness". Of course, I don't
>> know the out-of-orderness of future events, but I can predict it from the
>> past, e.g. via a histogram with a 99.9th percentile, and adjust the
>> maxOutOfOrderness dynamically.
>>
>> As Flink only provides rather simple timestamp assigners but allows me
>> to create my own with arbitrary complexity, I was wondering whether
>> any of you have already done something like that, whether it's a viable
>> approach, and whether I'm on a good track here.
>>
>> Best regards
>> Theo
>>
>

-- 

Arvid Heise | Senior Java Developer
