Yep, every operator usually cleans up the state of records that are older than a received watermark.
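
To make that a bit more concrete, here is a rough plain-Java model of what "cleaning state past a watermark" means for your per-transactionId case. It is only a sketch and not Flink code: the class name, the method names and the 5 minute gap constant are made up for illustration, and whether a given Flink operator actually expires state like this depends on the operator. The first event of a key gets an elapsed time of 0 here, whereas lag() in SQL would give NULL.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of watermark-driven cleanup of keyed state; not Flink code. */
class KeyedCleanupSketch {
    // Hypothetical gap: a key is finished once the watermark reaches
    // its last event time plus this many milliseconds.
    static final long GAP_MILLIS = 5 * 60_000L;

    // Buffered handlingTime values per transactionId.
    private final Map<String, List<Long>> state = new HashMap<>();

    void onEvent(String transactionId, long handlingTime) {
        state.computeIfAbsent(transactionId, k -> new ArrayList<>()).add(handlingTime);
    }

    /** Emits "transactionId,handlingTime,elapsed" rows for finished keys and drops their state. */
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, List<Long>>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<Long>> entry = it.next();
            List<Long> times = entry.getValue();
            Collections.sort(times);
            long last = times.get(times.size() - 1);
            if (last + GAP_MILLIS <= watermark) {
                for (int i = 0; i < times.size(); i++) {
                    // Simplification: the first event has no predecessor, so use 0.
                    long elapsed = (i == 0) ? 0 : times.get(i) - times.get(i - 1);
                    out.add(entry.getKey() + "," + times.get(i) + "," + elapsed);
                }
                it.remove(); // the state for this key is gone from here on
            }
        }
        return out;
    }

    int liveKeys() {
        return state.size();
    }
}
```

Keys that never see another event disappear as soon as the watermark moves past them, and new keys simply create a fresh entry.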

On Mon, Feb 14, 2022 at 4:03 PM HG <hanspeter.sl...@gmail.com> wrote:

> Will keys that are outdated disappear?
>
> It is in fact a kind of session window that can start at any time.
> New keys will constantly appear.
>
> On Mon, Feb 14, 2022, 15:57 Francesco Guardiani <france...@ververica.com>
> wrote:
>
>> Hi,
>>
>> - bounded out of orderness: This means that events in your stream can
>> arrive late by up to a certain amount of time, compared to the "newest"
>> event received. For example, with a bounded out of orderness of 5
>> minutes, you tell Flink that your stream can receive an event with
>> time 1PM today, and immediately after that still receive another one
>> with time 1PM - 5 minutes, and Flink should consider it. But if you
>> receive one with time 1PM - 6 minutes instead, Flink will consider it
>> "late" and drop it. This is essentially how Flink avoids retaining your
>> events indefinitely.
>> - with idleness: Because the watermark generator needs new records to
>> come in before it can advance the watermark, if your stream is stale, no
>> new watermark is produced, which means that records after the last
>> watermark will not be processed.
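
Inline note: the 1PM example above can be checked with a small plain-Java model. This is a simplified sketch, not Flink's own class. The class and method names are made up, and lateness is checked on every event, whereas Flink emits watermarks periodically. The extra -1 millisecond mirrors what, to my knowledge, Flink's built-in bounded out-of-orderness generator does.

```java
import java.time.Duration;

/** Plain-Java sketch of a bounded out-of-orderness watermark; not Flink's class. */
class BoundedWatermarkSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(Duration bound) {
        this.boundMillis = bound.toMillis();
        // Start low enough that the first watermark computation does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Records the event and returns true if it was already at or behind the watermark. */
    boolean observe(long eventTimeMillis) {
        boolean late = eventTimeMillis <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimeMillis);
        return late;
    }

    /** Watermark = newest event time seen so far, minus the bound, minus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch w = new BoundedWatermarkSketch(Duration.ofMinutes(5));
        long onePm = 13L * 60 * 60 * 1000; // 1PM as milliseconds since midnight
        System.out.println("1PM late? " + w.observe(onePm));
        System.out.println("1PM - 5 min late? " + w.observe(onePm - 5 * 60_000L));
        System.out.println("1PM - 6 min late? " + w.observe(onePm - 6 * 60_000L));
    }
}
```

In Flink itself the corresponding setup would be `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(5))`, optionally followed by `.withIdleness(...)` with a timeout of your choosing.
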
>>
>> Reading your requirement, my understanding is that your input stream,
>> that is InputTable, requires a bounded out of orderness of 5 minutes.
>> For idleness, it really depends on whether your load can become stale at
>> some point in time or not: if your stream can be stale for long periods
>> of time (say nothing is produced for a couple of days), then you should
>> set an idleness timeout, after which the stream is marked as idle so that
>> it no longer holds back the watermark.
>>
>> Idleness is
>>
>> On Fri, Feb 11, 2022 at 2:53 PM HG <hanspeter.sl...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am getting a headache when thinking about watermarks and timestamps.
>>> My application reads events from Kafka (they are in JSON format) as a
>>> DataStream.
>>> Events can be keyed by a transactionId and have an event timestamp
>>> (handlingTime).
>>>
>>> All events belonging to a single transactionId will arrive in a window
>>> of a couple of minutes (say max 5 minutes).
>>> As soon as these 5 minutes have passed, it should calculate the
>>> differences in timestamp between the ordered events, add that elapsed
>>> time to every event and emit them to the Sink.
>>>
>>> I basically want to use the Table API to do
>>> "SELECT transactionId, handlingTime, handlingTime - lag(handlingTime)
>>> over (partition by transactionId order by handlingTime) as elapsedTime,
>>> originalEvent FROM InputTable"
>>>
>>> After the result of this query has been pushed to the Sink, all data
>>> for this transactionId can be discarded.
>>>
>>> What kind of watermark do I need to use?
>>> - bounded out of orderness?
>>> - with idleness?
>>> - ...
>>>
>>> Late events can be ignored. They will rarely happen.
>>>
>>> Regards Hans-Peter
>>>
