Thanks a bunch!

> For example, the Flink Kafka source operator's parallel instances maintain as operator state a mapping of partitions to offsets for the partitions that it is assigned to.
This, I think, clarifies things. This is literally state for the operator to do its job, not really row data. The Table API/SQL will use "Keyed State" for rows entirely separately.

Thanks!

On Mon, Sep 7, 2020 at 11:51 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi!
>
> Operator state is bound to a single parallel operator instance; there is no partitioning happening here.
> It is typically used in Flink source and sink operators. For example, the Flink Kafka source operator's parallel instances maintain as operator state a mapping of partitions to offsets for the partitions that it is assigned to. State like this isn't partitionable by any key associated with an input DataStream.
>
> Since there is no partitioning scheme, redistribution of the state on operator rescale also happens differently compared to keyed state.
> Take, for example, a ListState: in contrast to a keyed ListState, an operator ListState is a collection of state items that are independent from each other and eligible for redistribution across operator instances in the event of a rescale (by default, Flink uses simple round-robin for the redistribution).
> In other words, the list entries are the finest granularity at which the operator state can be redistributed, and they should not be correlated with each other, since each entry of the list may end up in a different parallel operator instance on rescale.
>
> In general, there should rarely be a need to use operator state in typical user applications. It isn't massively scalable and is usually small in size.
>
> Cheers,
> Gordon
>
> On Sat, Sep 5, 2020 at 12:26 AM Rex Fenley <r...@remind101.com> wrote:
>
>> This is so helpful, thank you!
>>
>> So just to clarify (3): operator state has a partitioning scheme, but it's simply not by key; it's something else that's special under the hood?
>> In which case, what data is stored in an operator? I assumed it must be the input data for, e.g., a join, so that it can react efficiently to any data changes in the stream and recombine only what has actually changed. Is this correct?
>>
>> On Fri, Sep 4, 2020 at 1:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Hello!
>>>>
>>>> I've been digging into the State Storage documentation, but it's left me scratching my head with a few questions. Any help will be much appreciated.
>>>>
>>>> Qs:
>>>> 1. Is there a way to use the RocksDB state backend for Flink on AWS EMR? Possibly with S3-backed savepoints for recovery (or maybe HDFS for savepoints?)? The only documentation related to AWS I can find makes it look like AWS must use the S3 File System state backend and not RocksDB at all.
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html
>>>
>>> I think there's some misunderstanding of the role of RocksDB vs. filesystems for fault tolerance here.
>>> RocksDB is a state backend option that manages user state out-of-core, and it is managed by the Flink runtime. Users do not need to separately manage RocksDB instances.
>>> For persistence of that state as checkpoints / savepoints for fault tolerance, you may choose commonly used filesystems like S3 / HDFS.
>>>
>>> See [1] for how to configure your job to use RocksDBStateBackend as the runtime state backend and how to configure a filesystem path for persistence.
>>>
>>>> 2. Does the FS state backend not compact? I thought everything in Flink was stored as key/value. In which case, why would the last n values for a key need to stick around, or how would they?
>>>> > An incremental checkpoint builds upon (typically multiple) previous checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a way that is self-consolidating over time.
>>>> > As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.
>>>
>>> The sentence that you quote simply states how Flink leverages RocksDB's background compaction of SSTables to ensure that incremental checkpoints don't grow indefinitely in size.
>>> This has nothing to do with the FsStateBackend, as incremental checkpointing isn't supported there.
>>>
>>> Just as a clarification, as there might be some other misunderstanding here:
>>> The difference between FsStateBackend and RocksDBStateBackend is the state backend being used to maintain local state at runtime. RocksDBStateBackend obviously uses RocksDB, while the FsStateBackend uses in-memory hash maps. For persistence, both are checkpointed to a filesystem for fault tolerance.
>>> The naming may be a bit confusing, so I just wanted to clarify that here in case it caused any confusion with the questions above.
>>>
>>>> 3. In the docs, operators are referred to as non-keyed state, yet operators have IDs that they are keyed by, so why are they referred to as non-keyed state?
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>>>
>>> Operator state is referred to as non-keyed state because it is not co-partitioned with the stream by key, its values are not bound to a single key (i.e. when you access keyed state, the access is bound to a single key), and it has different schemes for repartitioning when operators are scaled up or down.
>>> The operator IDs you referred to are simply unique IDs that identify the same operators across different executions of the same job. I'm not sure what you mean by "operators have IDs that they are keyed by"; those IDs are not used in any partitioning operation.
>>>
>>>> 4. For the Table API / SQL, are primary keys and join keys automatically used as the keys for state under the hood?
>>>
>>> Yes.
>>>
>>>> Lastly:
>>>> 5. Is there a way to estimate roughly how much disk space state storage will take per operation?
>>>>
>>>> Thanks again!
>>>>
>>>> --
>>>>
>>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>>
>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#configuring-a-state-backend
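Gordon's point that operator ListState entries are independent units that get spread across instances on rescale can be sketched as a small, dependency-free Java model. Everything in it is hypothetical (the class `OperatorListStateRescale`, the `PartitionOffset` entry type, the `redistribute` helper): it only mimics the simple round-robin dealing Gordon mentions, using Kafka (partition, offset) pairs like the ones a source instance keeps, and Flink's actual repartitioning order may differ. In a real job such a list would come from `OperatorStateStore#getListState`, typically obtained inside `CheckpointedFunction#initializeState`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, NOT Flink's implementation: operator ListState entries are
// treated as independent items and dealt out round-robin on rescale.
final class OperatorListStateRescale {

    // One independent list entry, standing in for a (Kafka partition, offset)
    // pair that a source instance might checkpoint. Hypothetical type.
    static final class PartitionOffset {
        final int partition;
        final long offset;

        PartitionOffset(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return "partition " + partition + " @ offset " + offset;
        }
    }

    // Concatenates the entries checkpointed by all old instances and assigns
    // them round-robin to the new instances. Because any entry may move to any
    // instance, entries must not depend on one another.
    static List<List<PartitionOffset>> redistribute(
            List<List<PartitionOffset>> oldInstances, int newParallelism) {
        List<List<PartitionOffset>> newInstances = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newInstances.add(new ArrayList<>());
        }
        int next = 0;
        for (List<PartitionOffset> instanceState : oldInstances) {
            for (PartitionOffset entry : instanceState) {
                newInstances.get(next).add(entry);
                next = (next + 1) % newParallelism;
            }
        }
        return newInstances;
    }

    public static void main(String[] args) {
        // Two old source instances, each responsible for two Kafka partitions.
        List<List<PartitionOffset>> before = List.of(
                List.of(new PartitionOffset(0, 42L), new PartitionOffset(1, 17L)),
                List.of(new PartitionOffset(2, 5L), new PartitionOffset(3, 99L)));
        // Rescale the source from parallelism 2 to parallelism 3.
        List<List<PartitionOffset>> after = redistribute(before, 3);
        for (int i = 0; i < after.size(); i++) {
            System.out.println("instance " + i + ": " + after.get(i));
        }
    }
}
```

Rescaling the two instances in the demo to three leaves instance 0 with partitions 0 and 3, instance 1 with partition 1, and instance 2 with partition 2. Since no entry refers to another, each new instance can resume its partitions from the restored offsets on its own.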
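The compaction-and-pruning behavior Gordon describes for incremental RocksDB checkpoints can be approximated as reference counting over shared SST files. This is a simplified model, not Flink code: `IncrementalCheckpointModel`, its methods, and the file names are hypothetical, and `numRetained` only loosely plays the role of Flink's `state.checkpoints.num-retained` setting. Each checkpoint uploads only SST files that are not already stored, and once compaction has replaced old files and the checkpoints referencing them are subsumed, those files are deleted.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified model, NOT Flink's implementation: an incremental checkpoint is
// the set of SST files that make up RocksDB's state at that moment. Shared
// files are reference-counted across the retained checkpoints.
final class IncrementalCheckpointModel {
    private final int numRetained;                                 // checkpoints kept
    private final Deque<Set<String>> retained = new ArrayDeque<>(); // oldest first
    private final Map<String, Integer> refCounts = new HashMap<>(); // file -> #checkpoints
    private final Set<String> stored = new TreeSet<>();           // files in durable storage

    IncrementalCheckpointModel(int numRetained) {
        this.numRetained = numRetained;
    }

    // Completes a checkpoint of the given live SST files and returns the files
    // that had to be newly uploaded (files already stored are only referenced).
    Set<String> checkpoint(Set<String> liveSstFiles) {
        Set<String> newlyUploaded = new TreeSet<>();
        for (String file : liveSstFiles) {
            if (stored.add(file)) {
                newlyUploaded.add(file);
            }
            refCounts.merge(file, 1, Integer::sum);
        }
        retained.addLast(new HashSet<>(liveSstFiles));
        // Subsume the oldest checkpoints; delete files no retained checkpoint uses.
        while (retained.size() > numRetained) {
            for (String file : retained.removeFirst()) {
                if (refCounts.merge(file, -1, Integer::sum) == 0) {
                    refCounts.remove(file);
                    stored.remove(file);
                }
            }
        }
        return newlyUploaded;
    }

    Set<String> storedFiles() {
        return new TreeSet<>(stored);
    }

    public static void main(String[] args) {
        // Keep only the latest completed checkpoint (assumed retention of 1).
        IncrementalCheckpointModel model = new IncrementalCheckpointModel(1);
        // Checkpoint 1: RocksDB state consists of two SST files.
        System.out.println("cp1 uploads " + model.checkpoint(Set.of("a.sst", "b.sst")));
        // Checkpoint 2: a flush added c.sst; a.sst and b.sst are reused, not re-uploaded.
        System.out.println("cp2 uploads " + model.checkpoint(Set.of("a.sst", "b.sst", "c.sst")));
        // Checkpoint 3: compaction merged everything into d.sst.
        System.out.println("cp3 uploads " + model.checkpoint(Set.of("d.sst")));
        System.out.println("stored files " + model.storedFiles());
    }
}
```

In the demo, the second checkpoint uploads only `c.sst`; after compaction merges everything into `d.sst`, the third checkpoint uploads only that file, and the older files are dropped once no retained checkpoint references them. Stored state therefore stays bounded rather than accumulating every historical value for a key.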
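For contrast with operator state, Gordon's description of keyed state as co-partitioned with the stream by key can also be sketched. This is a hedged model of the key-group idea behind keyed state: `KeyedStatePartitioning` and its methods are hypothetical, the max parallelism of 128 is an assumed value, and the hash step is simplified (Flink additionally scrambles `key.hashCode()` with a murmur hash, which this sketch skips). Every key maps to exactly one key group, and contiguous ranges of key groups are assigned to parallel instances, so on rescale whole key groups move and all state for a given key stays together.

```java
import java.util.List;

// Simplified model, NOT Flink's implementation, of keyed-state partitioning.
// Assumption: the raw hashCode is used directly, without Flink's murmur hash.
final class KeyedStatePartitioning {

    // Every key maps to exactly one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Contiguous ranges of key groups are assigned to parallel instances.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration
        for (String userId : List.of("alice", "bob", "carol")) {
            int group = keyGroupFor(userId, maxParallelism);
            // The key group never changes; only which instance owns it does.
            System.out.printf("%s -> key group %d, instance %d of 2, instance %d of 4%n",
                    userId, group,
                    operatorIndexFor(group, maxParallelism, 2),
                    operatorIndexFor(group, maxParallelism, 4));
        }
    }
}
```

Going from 2 to 4 instances splits each instance's key-group range in half (key groups 0 to 63 belong to instance 0 at parallelism 2, and to instances 0 and 1 at parallelism 4), which is why keyed state can be rescaled without any notion of independent list entries.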
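Rex's guess about join inputs and Gordon's "Yes" on join keys fit together: in the Table API / SQL, the rows a join buffers live in keyed state scoped by join key, not in operator state. The following is a minimal sketch of that idea for an append-only inner equi-join, with hypothetical names (`KeyedJoinStateModel`, `onLeft`, `onRight`) and plain `HashMap`s standing in for per-key state; it ignores updates, retractions, and state expiry, which a production join operator would also need to handle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model, NOT Flink's join operator: each side's rows are buffered
// per join key, the way keyed state scopes every access to a single key.
final class KeyedJoinStateModel {
    private final Map<String, List<String>> leftRowsByKey = new HashMap<>();
    private final Map<String, List<String>> rightRowsByKey = new HashMap<>();

    // Stores a left row under its join key and joins it only with the right
    // rows buffered under that same key; other keys are never touched.
    List<String> onLeft(String joinKey, String leftRow) {
        leftRowsByKey.computeIfAbsent(joinKey, k -> new ArrayList<>()).add(leftRow);
        List<String> joined = new ArrayList<>();
        for (String rightRow : rightRowsByKey.getOrDefault(joinKey, List.of())) {
            joined.add(leftRow + " | " + rightRow);
        }
        return joined;
    }

    // Symmetric to onLeft for rows arriving on the right input.
    List<String> onRight(String joinKey, String rightRow) {
        rightRowsByKey.computeIfAbsent(joinKey, k -> new ArrayList<>()).add(rightRow);
        List<String> joined = new ArrayList<>();
        for (String leftRow : leftRowsByKey.getOrDefault(joinKey, List.of())) {
            joined.add(leftRow + " | " + rightRow);
        }
        return joined;
    }

    public static void main(String[] args) {
        KeyedJoinStateModel join = new KeyedJoinStateModel();
        // Left input: user profiles; right input: messages; join key: user id.
        System.out.println(join.onLeft("user-1", "profile:Rex"));
        System.out.println(join.onRight("user-1", "msg:hello"));
        System.out.println(join.onRight("user-2", "msg:hi"));
    }
}
```

Each arriving row reads and writes only the bucket for its own join key, which is what lets the join react to new data by recombining just the rows under that key.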