Yep, every operator usually cleans up the state of records past a received watermark.
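For reference, the bounded-out-of-orderness-plus-idleness strategy discussed below could be wired up like this (a minimal Java sketch, assuming a POJO Event with a getHandlingTime() accessor returning epoch millis; those names are illustrative, not from the thread):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    WatermarkStrategy<Event> strategy =
        WatermarkStrategy
            // accept events up to 5 minutes older than the newest timestamp seen so far
            .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            // take the event time (handlingTime) from the payload itself
            .withTimestampAssigner((event, recordTimestamp) -> event.getHandlingTime())
            // optionally mark a source split idle after 1 minute of silence so the
            // overall watermark can still advance from the other splits/partitions
            .withIdleness(Duration.ofMinutes(1));

Once the watermark passes handlingTime + 5 minutes, the state kept for those records becomes eligible for cleanup, which is what makes the bound matter.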
On Mon, Feb 14, 2022 at 4:03 PM HG <[email protected]> wrote:

> Will outdated keys disappear?
>
> It is in fact a kind of session window that can start at any time.
> New keys will appear constantly.
>
> On Mon, Feb 14, 2022, 15:57 Francesco Guardiani <[email protected]> wrote:
>
>> Hi,
>>
>> - Bounded out-of-orderness: this means you have a stream where events can
>> arrive late by a certain amount of time compared to the "newest" event
>> received. For example, with a bounded out-of-orderness of 5 minutes, you
>> essentially tell Flink that after receiving an event with time 1 PM today,
>> it may immediately afterwards receive another one with time 1 PM - 5 minutes
>> and should still consider it. But if it instead receives one with time
>> 1 PM - 6 minutes, Flink will consider it "late" and drop it. This is
>> essentially how Flink avoids retaining your events indefinitely.
>> - With idleness: because the watermark generator needs new records to come
>> in before advancing the watermark, if your stream is stale, no watermark is
>> produced, which means that records after that watermark will not be
>> processed.
>>
>> Reading your requirement, my understanding is that your input stream,
>> that is InputTable, requires a bounded out-of-orderness of 5 minutes.
>> For idleness, it really depends on whether your load can become stale at
>> some point in time or not: if your stream can be stale for long periods of
>> time (say nothing is produced for a couple of days), then you should set
>> an idleness after which a watermark is produced.
>>
>> Idleness is
>>
>> On Fri, Feb 11, 2022 at 2:53 PM HG <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am getting a headache thinking about watermarks and timestamps.
>>> My application reads events from Kafka (they are in JSON format) as a
>>> DataStream.
>>> Events can be keyed by a transactionId and have an event timestamp
>>> (handlingTime).
>>>
>>> All events belonging to a single transactionId will arrive within a window
>>> of a couple of minutes (say max 5 minutes).
>>> As soon as these 5 minutes have passed, it should calculate the differences
>>> in timestamp between the ordered events, add that elapsed time to every
>>> event, and emit them to the sink.
>>>
>>> I basically want to use the Table API to do
>>> "SELECT transactionId, handlingTime, handlingTime - LAG(handlingTime)
>>> OVER (PARTITION BY transactionId ORDER BY handlingTime) AS elapsedTime,
>>> originalEvent FROM InputTable"
>>>
>>> After the result of this query has been pushed to the sink, all data with
>>> respect to this transactionId can be discarded.
>>>
>>> What kind of watermark do I need to use?
>>> - Bounded out-of-orderness?
>>> - With idleness?
>>> - ...
>>>
>>> Late events can be ignored. They will rarely happen.
>>>
>>> Regards, Hans-Peter
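For the LAG query in the original question above, a rough Table API sketch (Java; the view wiring, the column types, and the TO_TIMESTAMP_LTZ conversion are assumptions, and note that Flink requires the OVER clause to ORDER BY a time attribute rather than a plain BIGINT column):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // assume: DataStream<Event> events = the Kafka source with the
    // watermark strategy from the earlier sketch already applied
    tEnv.createTemporaryView(
        "InputTable",
        events,
        Schema.newBuilder()
            .column("transactionId", "STRING")
            .column("handlingTime", "BIGINT")
            .column("originalEvent", "STRING")
            // derive a rowtime attribute from handlingTime (epoch millis)
            .columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(handlingTime, 3)")
            // reuse the watermarks already assigned on the DataStream
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());

    Table result = tEnv.sqlQuery(
        "SELECT transactionId, handlingTime, "
            + "       handlingTime - LAG(handlingTime) OVER w AS elapsedTime, "
            + "       originalEvent "
            + "FROM InputTable "
            + "WINDOW w AS (PARTITION BY transactionId ORDER BY rowtime)");

With event-time ordering on rowtime, results for a transactionId should be emitted once the watermark (newest handlingTime minus the 5-minute bound) passes, and the per-key OVER state should be cleaned up afterwards, which matches the "discard after 5 minutes" requirement.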
