Thanks a bunch!

> For example, the Flink Kafka source operator's parallel instances maintain
> as operator state a mapping of partitions to offsets for the partitions
> that it is assigned to.

I think this clarifies things. Operator state is literally state the
operator needs to do its job, not row data. The Table API/SQL uses
"Keyed State" for rows, entirely separately.
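
A minimal sketch of the rescale behavior described in the quoted reply below, written in plain Python rather than against Flink's API. It only illustrates the idea that list entries are independent units dealt out round-robin; it is not Flink's actual implementation. The function name `redistribute_round_robin`, the `(partition, offset)` tuples, and the sample offsets are all hypothetical.

```python
from typing import List, Tuple

# Hypothetical entry type: one operator-state list item = (partition id, offset).
Entry = Tuple[int, int]


def redistribute_round_robin(
    instances: List[List[Entry]], new_parallelism: int
) -> List[List[Entry]]:
    """Flatten every instance's list state and deal the entries out round-robin."""
    if new_parallelism < 1:
        raise ValueError("new_parallelism must be at least 1")
    # Entries are treated as independent items; no key decides where they go.
    all_entries = [entry for state in instances for entry in state]
    rescaled: List[List[Entry]] = [[] for _ in range(new_parallelism)]
    for i, entry in enumerate(all_entries):
        rescaled[i % new_parallelism].append(entry)
    return rescaled


# Two source instances, each tracking offsets for its assigned partitions.
before = [[(0, 120), (1, 95)], [(2, 300), (3, 42)]]
# Rescale from parallelism 2 to parallelism 3.
after = redistribute_round_robin(before, 3)
```

Because each entry carries its own partition and offset, it does not matter which instance an entry lands on after the rescale. That is why entries should not be correlated with each other.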

Thanks!

On Mon, Sep 7, 2020 at 11:51 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi!
>
> Operator state is bound to a single parallel operator instance; there is
> no partitioning happening here.
> It is typically used in Flink source and sink operators. For example, the
> Flink Kafka source operator's parallel instances maintain as operator state
> a mapping of partitions to offsets for the partitions that it is assigned
> to. State like this isn't partitionable by any key associated with
> an input DataStream.
>
> Since there is no partitioning scheme, redistribution of the state on
> operator rescale also happens differently compared to keyed state.
> Take for example a ListState; in contrast to a keyed ListState, an
> Operator ListState is a collection of state items that are independent from
> each other and eligible for redistribution across operator instances in the
> event of a rescale (by default, Flink uses simple round-robin for the
> redistribution).
> In other words, the list entries are the finest granularity at which the
> operator state can be redistributed, and should not be correlated with each
> other since each entry of the list may end up in different parallel
> operator instances on rescale.
>
> In general, there should rarely be a need to use operator state for
> typical user applications. It isn't massively scalable and usually is small
> in size.
>
> Cheers,
> Gordon
>
> On Sat, Sep 5, 2020 at 12:26 AM Rex Fenley <r...@remind101.com> wrote:
>
>> This is so helpful, thank you!
>>
>> So just to clarify (3), Operator state has a partitioning scheme, but
>> it's simply not by key, it's something else that's special under-the-hood?
>> In which case, what data is stored in an Operator? I assumed it must be the
>> input data for e.g. a join, so that it can react efficiently to any data
>> changes in the stream and recombine only what has actually changed. Is this
>> correct?
>>
>> On Fri, Sep 4, 2020 at 1:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Hello!
>>>>
>>>> I've been digging into State Storage documentation, but it's left me
>>>> scratching my head with a few questions. Any help will be much appreciated.
>>>>
>>>> Qs:
>>>> 1. Is there a way to use the RocksDB state backend for Flink on AWS EMR,
>>>> possibly with S3-backed savepoints for recovery (or maybe HDFS for
>>>> savepoints)? The only AWS-related documentation I can find makes it look
>>>> like AWS must use the S3 File System state backend and not RocksDB at all.
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html
>>>>
>>>
>>> I think there's some misunderstanding of the role of RocksDB vs
>>> filesystems for fault-tolerance here.
>>> RocksDB is a state backend option that manages user state out-of-core,
>>> and is managed by the Flink runtime. Users do not need to separately manage
>>> RocksDB instances.
>>> For persistence of that state as checkpoints / savepoints for
>>> fault-tolerance, you may choose the commonly used filesystems like S3 /
>>> HDFS.
>>>
>>> See [1] for how to configure your job to use RocksDBStateBackend as the
>>> runtime state backend and how to configure a filesystem path for persistence.
>>>
>>>
>>>>
>>>> 2. Does the FS state backend not compact? I thought everything in Flink
>>>> was stored as key/value. In which case, why would the last n values for a
>>>> key need to stick around, or how would they?
>>>> > An incremental checkpoint builds upon (typically multiple) previous
>>>> > checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a
>>>> > way that is self-consolidating over time. As a result, the incremental
>>>> > checkpoint history in Flink does not grow indefinitely, and old checkpoints
>>>> > are eventually subsumed and pruned automatically.
>>>>
>>>>
>>> The sentence that you quote simply states how Flink leverages RocksDB's
>>> background compaction of SSTables to ensure that incremental checkpoints
>>> don't grow indefinitely in size.
>>> This has nothing to do with the FsStateBackend, as incremental
>>> checkpointing isn't supported there.
>>>
>>> To clarify, as there might be another misunderstanding here:
>>> The difference between FsStateBackend and RocksDBStateBackend is the
>>> state backend used to maintain local state at runtime.
>>> RocksDBStateBackend uses RocksDB, while FsStateBackend uses in-memory
>>> hash maps. For persistence, both are checkpointed to a filesystem for
>>> fault-tolerance.
>>> The naming is a bit confusing, so I wanted to spell that out in case it
>>> caused any of the confusion in the questions above.
>>>
>>>
>>>> 3. In the docs, Operators are referred to as non-keyed state, yet,
>>>> Operators have IDs that they are keyed by, so why are they referred to as
>>>> non-keyed state?
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>>>>
>>>>
>>> Operator state is referred to as non-keyed state because it is not
>>> co-partitioned with the stream by key and its values are not bound to a
>>> single key (i.e. when you access keyed state, the access is bound to a
>>> single key), and it has a different scheme for repartitioning when
>>> operators are scaled up or down.
>>> The operator IDs you referred to are simply a unique ID to identify the
>>> same operators across different executions of the same job. I'm not sure
>>> what you mean by "operators have IDs that are keyed by"; those IDs are not
>>> used in any partitioning operation.
>>>
>>>
>>>
>>>> 4. For the Table API / SQL are primary keys and join keys automatically
>>>> used as the keys for state under the hood?
>>>>
>>>
>>> Yes.
>>>
>>>
>>>>
>>>> Lastly
>>>> 5. Is there a way to estimate roughly how much disk space state storage
>>>> will take per operation?
>>>>
>>>>
>>>> Thanks again!
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#configuring-a-state-backend
>>>
>>>
>>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
