I've realized this is not such a big issue: the number of timer firings is also upper bounded by the number of watermarks received, and there won't be one watermark per event.
Miguel Araújo <miguelaraujo...@gmail.com> wrote on Monday, 10/05/2021 at 09:39:

> Thanks Dawid, having a look at CepOperator was useful. I implemented
> something with one difference I feel might be important:
>
> I noticed that in the CepOperator the timer is being registered for
> currentWatermark+1, instead of using the event's timestamp. Is there a
> reason for this? I think this implies a quadratic number of triggers in
> the number of keys with events that arrived after the current watermark.
> For instance, if you have 1000 events per second on different keys (and
> different timestamps), a watermark that is delayed by 1 second will fire
> ~1 million times. Is this a requirement of the NFA implementation? Would
> this not be a problem?
>
> Thanks, once again.
>
> Dawid Wysakowicz <dwysakow...@apache.org> wrote on Monday,
> 10/05/2021 at 09:13:
>
>> Hey Miguel,
>>
>> I think you could take a look at the CepOperator, which does pretty
>> much what you are describing.
>>
>> As for more direct answers to your questions: if you use a
>> KeyedProcessFunction, it is always scoped to a single key. There is no
>> way to process events from other keys. If you want more control over
>> state and want to e.g. use a PriorityQueue that would be snapshotted on
>> checkpoint, you could look into using the Operator API. Bear in mind it
>> is a semi-public API: it is very low level and subject to change rather
>> frequently. Another thing to consider is that if you use a PriorityQueue
>> instead of e.g. MapState for buffering and ordering events, you are
>> constrained by the available memory. We used a PriorityQueue in the past
>> in the CepOperator but migrated it to MapState.
>>
>> It is possible for events to become late in downstream operators. It
>> all depends on the timestamps of the events you emit from the "sorting"
>> operator. If you emit records with timestamps larger than the watermark
>> that "triggered" their generation, they can become late.
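The quadratic-firing concern Miguel raises can be illustrated with a small runtime-independent model (plain Java, no Flink dependency; class and method names are mine, not Flink API). With timers re-registered at currentWatermark+1, a key whose buffered event is still ahead of the watermark fires once per watermark advance, so total firings grow roughly with keys × watermark advances; a timer at the event's own timestamp fires exactly once per event.

```java
// Toy model of the two timer-registration strategies discussed above.
// Not Flink code: plain Java, illustrative names only.
public class TimerFiringModel {

    // Strategy A (as described for the CepOperator in the thread): after each
    // firing, the timer is re-registered at currentWatermark + 1 until the
    // watermark reaches the buffered event's timestamp.
    public static long firingsAtWatermarkPlusOne(long[] eventTimestamps, long[] watermarks) {
        long firings = 0;
        for (long ts : eventTimestamps) {   // one buffered event per key
            for (long wm : watermarks) {    // watermark advances, in order
                firings++;                  // the pending timer fires
                if (wm >= ts) break;        // event released; no re-registration
            }
        }
        return firings;
    }

    // Strategy B: one timer at the event's own timestamp -> one firing per event.
    public static long firingsAtEventTimestamp(long[] eventTimestamps) {
        return eventTimestamps.length;
    }

    public static void main(String[] args) {
        long[] ts = {1, 2, 3, 4};   // 4 keys with distinct event timestamps
        long[] wm = {1, 2, 3, 4};   // watermark advances one unit at a time
        System.out.println(firingsAtWatermarkPlusOne(ts, wm)); // 10 (~ n^2 / 2)
        System.out.println(firingsAtEventTimestamp(ts));       // 4  (linear)
    }
}
```

This also shows why the concern softens as noted at the top of the thread: strategy A's firings are bounded by the number of watermark advances, not by the number of events.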
>>
>> Hope those tips help you a bit.
>>
>> Best,
>>
>> Dawid
>>
>> On 04/05/2021 14:49, Miguel Araújo wrote:
>>
>> Hi Timo,
>>
>> Thanks for your answer. I think I wasn't clear enough in my initial
>> message, so let me give more details.
>>
>> The stream is not keyed by timestamp; it's keyed by a custom field
>> (e.g., user-id) and then fed into a KeyedProcessFunction. I want to
>> process all events for a given user in order, before sending them
>> downstream for further processing in other operators. I don't want to
>> hold events longer than needed, hence using the watermark to signal
>> which events can be processed.
>>
>> I don't think your suggestion of using a ListState would work, because
>> we would effectively have one list per user. That would imply (among
>> other things) that an event could only be processed when a new event
>> for the same user arrives, which would imply not only (potentially)
>> huge latency but also data leakage. Not to mention that the events
>> being sent could easily be considered late events by the downstream
>> operators.
>>
>> The idea of keying by timestamp was an "evolution" of the ListState
>> suggestion, where events to be processed later would be kept sorted in
>> the map (which is what would be keyed by timestamp). We could iterate
>> over the map to process the events, instead of fetching the full list
>> and sorting it to process the events in order. I don't think this
>> solves any of the problems mentioned above, so mentioning it probably
>> only caused confusion.
>>
>> Regarding global event-time order, that's not really what I'm after. I
>> only need event-time order per key, but I want to process each event as
>> soon as possible, constrained by knowing that it is "safe" to do so
>> because no event with a smaller timestamp for this key is yet to come.
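The "map keyed by timestamp" buffering Miguel describes can be sketched outside Flink with a TreeMap standing in for a per-key MapState (plain Java; class and method names are illustrative, not Flink API): events are inserted under their timestamp, and a watermark releases everything at or below it, already in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of per-key buffering with a sorted map keyed by timestamp,
// standing in for Flink's MapState. Illustrative names only.
public class SortedEventBuffer {
    // timestamp -> events carrying that timestamp (there may be several)
    private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

    public void add(long timestamp, String event) {
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
    }

    // Emit, in timestamp order, every event whose timestamp is <= watermark,
    // and drop those entries from the buffer.
    public List<String> flushUpTo(long watermark) {
        List<String> ready = new ArrayList<>();
        // headMap(watermark, true): live view of all entries with key <= watermark
        for (List<String> events : byTimestamp.headMap(watermark, true).values()) {
            ready.addAll(events);
        }
        byTimestamp.headMap(watermark, true).clear();
        return ready;
    }
}
```

Note this sketch is scoped to one key, which is exactly the limitation Miguel points out: with a keyed MapState, the flush can only run when that key receives an event (or a timer fires for it).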
>>
>> So, rephrasing my question, as I'm not sure that part was clear in the
>> initial message, here is the idea:
>>
>> - keep one priority queue (ordered by timestamp) in each
>> KeyedProcessFunction instance. Each priority queue would therefore
>> store events for multiple keys.
>> - when an event arrives, push it to the queue and then process events
>> (updating state and sending them downstream) while their timestamp is
>> lower than the current watermark.
>>
>> The questions are:
>>
>> - Is this fault tolerant? The priority queue is not state managed by
>> Flink, but it should be recoverable on replay.
>> - Is it possible that the events I'm sending downstream become late
>> events for a different operator, for some reason? Will they always be
>> sent before the watermark of the event that originated the
>> processElement() call?
>> - I would effectively be processing multiple elements (from multiple
>> keys) in the same call to processElement(). Is there a way to access
>> the state of different keys?
>>
>> This doesn't feel like the right approach. Is there an operator more
>> suitable than a KeyedProcessFunction that would allow me to handle the
>> state for multiple keys in this task manager? Should I register a timer
>> to trigger on the event timestamp instead? I believe timers trigger on
>> watermarks, so that could theoretically work, although it feels a
>> little weird. After all, what I want is just to buffer events so that
>> they are only processed when the watermark has caught up to them.
>>
>> Thanks
>>
>> Timo Walther <twal...@apache.org> wrote on Friday, 30/04/2021 at 12:05:
>>
>>> Hi Miguel,
>>>
>>> Your initial idea doesn't sound too bad, but why do you want to key by
>>> timestamp? Usually, you can simply key your stream by a custom key and
>>> store the events in a ListState until a watermark comes in.
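Miguel's operator-wide priority-queue plan can be sketched as a small self-contained model (plain Java, no Flink; the Event record and all names are mine). The queue holds events for many keys, ordered by timestamp, and a watermark drains every event strictly below it, which mirrors "process while their timestamp is lower than the current watermark" above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of one operator-instance-wide buffer holding events for many keys.
// Illustrative model only, not managed Flink state.
public class WatermarkDrainQueue {
    // (timestamp, key, payload); the queue orders by timestamp
    public record Event(long timestamp, String key, String payload) {}

    private final PriorityQueue<Event> queue =
            new PriorityQueue<>(Comparator.comparingLong(Event::timestamp));

    public void push(Event e) {
        queue.add(e);
    }

    // Drain, in timestamp order, every event strictly below the watermark.
    public List<Event> drain(long watermark) {
        List<Event> out = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp() < watermark) {
            out.add(queue.poll());
        }
        return out;
    }
}
```

The drained list interleaves keys, which is exactly why the approach runs into Miguel's third question: inside one processElement() call, keyed state for the other keys is not accessible.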
>>>
>>> But if you really want to have some kind of global event-time order,
>>> you have two choices:
>>>
>>> - either a single operator with parallelism 1 that performs the
>>> ordering,
>>> - or you send every event to every operator using the broadcast state
>>> pattern [1].
>>>
>>> It is guaranteed that the watermark will reach the downstream operator
>>> or sink after all the events it covers. Watermarks are synchronized
>>> across all parallel operator instances. You can store a Map
>>> uncheckpointed, but this means that you have to ensure the map is
>>> initialized again during recovery.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>>
>>> On 30.04.21 11:37, Miguel Araújo wrote:
>>> > Hi everyone,
>>> >
>>> > I have a KeyedProcessFunction whose events I would like to process
>>> > in event-time order.
>>> > My initial idea was to use a Map keyed by timestamp and, when a new
>>> > event arrives, iterate over the Map to process events older than
>>> > the current watermark.
>>> >
>>> > The issue is that I obviously can't use a MapState, as my stream is
>>> > keyed, so the map would be scoped to the current key.
>>> > Is using a "regular" (i.e., not checkpointed) Map an option, given
>>> > that its content will be recreated by the replay of the events on a
>>> > restart?
>>> > Is it guaranteed that the watermark that triggered the processing
>>> > of multiple events (and their subsequent push downstream) is not
>>> > sent downstream before those events themselves?
>>> >
>>> > Thanks,
>>> > Miguel
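Timo's ListState suggestion (key the stream, append events unsorted, sort only when a watermark releases them) can also be sketched outside Flink (plain Java; the Event record and all names are mine, standing in for a per-key ListState):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Sketch of Timo's per-key ListState buffering: cheap unsorted appends,
// with sorting deferred until a watermark releases the events.
// Illustrative model only, not Flink API.
public class ListStateBuffer {
    public record Event(long timestamp, String payload) {}

    private final List<Event> buffered = new ArrayList<>();

    public void append(Event e) {
        buffered.add(e);
    }

    // On watermark: remove and return, in timestamp order, everything that
    // is now safe to process (timestamp <= watermark).
    public List<Event> releaseUpTo(long watermark) {
        List<Event> ready = new ArrayList<>();
        for (Iterator<Event> it = buffered.iterator(); it.hasNext(); ) {
            Event e = it.next();
            if (e.timestamp() <= watermark) {
                ready.add(e);
                it.remove();
            }
        }
        ready.sort(Comparator.comparingLong(Event::timestamp));
        return ready;
    }
}
```

Compared with the sorted-map sketch earlier in the thread, this trades O(log n) insertion for a sort at release time; either way, in a real KeyedProcessFunction the release would be driven by an event-time timer registered at the event's timestamp, as the thread converges on.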