Hi Chen,

Can you explain what you mean a bit more? I'm not sure I understand the
problem.

Does anyone know if the tooling discussed here has been merged into Flink
already? Or if there's an example of what this custom sink would look like?
I guess the sink would buffer updates in-memory between checkpoints. Then
it would implement the Checkpointed interface and write to the external
store in snapshotState(...)?
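
A minimal sketch of the buffering idea above, in plain Java with no Flink
dependency so that it runs on its own. Every name in it (KeyValueStore,
InMemoryStore, BufferingSink, pendingCount) is hypothetical and is not the
tooling mentioned in this thread. The snapshotState method only stands in for
the hook of Flink's Checkpointed interface and does not reproduce its real
signature. The sketch assumes writes are plain overwrites by key, so a
replayed flush rewrites the same values.

```java
import java.util.HashMap;
import java.util.Map;

public class BufferingSinkSketch {

    /** Hypothetical stand-in for the external key-value store. */
    interface KeyValueStore {
        void putAll(Map<String, String> updates);
    }

    /** In-memory store, present only to make the sketch runnable. */
    static class InMemoryStore implements KeyValueStore {
        final Map<String, String> data = new HashMap<>();

        @Override
        public void putAll(Map<String, String> updates) {
            data.putAll(updates); // overwrite by key, so repeated writes are harmless
        }
    }

    /** Keeps the latest value per key and writes only at checkpoint time. */
    static class BufferingSink {
        private final KeyValueStore store;
        private final Map<String, String> pending = new HashMap<>();

        BufferingSink(KeyValueStore store) {
            this.store = store;
        }

        /** Called once per element; nothing reaches the external store here. */
        void invoke(String key, String value) {
            pending.put(key, value);
        }

        /** Assumed checkpoint hook: flush the buffer, then clear it. */
        void snapshotState(long checkpointId) {
            // checkpointId is unused in this sketch
            store.putAll(pending);
            pending.clear();
        }

        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        BufferingSink sink = new BufferingSink(store);

        sink.invoke("user-1", "a");
        sink.invoke("user-1", "b"); // replaces the buffered value for user-1
        System.out.println("entries in store before checkpoint: " + store.data.size());

        sink.snapshotState(1L);
        System.out.println("entries in store after checkpoint: " + store.data.size());
    }
}
```

Keeping only the latest value per key bounds the buffer by the number of
distinct keys touched between two checkpoints rather than by the number of
elements. A real sink would also have to decide what happens when the flush
itself fails partway, which is the cleanup problem raised below.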

Thanks,
Josh

On Sun, Jul 24, 2016 at 6:00 PM, Chen Qin <qinnc...@gmail.com> wrote:

>
>
> On Jul 22, 2016, at 2:54 AM, Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
> >(1)  Only write to the DB upon a checkpoint, at which point it is known
> that no replay of that data will occur any more. Values from partially
> successful writes will be overwritten with correct value. I assume that is
> what you thought of when referring to the State Backend, because in some
> sense, that is what that state backend would do.
>
>
> I feel the problem is about how to commit all snapshots as a transaction.
> Partial writes pose cleanup challenges when the job restores.
> An easy hack would be to treat RocksDB as a cache and keep state updates
> there. Aka aemanifest. Do a cleanup check before the actual restore.
>
>
>
> >I think it is simpler to realize that in a custom sink, than developing
> a new state backend.  Another Flink committer (Chesnay) has developed
> some nice tooling for that, to be merged into Flink soon.
>
> I am planning to implement something like this:
>
> Say I have a topology which looks like this: [source => operator => sink].
> I would like it to work like this:
> 1. Upon receiving an element, the operator retrieves some state from an
> external key-value store (would like to put an in-memory cache on top of
> this with a TTL)
> 2. The operator emits a new state (and updates its in-memory cache with
> the new state)
> 3. The sink batches up all the new states and upon checkpoint flushes them
> to the external store
>
> Could anyone point me at the work that's already been done on this? Has it
> already been merged into Flink?
>
> Thanks,
> Josh
>
> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> regarding windows and incremental aggregation. This is already happening
>> in Flink as of now. When you give a ReduceFunction on a window, which "sum"
>> internally does, the result for a window is incrementally updated whenever
>> a new element comes in. This incremental aggregation only happens when you
>> specify a ReduceFunction or a FoldFunction, not for the general case of a
>> WindowFunction, where all elements in the window are required.
>>
>> You are right about incremental snapshots. We mainly want to introduce
>> them to reduce latency incurred by snapshotting. Right now, processing
>> stalls when a checkpoint happens.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sca...@expedia.com> wrote:
>>
>>> Thanks very kindly for your response, Stephan!
>>>
>>> We will definitely use a custom sink for persistence of idempotent
>>> mutations whenever possible. Exposing state as read-only to external
>>> systems is a complication we will try to avoid. Also, we will definitely
>>> only write to the DB upon checkpoint, and the write will be synchronous and
>>> transactional (no possibility of partial success/failure).
>>>
>>> However, we do want Flink state to be durable, we want it to be in
>>> memory when possible, and we want to avoid running out of memory due to the
>>> size of the state. For example, if you have a wide window that hasn't
>>> gotten an event for a long time, we want to evict that window state from
>>> memory. We're now thinking of using Redis (via AWS Elasticache) which also
>>> conveniently has TTL, instead of DynamoDB.
>>>
>>> I just wanted to check whether eviction of (inactive/quiet) state from
>>> memory is something that I should consider implementing, or whether Flink
>>> already had some built-in way of doing it.
>>>
>>> Along the same lines, I am also wondering whether Flink already has
>>> means of compacting the state of a window by applying an aggregation
>>> function to the elements so-far (eg. every time window is triggered)? For
>>> example, if you are only executing a sum on the contents of the window, the
>>> window state doesn't need to store all the individual items in the window,
>>> it only needs to store the sum. Aggregations other than "sum" might have
>>> that characteristic too. I don't know if Flink is already that intelligent
>>> or whether I should figure out how to aggregate window contents myself when
>>> possible with something like a window fold? Another poster (Aljoscha) was
>>> talking about adding incremental snapshots, but it sounds like that would
>>> only improve the write throughput not the memory usage.
>>>
>>> Thanks again!
>>> Shannon Carey
>>>
>>>
>>> From: Stephan Ewen <se...@apache.org>
>>> Date: Wednesday, April 6, 2016 at 10:37 PM
>>> To: <user@flink.apache.org>
>>> Subject: Re: State in external db (dynamodb)
>>>
>>> Hi Shannon!
>>>
>>> Welcome to the Flink community!
>>>
>>> You are right, sinks need in general to be idempotent if you want
>>> "exactly-once" semantics, because there can be a replay of elements that
>>> were already written.
>>>
>>> However, what you describe later, overwriting of a key with a new value
>>> (or the same value again) is pretty much sufficient. That means that when a
>>> duplicate write happens during replay, the value for the key is simply
>>> overwritten with the same value again.
>>> As long as all computation is purely in Flink and you only write to the
>>> key/value store (rather than read from k/v, modify in Flink, write to k/v),
>>> you get the consistency that for example counts/aggregates never have
>>> duplicates.
>>>
>>> If Flink needs to look up state from the database (because it is no
>>> longer in Flink), it is a bit more tricky. I assume that is where you are
>>> going with "Subsequently, when an event is processed, we must be able
>>> to quickly load up any evicted state".  In that case, there are two
>>> things you can do:
>>>
>>> (1)  Only write to the DB upon a checkpoint, at which point it is known
>>> that no replay of that data will occur any more. Values from partially
>>> successful writes will be overwritten with correct value. I assume that is
>>> what you thought of when referring to the State Backend, because in some
>>> sense, that is what that state backend would do.
>>>
>>> I think it is simpler to realize that in a custom sink, than developing
>>> a new state backend.  Another Flink committer (Chesnay) has developed
>>> some nice tooling for that, to be merged into Flink soon.
>>>
>>> (2) You could attach version numbers to every write, and increment the
>>> versions upon each checkpoint. That allows you to always refer to a
>>> consistent previous value, if some writes were made, but a failure occurred
>>> before the checkpoint completed.
>>>
>>> I hope these answers apply to your case. Let us know if some things are
>>> still unclear, or if I misunderstood your question!
>>>
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>> On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever <
>>> sanne.de.roe...@gmail.com> wrote:
>>>
>>>> FYI Cassandra has a TTL on data:
>>>> https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html
>>>>
>>>> On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey <sca...@expedia.com>
>>>> wrote:
>>>>
>>>>> Hi, new Flink user here!
>>>>>
>>>>> I found a discussion on user@flink.apache.org about using DynamoDB as
>>>>> a sink. However, as noted, sinks have an at-least-once guarantee so your
>>>>> operations must be idempotent.
>>>>>
>>>>> However, another way to go about this (and correct me if I'm wrong) is
>>>>> to write the state to the external store via a custom State Backend. Since
>>>>> the state participates in checkpointing, you don't have to worry about
>>>>> idempotency: every time state is checkpointed, overwrite the value of that
>>>>> key.
>>>>>
>>>>> We are starting a project with Flink, and we are interested in
>>>>> evicting the state from memory once a TTL is reached during which no
>>>>> events have come in for that state. Subsequently, when an event is
>>>>> processed, we must be able to quickly load up any evicted state. Does
>>>>> this sound reasonable? We are considering using DynamoDB for our state
>>>>> backend because it seems like all we will need is a key-value store. The
>>>>> only weakness of this is that if state gets older than, say, 2 years we
>>>>> would like to get rid of it which might not be easy in DynamoDB. I don't
>>>>> suppose Flink has any behind-the-scenes features that deal with getting
>>>>> rid of old state (either evicting from memory or TTL/aging out entirely)?
>>>>>
>>>>> Thanks for your time!
>>>>> Shannon Carey
>>>>>
>>>>
>>>>
>>>
>
