Hi Juho,

As Aljoscha mentioned, the current TTL implementation was mostly targeted at 
data privacy applications, where only processing time matters.

I think event time can also be useful for TTL, and it should address your 
concerns. 
The event-time extension is on the roadmap for future Flink releases.
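
To illustrate the difference for the lagging-job case you describe, here is a 
minimal sketch in plain Java. It is not Flink API: the class TtlClockSketch and 
the method isExpired are hypothetical names, and using the current watermark as 
the event-time clock is only an assumption about how an event-time TTL could 
behave.

```java
// Hypothetical sketch, not Flink code: compares when a state entry would be
// considered expired under a processing-time clock versus an event-time clock.
final class TtlClockSketch {

    /**
     * An entry counts as expired once the chosen clock has advanced at least
     * ttlMillis past the entry's last update on that same clock.
     */
    static boolean isExpired(long lastUpdateMillis, long nowMillis, long ttlMillis) {
        return nowMillis - lastUpdateMillis >= ttlMillis;
    }

    public static void main(String[] args) {
        long hour = 60L * 60L * 1000L;
        long ttl = 24 * hour;

        // The entry is written at time 0 on both clocks.
        long writeProcessingTime = 0L;
        long writeEventTime = 0L;

        // The job then lags or is down: the next record for this key is only
        // 2 hours later in event time, but it is processed 30 hours later
        // on the wall clock.
        long nextProcessingTime = 30 * hour;
        long nextWatermark = 2 * hour; // assumed stand-in for the event-time clock

        System.out.println("processing-time TTL expired: "
                + isExpired(writeProcessingTime, nextProcessingTime, ttl));
        System.out.println("event-time TTL expired: "
                + isExpired(writeEventTime, nextWatermark, ttl));
    }
}
```

With a 24 hour TTL, the entry counts as expired on the processing-time clock 
but not on the event-time clock, which is the "cleaned too early" situation.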

Cheers,
Andrey

> On 22 Aug 2018, at 11:57, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Juho,
> 
> The main motivation for the initial implementation of TTL was compliance with 
> the new GDPR rules. That is, data must become inaccessible and be dropped 
> according to time in the real world, i.e. processing time. The behaviour you 
> describe, with data being dropped if you keep a savepoint for too long, is 
> actually what is required for this use case.
> 
> I do see that having this for event time can also be useful, and it might 
> get implemented in the future. Maybe Stefan can chime in here.
> 
> Best,
> Aljoscha
> 
>> On 22. Aug 2018, at 11:01, Chesnay Schepler <ches...@apache.org> wrote:
>> 
>> Just a quick note for the docs: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>> 
>> On 22.08.2018 10:53, Juho Autio wrote:
>>> First, I couldn't find anything about State TTL in Flink docs, is there 
>>> anything like that? I can manage based on Javadocs & source code, but just 
>>> wondering.
>>> 
>>> Then to my main question: why doesn't the TTL support event time, and is 
>>> there any sensible use case for the TTL if the streaming characteristic of 
>>> my job is event time?
>>> 
>>> I have a job that is cleaning up old entries from a keyed MapState by 
>>> calling registerEventTimeTimer & implementing the onTimer method. This way 
>>> I can keep the state for a certain time in _event time_.
>>> 
>>> That's more complicated code than it would have to be, so I wanted to 
>>> convert my function to use Flink's own state TTL. I started writing this:
>>> 
>>>       MapStateDescriptor<String, String> stateDesc = new MapStateDescriptor<>(
>>>               "deviceState", String.class, String.class);
>>>       StateTtlConfig ttlConfig = StateTtlConfig
>>>               .newBuilder(Time.milliseconds(stateRetentionMillis))
>>>               // TODO EventTime is not supported?
>>>               .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
>>>               .build();
>>>       stateDesc.enableTimeToLive(ttlConfig);
>>> 
>>> So, I realized that ProcessingTime is the only existing TimeCharacteristic 
>>> in StateTtlConfig.
>>> 
>>> Based on some comments in Flink tickets it seems that it was a conscious 
>>> choice, because supporting EventTime TTL would be much heavier:
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013
>>> 
>>> So I can't exactly match the current behaviour that guarantees to keep the 
>>> state available for 24 hours (or whatever is passed as 
>>> --stateRetentionMillis).
>>> 
>>> However, if we accept the restriction and switch to processing time in 
>>> state cleanup, what does it mean?
>>> 
>>> - As long as the stream keeps up with the input rate (from Kafka), there's 
>>> no big difference, because 24 hours in processing time ~= 24 hours in event 
>>> time.
>>> - If the stream is lagging behind a lot, then it would be possible that the 
>>> state is cleaned "too early". However we aim at not having a lot of lag, so 
>>> this is not a real issue – job would be scaled up to catch up before it 
>>> starts lagging too much to get misses because of cleared state. Still, if 
>>> we fail to scale up quickly enough, the state might be cleared too early 
>>> and cause real trouble.
>>> - One problem is that if the stream is quickly processing a long backlog 
>>> (say, start streaming 7 days back in event time), then the state size can 
>>> temporarily grow bigger than usual – maybe this wouldn't be a big problem, 
>>> but it could at least require extraneous upscaling of resources.
>>> - After restoring from a savepoint, the state is older in processing time 
>>> by the length of the downtime caused by the job restart. Even this is not 
>>> a huge issue as long as the deployment downtime is short compared to the 
>>> 24 hour TTL.
>>> 
>>> Anyway, with all these issues combined, I'm a bit confused about the whole 
>>> TTL feature. Can it be used in event-time-based streaming in any sensible 
>>> way? It seems like it would be more like a cache then, and can't be relied 
>>> on well enough.
>>> 
>>> Thanks.
>>> 
>>> Juho
>> 
>> 
> 
