I've realized this is not such a big issue because it's also upper bounded
by the number of watermarks received, and it won't be one per event.

Miguel Araújo <miguelaraujo...@gmail.com> wrote on Monday,
10/05/2021 at 09:39:

> Thanks Dawid, having a look at CepOperator was useful. I implemented
> something with one difference I feel might be important:
>
> I noticed that in the CepOperator the timer is registered for
> currentWatermark+1 instead of for the event's timestamp. Is there a
> reason for this? I think this implies a number of timer firings that is
> quadratic in the number of keys with events that arrived after the
> current watermark. For instance, if you have 1000 events per second on
> different keys (and different timestamps), a watermark that is delayed
> by 1 second will fire ~1 million times. Is this a requirement of the
> NFA implementation? Wouldn't this be a problem?
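To make the concern concrete, here is a small self-contained simulation (plain Java, no Flink APIs; the re-registration behavior modeled for "strategy B" is an assumption about what the operator does, not a confirmed fact) comparing the two timer strategies:

```java
import java.util.Arrays;

// Toy model: each of `keys` keys holds one buffered event whose timestamp
// equals its key id, and the watermark advances one unit at a time.
public class TimerFiringSim {

    // Strategy A: register the timer at the event's own timestamp.
    // Each timer fires exactly once, when the watermark passes it.
    static long eventTimestampFirings(int keys) {
        return keys;
    }

    // Strategy B: register the timer at currentWatermark + 1 and re-register
    // while the key still has a buffered event (a simplified model of the
    // CepOperator behavior being asked about -- an assumption).
    static long watermarkPlusOneFirings(int keys) {
        boolean[] buffered = new boolean[keys];
        Arrays.fill(buffered, true);
        long firings = 0;
        for (int watermark = 1; watermark <= keys; watermark++) {
            for (int k = 0; k < keys; k++) {
                if (buffered[k]) {
                    firings++;               // this key's timer fires
                    if (k < watermark) {
                        buffered[k] = false; // event flushed; no re-registration
                    }
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        System.out.println("event-timestamp timers: " + eventTimestampFirings(100));
        System.out.println("watermark+1 timers:     " + watermarkPlusOneFirings(100));
    }
}
```

Under this model, 100 keys yield 100 firings with per-event timestamps but 5050 (roughly n²/2) with the watermark+1 scheme, which is the quadratic blow-up described above.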
>
> Thanks, once again.
>
> Dawid Wysakowicz <dwysakow...@apache.org> wrote on Monday,
> 10/05/2021 at 09:13:
>
>> Hey Miguel,
>>
>> I think you could take a look at the CepOperator which does pretty much
>> what you are describing.
>>
>> As for more direct answers for your questions. If you use
>> KeyedProcessFunction it is always scoped to a single Key. There is no way
>> to process events from other keys. If you want to have more control over
>> state and e.g. use PriorityQueue which would be snapshotted on checkpoint
>> you could look into using the Operator API. Bear in mind that it is a
>> semi-public API: it is very low level and subject to change rather frequently. Another
>> thing to consider is that if you use PriorityQueue instead of e.g. MapState
>> for buffering and ordering events you are constrained by the available
>> memory. We used PriorityQueue in the past in the CepOperator but migrated
>> it to MapState.
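A rough sketch of the MapState-based buffering idea, modeled here with a plain java.util.TreeMap instead of Flink's MapState (class and method names are illustrative, and the state-backend wiring is omitted):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Buffers events sorted by timestamp and releases them once the watermark
// has passed them. In a real operator the TreeMap would be a
// MapState<Long, List<T>> scoped to the current key, so the buffer is backed
// by the state backend rather than constrained by available heap memory.
public class TimestampBuffer<T> {
    private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

    public void add(long timestamp, T event) {
        byTimestamp.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(event);
    }

    // Removes and returns, in timestamp order, all events whose timestamp
    // is at or below the given watermark.
    public List<T> flushUpTo(long watermark) {
        List<T> ready = new ArrayList<>();
        Iterator<Map.Entry<Long, List<T>>> it =
            byTimestamp.headMap(watermark, true).entrySet().iterator();
        while (it.hasNext()) {
            ready.addAll(it.next().getValue());
            it.remove();
        }
        return ready;
    }
}
```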
>>
>> It is possible for events to become late in downstream operators. It
>> all depends on the timestamps of the events you emit from the
>> "sorting" operator. If you emit records with timestamps larger than
>> the watermark that "triggered" their emission, they can become late.
>>
>> Hope those tips help you a bit.
>>
>> Best,
>>
>> Dawid
>> On 04/05/2021 14:49, Miguel Araújo wrote:
>>
>> Hi Timo,
>>
>> Thanks for your answer. I think I wasn't clear enough in my initial
>> message, so let me give more details.
>>
>> The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
>> user-id) and then fed into a KeyedProcessFunction. I want to process all
>> events for a given user in order, before sending them downstream for
>> further processing in other operators. I don't want to hold events longer
>> than needed, hence using the watermark to signal which events can be
>> processed.
>> I don't think your suggestion of using a ListState would work, because we
>> would effectively have one list per user. That would mean (among other
>> things) that an event could only be processed when a new event for the same
>> user arrives, which would not only cause (potentially) huge latency, but
>> also data leakage. Not to mention that the events being sent could easily
>> be considered late events by the downstream operators.
>> The idea of keying by timestamp was an "evolution" of the ListState
>> suggestion, where events-to-be-later-processed would be kept sorted in the
>> map (which is what would be keyed by timestamp). We could iterate the map
>> to process the events, instead of fetching the full list and sorting it to
>> process the events in order. I don't think this solves any of the problems
>> mentioned above, so I think that mentioning it only raised confusion.
>>
>> Regarding global event-time order, that's not really what I'm after. I
>> only need event-time order per key, but I want to process the event as soon
>> as possible, constrained by knowing that it is "safe" to do so because no
>> event with a smaller timestamp for this key is yet to come.
>>
>> So, rephrasing my question as I'm not sure that part was clear in the
>> initial message, here is the idea:
>> - keeping one priority queue (ordered by timestamp) in each
>> KeyedProcessFunction instance. Therefore, each priority queue would store
>> events for multiple keys.
>> - when an event arrives, we push it to the queue and then process events
>> (updating state and sending them downstream) while their timestamp is lower
>> than the current watermark.
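A minimal plain-Java sketch of that scheme (outside Flink, so it deliberately ignores the fault-tolerance and keyed-state questions below; class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// One queue per operator instance, holding events for all keys routed to it.
// Events are released in timestamp order once the watermark has passed them.
public class CrossKeySorter {
    public record StampedEvent(long timestamp, String key, String payload) {}

    private final PriorityQueue<StampedEvent> queue =
        new PriorityQueue<>(Comparator.comparingLong(StampedEvent::timestamp));

    // Called per incoming event; returns the events that are now safe to
    // emit because no earlier-timestamped event can still arrive.
    public List<StampedEvent> onEvent(StampedEvent event, long currentWatermark) {
        queue.add(event);
        List<StampedEvent> ready = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp() <= currentWatermark) {
            ready.add(queue.poll());
        }
        return ready;
    }
}
```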
>>
>> The question is:
>> - is this fault tolerant? The priority queue is not state that is managed
>> by Flink, but it should be recoverable on replay.
>> - is it possible that the events I'm sending downstream become
>> late-events for a different operator, for some reason? Will they always be
>> sent before the watermark of the event that originated the processElement()
>> call?
>> - I would effectively be processing multiple elements (from multiple
>> keys) in the same call to processElement(). Is there a way to access the
>> state of different keys?
>>
>> This doesn't feel like the right approach. Is there an operator more
>> suitable than a KeyedProcessFunction which would allow me to handle the
>> state for multiple keys in this task manager? Should I register a timer to
>> trigger on the event timestamp instead? I believe timers trigger on
>> watermarks, so that could theoretically work, although it feels a little
>> weird. After all, what I want is just to buffer events so that they are
>> only processed when the watermark has caught up to them.
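The timer idea can be modeled outside Flink roughly like this (plain Java; in a real KeyedProcessFunction the per-key buffer would live in keyed MapState and `registerEventTimeTimer` would replace the explicit timer set -- the names here are illustrative, not Flink APIs):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Models "register a timer at the event's timestamp; when the watermark
// advances, fire every pending timer it has passed and emit the buffered
// events in timestamp order".
public class TimerModel {
    public record Timer(long timestamp, String key) {}

    // key -> (timestamp -> events), standing in for per-key MapState
    private final Map<String, TreeMap<Long, List<String>>> buffers = new HashMap<>();
    // pending timers, standing in for Flink's timer service
    private final TreeSet<Timer> timers = new TreeSet<>(
        Comparator.comparingLong(Timer::timestamp).thenComparing(Timer::key));

    // Buffer the event and "register" a timer at its timestamp.
    public void onEvent(String key, long timestamp, String payload) {
        buffers.computeIfAbsent(key, k -> new TreeMap<>())
               .computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(payload);
        timers.add(new Timer(timestamp, key));
    }

    // Fire every timer the watermark has passed, emitting in timestamp order.
    public List<String> onWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        Iterator<Timer> it = timers.iterator();
        while (it.hasNext()) {
            Timer t = it.next();
            if (t.timestamp() > watermark) break;
            it.remove();
            List<String> events = buffers.get(t.key()).remove(t.timestamp());
            if (events != null) emitted.addAll(events);
        }
        return emitted;
    }
}
```

Note that the number of timer firings here is one per distinct (key, timestamp) pair, matching the earlier point about per-event-timestamp timers.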
>>
>> Thanks
>>
>> Timo Walther <twal...@apache.org> wrote on Friday, 30/04/2021 at
>> 12:05:
>>
>>> Hi Miguel,
>>>
>>> your initial idea doesn't sound too bad, but why do you want to key by
>>> timestamp? Usually, you can simply key your stream by a custom key and
>>> store the events in a ListState until a watermark comes in.
>>>
>>> But if you really want to have some kind of global event-time order, you
>>> have two choices:
>>>
>>> - either a single operator with parallelism 1 that performs the ordering
>>> - or you send every event to every operator using the broadcast
>>> state pattern [1]
>>>
>>> It is guaranteed that the watermark will reach the downstream operator
>>> or sink after all the events it covers. Watermarks are synchronized
>>> across all parallel operator instances. You can store a Map without
>>> checkpointing it, but this means you have to ensure the map is
>>> initialized again during recovery.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> [1]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>>
>>> On 30.04.21 11:37, Miguel Araújo wrote:
>>> > Hi everyone,
>>> >
>>> > I have a KeyedProcessFunction whose events I would like to process in
>>> > event-time order.
>>> > My initial idea was to use a Map keyed by timestamp and, when a new
>>> > event arrives, iterate over the Map to process events older than the
>>> > current watermark.
>>> >
>>> > The issue is that I obviously can't use a MapState, as my stream is
>>> > keyed, so the map would be scoped to the current key.
>>> > Is using a "regular" (i.e., not checkpointed) Map an option, given
>>> > that its content will be recreated by the replay of the events on a
>>> > restart?
>>> > Is it guaranteed that the watermark that triggered the processing of
>>> > multiple events (and their subsequent push downstream) is not sent
>>> > downstream before these events themselves?
>>> >
>>> > Thanks,
>>> > Miguel
>>>
>>>
