I had a discussion with Andrey and now think that also the case 
event-time-timestamp/watermark-cleanup is a valid case. If you don’t need this 
for regulatory compliance but just for cleaning up old state, in case where you 
have re-processing of old data.

I think the discussion about whether to have this in the backends is also good 
to have: I’d say it’s good to have it in the backends because this
 (1) decreases state size, for user timers a timer entry is basically a <key, 
timestamp> whereas if we use backend TTL it’s only the timestamp
 (2) can piggyback on log compaction in RocksDB. A user-time manually has to go 
to state and delete it, which can be costly, while TTL in the backend would 
happen as-we-go

Aljoscha

> On 8. Apr 2019, at 12:03, Kostas Kloudas <kklou...@gmail.com> wrote:
> 
> Hi all,
> 
> For GDPR: I am not sure about the regulatory requirements of GDPR but I would 
> assume that the time for deletion starts counting from the time an 
> organisation received the data (i.e. the wall-clock ingestion time of the 
> data), and not the "event time" of the data. In other case, an organisaton 
> may be violating GDPR by just receiving e.g. 1 year old data of a user whole 
> deletion policy is "you are allowed to keep them for 6 months".
> 
> Now for the discussion in this thread, I think that the scenario:
> 
> * Timestamp stored: Event timestamp
> * Timestamp to check expiration: Processing Time
> 
> has the underlying assumption that there is a relationship between event-time 
> and processing time, which is not necessarily the case. Event-time, although 
> we call it "time", is just another user-defined column or attribute of the 
> data and can be anything. It is not an "objective" and independently evolving 
> attribute like wall-clock time. I am not sure what could be the solution, as 
> out-of-orderness can always lead to arbitrary, non-reproducible and difficult 
> to debug behaviour (e.g. a super-early element that arrives out-of-order and, 
> as the succeeding elements set the timestamp to lower values, it gets deleted 
> by the state backend, although the user-level windowing logic would expect it 
> to be there).
> 
> Given that last point made above, and apart from the semantics of the 
> proposed feature, I think that we should also discuss if it is a good idea to 
> have event time TTL implemented in state backend level in the first place. 
> Personally, I am not so convinced that this is a good idea, as we introduce 
> another (potentially competing) mechanism for handling event time, apart from 
> the user program. An example can be the one that I described above. And this 
> also defeats one of the main advantages of event time, in my opinion, which 
> is reproducability of the results.
> 
> I may be wrong, but I would appreciate any opinions on this.
> 
> Cheers,
> Kostas
> 
> On Mon, Apr 8, 2019 at 11:12 AM Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Oh boy, this is an interesting pickle.
> 
> For *last-access-timestamp*, I think only *event-time-of-current-record* 
> makes sense. I’m looking at this from a GDPR/regulatory compliance 
> perspective. If you update a state, by say storing the event you just 
> received in state, you want to use the exact timestamp of that event to to 
> expiration. Both *max-timestamp-of-data-seen-so-far* and *last-watermark* 
> suffer from problems in edge cases: if the timestamp of an event you receive 
> is quite a bit earlier than other timestamps that we have seen so far (i.e. 
> the event is late) we would artificially lengthen the TTL of that event 
> (which is stored in state) and would therefore break regulatory requirements. 
> Always using the timestamp of an event doesn’t suffer from that problem.
> 
> For *expiration-check-time*, both *last-watermark* and 
> *current-processing-time* could make sense but I’m leaning towards 
> *processing-time*. The reason is again the GDPR/compliance view: if we have 
> an old savepoint with data that should have been expired by now but we 
> re-process it with *last-watermark* expiration, this means that we will get 
> to “see” that state even though we shouldn’t allowed to be. If we use 
> *current-processing-time* for expiration, we wouldn’t have that problem 
> because that old data (according to their event-time timestamp) would be 
> properly cleaned up and access would be prevented.
> 
> To sum up:
> last-access-timestamp: event-time of event
> expiration-check-time: processing-time
> 
> What do you think?
> 
> Aljoscha
> 
> > On 6. Apr 2019, at 01:30, Konstantin Knauf <konstan...@ververica.com 
> > <mailto:konstan...@ververica.com>> wrote:
> > 
> > Hi Andrey,
> > 
> > I agree with Elias. This would be the most natural behavior. I wouldn't add
> > additional slightly different notions of time to Flink.
> > 
> > As I can also see a use case for the combination
> > 
> > * Timestamp stored: Event timestamp
> > * Timestamp to check expiration: Processing Time
> > 
> > we could (maybe in a second step) add the possibility to mix and match time
> > characteristics for both aspects.
> > 
> > Cheers,
> > 
> > Konstantin
> > 
> > On Thu, Apr 4, 2019 at 7:59 PM Elias Levy <fearsome.lucid...@gmail.com 
> > <mailto:fearsome.lucid...@gmail.com>>
> > wrote:
> > 
> >> My 2c:
> >> 
> >> Timestamp stored with the state value: Event timestamp
> >> Timestamp used to check expiration: Last emitted watermark
> >> 
> >> That follows the event time processing model used elsewhere is Flink.
> >> E.g. events are segregated into windows based on their event time, but the
> >> windows do not fire until the watermark advances past the end of the 
> >> window.
> >> 
> >> 
> >> On Thu, Apr 4, 2019 at 7:55 AM Andrey Zagrebin <and...@ververica.com 
> >> <mailto:and...@ververica.com>>
> >> wrote:
> >> 
> >>> Hi All,
> >>> 
> >>> As you might have already seen there is an effort tracked in FLINK-12005
> >>> [1] to support event time scale for state with time-to-live (TTL) [2].
> >>> While thinking about design, we realised that there can be multiple
> >>> options
> >>> for semantics of this feature, depending on use case. There is also
> >>> sometimes confusion because of event time out-of-order nature in Flink. I
> >>> am starting this thread to discuss potential use cases of this feature and
> >>> their requirements for interested users and developers. There was already
> >>> discussion thread asking about event time for TTL and it already contains
> >>> some thoughts [3].
> >>> 
> >>> There are two semantical cases where we use time for TTL feature at the
> >>> moment. Firstly, we store timestamp of state last access/update. Secondly,
> >>> we use this timestamp and current timestamp to check expiration and
> >>> garbage
> >>> collect state at some point later.
> >>> 
> >>> At the moment, Flink supports *only processing time* for both timestamps:
> >>> state *last access and current timestamp*. It is basically current local
> >>> system unix epoch time.
> >>> 
> >>> When it comes to event time scale, we also need to define what Flink
> >>> should
> >>> use for these two timestamps. Here I will list some options and their
> >>> possible pros&cons for discussion. There might be more depending on use
> >>> case.
> >>> 
> >>> *Last access timestamp (stored in backend with the actual state value):*
> >>> 
> >>>   - *Event timestamp of currently being processed record.* This seems to
> >>>   be the simplest option and it allows user-defined timestamps in state
> >>>   backend. The problem here might be instability of event time which can
> >>> not
> >>>   only increase but also decrease if records come out of order. This can
> >>> lead
> >>>   to rewriting the state timestamp to smaller value which is unnatural
> >>> for
> >>>   the notion of time.
> >>>   - *Max event timestamp of records seen so far for this record key.*
> >>> This
> >>>   option is similar to the previous one but it tries to fix the notion of
> >>>   time to make it always increasing. Maintaining this timestamp has also
> >>>   performance implications because the previous timestamp needs to be
> >>> read
> >>>   out to decide whether to rewrite it.
> >>>   - *Last emitted watermark*. This is what we usually use for other
> >>>   operations to trigger some actions in Flink, like timers and windows
> >>> but it
> >>>   can be unrelated to the record which actually triggers the state
> >>> update.
> >>> 
> >>> *Current timestamp to check expiration:*
> >>> 
> >>>   - *Event timestamp of last processed record.* Again quite simple but
> >>>   unpredictable option for out-of-order events. It can potentially lead
> >>> to
> >>>   undesirable expiration of late buffered data in state without control.
> >>>   - *Max event timestamp of records seen so far for operator backend.*
> >>> Again
> >>>   similar to previous one, more stable but still user does not have too
> >>> much
> >>>   control when to expire state.
> >>>   - *Last emitted watermark*. Again, this is what we usually use for
> >>> other
> >>>   operations to trigger some actions in Flink, like timers and windows.
> >>> It
> >>>   also gives user some control to decide when state is expired (up to
> >>> which
> >>>   point in event time) by emitting certain watermark. It is more
> >>> flexible but
> >>>   complicated. If some watermark emitting strategy is already used for
> >>> other
> >>>   operations, it might be not optimal for TTL and delay state cleanup.
> >>>   - *Current processing time.* This option is quite simple, It would mean
> >>>   that user just decides which timestamp to store but it will expire in
> >>> real
> >>>   time. For data privacy use case, it might be better because we want
> >>> state
> >>>   to be unavailable in particular real moment of time since the
> >>> associated
> >>>   piece of data was created in event time. For long term approximate
> >>> garbage
> >>>   collection, it might be not a problem as well. For quick expiration,
> >>> the
> >>>   time skew between event and processing time can lead again to premature
> >>>   deletion of late data and user cannot delay it.
> >>> 
> >>> We could also make this behaviour configurable. Another option is to make
> >>> time provider pluggable for users. The interface can give users context
> >>> (currently processed record, watermark etc) and ask them which timestamp
> >>> to
> >>> use. This is more complicated though.
> >>> 
> >>> Looking forward for your feedback.
> >>> 
> >>> Best,
> >>> Andrey
> >>> 
> >>> [1] https://issues.apache.org/jira/browse/FLINK-12005 
> >>> <https://issues.apache.org/jira/browse/FLINK-12005>
> >>> [2]
> >>> 
> >>> https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
> >>>  
> >>> <https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM>
> >>> [3]
> >>> 
> >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-TTL-in-Flink-1-6-0-td22509.html
> >>>  
> >>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-TTL-in-Flink-1-6-0-td22509.html>
> >>> 
> >> 
> > 
> > -- 
> > 
> > Konstantin Knauf | Solutions Architect
> > 
> > +49 160 91394525
> > 
> > <https://www.ververica.com/ <https://www.ververica.com/>>
> > 
> > Follow us @VervericaData
> > 
> > --
> > 
> > Join Flink Forward <https://flink-forward.org/ 
> > <https://flink-forward.org/>> - The Apache Flink
> > Conference
> > 
> > Stream Processing | Event Driven | Real Time
> > 
> > --
> > 
> > Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > 
> > --
> > Data Artisans GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
> 

Reply via email to