> On Jul 22, 2016, at 2:54 AM, Josh <jof...@gmail.com> wrote:
> 
> Hi all,
> 
> >(1)  Only write to the DB upon a checkpoint, at which point it is known that 
> >no replay of that data will occur any more. Values from partially successful 
> >writes will be overwritten with correct value. I assume that is what you 
> >thought of when referring to the State Backend, because in some sense, that 
> >is what that state backend would do.

I think the real problem is how to commit all snapshots as a single transaction. 
Partial writes make cleanup difficult when the job is restored. 
An easy hack would be to treat RocksDB as a cache and keep the state updates 
there (a.k.a. a manifest), then run a cleanup check before the actual restore.
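
To make the idea concrete, here is a minimal sketch in plain Python, not Flink 
code. Everything in it is an assumption made for illustration: the class name 
StagedStore, the dict standing in for the external store, and the dict standing 
in for the local RocksDB cache. It only shows the shape of "stage locally, 
commit per checkpoint, clean up on restore".

```python
class StagedStore:
    """Hypothetical helper: stages writes locally, commits them per checkpoint."""

    def __init__(self, external):
        self.external = external  # dict standing in for the external key-value store
        self.staged = {}          # uncommitted updates (the local cache in the text)
        self.manifest = {}        # checkpoint id -> updates recorded for that checkpoint

    def put(self, key, value):
        # Writes never touch the external store directly.
        self.staged[key] = value

    def snapshot(self, checkpoint_id):
        # Record what this checkpoint intends to write, then start a fresh batch.
        self.manifest[checkpoint_id] = dict(self.staged)
        self.staged.clear()

    def commit(self, checkpoint_id):
        # Called once the checkpoint is known to be complete.
        self.external.update(self.manifest.pop(checkpoint_id))

    def restore(self, last_completed_id):
        # Cleanup check before resuming: finish interrupted commits (overwriting
        # is idempotent) and drop everything newer than the last completed checkpoint.
        self.staged.clear()
        for checkpoint_id in sorted(self.manifest):
            if checkpoint_id <= last_completed_id:
                self.commit(checkpoint_id)
            else:
                del self.manifest[checkpoint_id]
```

With this shape, the external store only ever sees data from completed 
checkpoints, so a restore never has to undo a partial write there.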


> 
> >I think it is simpler to realize that in a custom sink, than developing a 
> >new state backend.  Another Flink committer (Chesnay) has developed some 
> >nice tooling for that, to be merged into Flink soon. 
> 
> I am planning to implement something like this:
> 
> Say I have a topology which looks like this: [source => operator => sink], I 
> would like it to work like this:
> 1. Upon receiving an element, the operator retrieves some state from an 
> external key-value store (would like to put an in-memory cache on top of this 
> with a TTL)
> 2. The operator emits a new state (and updates its in-memory cache with the 
> new state)
> 3. The sink batches up all the new states and upon checkpoint flushes them to 
> the external store
> 
> Could anyone point me at the work that's already been done on this? Has it 
> already been merged into Flink?
> 
> Thanks,
> Josh
> 
>> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <aljos...@apache.org> 
>> wrote:
>> Hi,
>> regarding windows and incremental aggregation. This is already happening in 
>> Flink as of now. When you give a ReduceFunction on a window, which "sum" 
>> internally does, the result for a window is incrementally updated whenever a 
>> new element comes in. This incremental aggregation only happens when you 
>> specify a ReduceFunction or a FoldFunction, not for the general case of a 
>> WindowFunction, where all elements in the window are required.
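
The difference described above can be illustrated with a small plain-Python 
sketch. This is not the Flink API; ReducingWindow and BufferingWindow are 
hypothetical names that only mimic the two behaviours: a reduce-style window 
keeps a single accumulated value, while a general window function has to keep 
every element until the window fires.

```python
class ReducingWindow:
    """Keeps one accumulated value, updated as each element arrives."""

    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.acc = None
        self.empty = True

    def add(self, element):
        if self.empty:
            self.acc = element
            self.empty = False
        else:
            # Incremental aggregation: state size stays constant.
            self.acc = self.reduce_fn(self.acc, element)

    def result(self):
        return self.acc


class BufferingWindow:
    """Keeps every element, because the function needs the whole window."""

    def __init__(self, window_fn):
        self.window_fn = window_fn
        self.elements = []

    def add(self, element):
        self.elements.append(element)  # state grows with the window contents

    def result(self):
        return self.window_fn(self.elements)
```

For a sum, both variants return the same result, but only the buffering one 
holds the individual elements in state.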
>> 
>> You are right about incremental snapshots. We mainly want to introduce them 
>> to reduce latency incurred by snapshotting. Right now, processing stalls 
>> when a checkpoint happens.
>> 
>> Cheers,
>> Aljoscha
>> 
>>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sca...@expedia.com> wrote:
>>> Thanks very kindly for your response, Stephan!
>>> 
>>> We will definitely use a custom sink for persistence of idempotent 
>>> mutations whenever possible. Exposing state as read-only to external 
>>> systems is a complication we will try to avoid. Also, we will definitely 
>>> only write to the DB upon checkpoint, and the write will be synchronous and 
>>> transactional (no possibility of partial success/failure).
>>> 
>>> However, we do want Flink state to be durable, we want it to be in memory 
>>> when possible, and we want to avoid running out of memory due to the size 
>>> of the state. For example, if you have a wide window that hasn't gotten an 
>>> event for a long time, we want to evict that window state from memory. 
>>> We're now thinking of using Redis (via AWS Elasticache) which also 
>>> conveniently has TTL, instead of DynamoDB.
>>> 
>>> I just wanted to check whether eviction of (inactive/quiet) state from 
>>> memory is something that I should consider implementing, or whether Flink 
>>> already had some built-in way of doing it.
>>> 
>>> Along the same lines, I am also wondering whether Flink already has means 
>>> of compacting the state of a window by applying an aggregation function to 
>>> the elements so-far (eg. every time window is triggered)? For example, if 
>>> you are only executing a sum on the contents of the window, the window 
>>> state doesn't need to store all the individual items in the window, it only 
>>> needs to store the sum. Aggregations other than "sum" might have that 
>>> characteristic too. I don't know if Flink is already that intelligent or 
>>> whether I should figure out how to aggregate window contents myself when 
>>> possible with something like a window fold? Another poster (Aljoscha) was 
>>> talking about adding incremental snapshots, but it sounds like that would 
>>> only improve the write throughput not the memory usage.
>>> 
>>> Thanks again!
>>> Shannon Carey
>>> 
>>> 
>>> From: Stephan Ewen <se...@apache.org>
>>> Date: Wednesday, April 6, 2016 at 10:37 PM
>>> To: <user@flink.apache.org>
>>> Subject: Re: State in external db (dynamodb)
>>> 
>>> Hi Shannon!
>>> 
>>> Welcome to the Flink community!
>>> 
>>> You are right, sinks need in general to be idempotent if you want 
>>> "exactly-once" semantics, because there can be a replay of elements that 
>>> were already written.
>>> 
>>> However, what you describe later, overwriting of a key with a new value (or 
>>> the same value again) is pretty much sufficient. That means that when a 
>>> duplicate write happens during replay, the value for the key is simply 
>>> overwritten with the same value again.
>>> As long as all computation is purely in Flink and you only write to the 
>>> key/value store (rather than read from k/v, modify in Flink, write to k/v), 
>>> you get the consistency that for example counts/aggregates never have 
>>> duplicates.
>>> 
>>> If Flink needs to look up state from the database (because it is no longer 
>>> in Flink), it is a bit more tricky. I assume that is where you are going 
>>> with "Subsequently, when an event is processed, we must be able to quickly 
>>> load up any evicted state".  In that case, there are two things you can do:
>>> 
>>> (1)  Only write to the DB upon a checkpoint, at which point it is known 
>>> that no replay of that data will occur any more. Values from partially 
>>> successful writes will be overwritten with correct value. I assume that is 
>>> what you thought of when referring to the State Backend, because in some 
>>> sense, that is what that state backend would do.
>>> 
>>> I think it is simpler to realize that in a custom sink, than developing a 
>>> new state backend.  Another Flink committer (Chesnay) has developed some 
>>> nice tooling for that, to be merged into Flink soon. 
>>> 
>>> (2) You could attach version numbers to every write, and increment the 
>>> versions upon each checkpoint. That allows you to always refer to a 
>>> consistent previous value, if some writes were made, but a failure occurred 
>>> before the checkpoint completed.
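
Option (2) could look roughly like the following plain-Python sketch. It is an 
illustration under assumptions, not Flink code: VersionedStore and its methods 
are hypothetical, and the nested dict stands in for the external key-value 
store. Each write carries the current version, the version is bumped when a 
checkpoint completes, and reads only consider versions that are already 
covered by a completed checkpoint.

```python
class VersionedStore:
    """Hypothetical store that tags every write with a checkpoint version."""

    def __init__(self):
        self.data = {}      # key -> {version: value}
        self.committed = 0  # highest version covered by a completed checkpoint
        self.current = 1    # version attached to writes made right now

    def write(self, key, value):
        self.data.setdefault(key, {})[self.current] = value

    def checkpoint_completed(self):
        # Writes made under the current version become the consistent view.
        self.committed = self.current
        self.current += 1

    def read_consistent(self, key):
        # Ignore writes newer than the last completed checkpoint.
        versions = [v for v in self.data.get(key, {}) if v <= self.committed]
        return self.data[key][max(versions)] if versions else None

    def recover(self):
        # After a failure, discard writes that no completed checkpoint covers.
        for versions in self.data.values():
            for v in [v for v in versions if v > self.committed]:
                del versions[v]
```

A failure between two checkpoints then leaves the previous consistent value 
readable, and recover() removes the leftover writes.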
>>> 
>>> I hope these answers apply to your case. Let us know if some things are 
>>> still unclear, or if I misunderstood your question!
>>> 
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> 
>>> 
>>>> On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever 
>>>> <sanne.de.roe...@gmail.com> wrote:
>>>> FYI Cassandra has a TTL on data: 
>>>> https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html
>>>> 
>>>>> On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey <sca...@expedia.com> wrote:
>>>>> Hi, new Flink user here!
>>>>> 
>>>>> I found a discussion on user@flink.apache.org about using DynamoDB as a 
>>>>> sink. However, as noted, sinks have an at-least-once guarantee so your 
>>>>> operations must be idempotent.
>>>>> 
>>>>> However, another way to go about this (and correct me if I'm wrong) is to 
>>>>> write the state to the external store via a custom State Backend. Since 
>>>>> the state participates in checkpointing, you don't have to worry about 
>>>>> idempotency: every time state is checkpointed, overwrite the value of 
>>>>> that key.
>>>>> 
>>>>> We are starting a project with Flink, and we are interested in evicting 
>>>>> the state from memory once a TTL is reached during which no events have 
>>>>> come in for that state. Subsequently, when an event is processed, we must 
>>>>> be able to quickly load up any evicted state. Does this sound reasonable? 
>>>>> We are considering using DynamoDB for our state backend because it seems 
>>>>> like all we will need is a key-value store. The only weakness of this is 
>>>>> that if state gets older than, say, 2 years we would like to get rid of 
>>>>> it which might not be easy in DynamoDB. I don't suppose Flink has any 
>>>>> behind-the-scenes features that deal with getting rid of old state 
>>>>> (either evicting from memory or TTL/aging out entirely)?
>>>>> 
>>>>> Thanks for your time!
>>>>> Shannon Carey
>>>> 
>>> 
> 
