[
https://issues.apache.org/jira/browse/FLINK-24639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Cranmer updated FLINK-24639:
----------------------------------
Fix Version/s: 1.15.0
> Improve assignment of Kinesis shards to subtasks
> ------------------------------------------------
>
> Key: FLINK-24639
> URL: https://issues.apache.org/jira/browse/FLINK-24639
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kinesis
> Reporter: John Karp
> Assignee: John Karp
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0
>
> Attachments: Screen Shot 2021-10-25 at 5.11.29 PM.png
>
>
> The default assigner of Kinesis shards to Flink subtasks simply takes the
> hashCode() of the StreamShardHandle (an integer) and reduces it modulo the
> number of subtasks. The result is a pseudo-random but deterministic
> assignment of shards to subtasks.
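To illustrate the hash-modulo scheme described above, here is a minimal stand-alone sketch. It does not use the Flink connector classes: plain strings stand in for StreamShardHandle, and the class and method names (HashModuloAssignment, subtaskFor, shardsPerSubtask) are hypothetical. The use of Math.floorMod to keep the index non-negative is also an assumption of this sketch, not a statement about how the connector handles negative hash codes.

```java
import java.util.Arrays;

// Stand-alone sketch; shard IDs are plain strings standing in for StreamShardHandle.
final class HashModuloAssignment {

    // Reduces an arbitrary int hash to a subtask index in [0, numSubtasks).
    // floorMod keeps the result non-negative even when the hash is negative.
    static int subtaskFor(int hash, int numSubtasks) {
        return Math.floorMod(hash, numSubtasks);
    }

    // Counts how many shards land on each subtask.
    static int[] shardsPerSubtask(String[] shardIds, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (String shardId : shardIds) {
            counts[subtaskFor(shardId.hashCode(), numSubtasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical shard IDs, for illustration only.
        String[] shardIds = new String[16];
        for (int i = 0; i < shardIds.length; i++) {
            shardIds[i] = String.format("shardId-%012d", i);
        }
        System.out.println(Arrays.toString(shardsPerSubtask(shardIds, 4)));
    }
}
```

Nothing in this scheme forces the per-subtask counts to be equal, so the printed distribution depends entirely on how the hash values happen to fall.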
> However, this can lead to some subtasks getting several times as many
> shards as others. To prevent those unlucky subtasks from being overloaded,
> the overall Flink cluster must be over-provisioned, so that each subtask has
> more headroom to handle any over-assignment of shards.
> We can do better here, at least if Kinesis is being used in a common way.
> Each record sent to a Kinesis stream has a particular hash key in the range
> [0, 2^128), which is used to determine which shard gets used; each shard has
> an assigned range of hash keys. By default, Kinesis assigns each shard an
> equal fraction of the hash-key space, and when you scale up or down using
> UpdateShardCount, it tries to keep the fractions equal to the extent
> possible. Also, a shard's hash key range is fixed at creation; a shard can
> only be replaced by new shards, either by splitting it or by merging it with
> an adjacent shard.
> Given the above, one way to assign shards to subtasks is to do a linear
> mapping from hash-keys in range [0, 2^128) to subtask indices in [0,
> nSubtasks). For the 'coordinate' of each shard we pick the middle of the
> shard's range, to ensure neither subtask 0 nor subtask (n-1) is assigned too
> many.
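The linear mapping described above can be sketched as follows. This is an illustration only and not the code from the patch: the class and method names are hypothetical, and it assumes the shard's hash key range is given as inclusive starting and ending hash keys, as in the Kinesis HashKeyRange structure.

```java
import java.math.BigInteger;

// Stand-alone sketch of the midpoint-based linear mapping; not the patch itself.
final class HashKeyMidpointAssignment {

    // Size of the Kinesis hash-key space: keys lie in [0, 2^128).
    static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // Maps the midpoint of an inclusive hash-key range [start, end]
    // linearly onto a subtask index in [0, numSubtasks).
    static int subtaskFor(BigInteger startingHashKey, BigInteger endingHashKey, int numSubtasks) {
        // The shard's 'coordinate' is the middle of its range.
        BigInteger midpoint = startingHashKey.add(endingHashKey).shiftRight(1);
        // index = floor(midpoint * numSubtasks / 2^128)
        return midpoint
                .multiply(BigInteger.valueOf(numSubtasks))
                .divide(HASH_KEY_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        // Four shards with equal fractions of the hash-key space, four subtasks.
        BigInteger quarter = BigInteger.ONE.shiftLeft(126);
        for (int i = 0; i < 4; i++) {
            BigInteger start = quarter.multiply(BigInteger.valueOf(i));
            BigInteger end = start.add(quarter).subtract(BigInteger.ONE);
            System.out.println("shard " + i + " -> subtask " + subtaskFor(start, end, 4));
        }
    }
}
```

With equally sized shards and the same number of subtasks, each shard maps to its own subtask. Because the midpoint is always below 2^128, the computed index never reaches numSubtasks.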
> However, this will probably not be helpful for Kinesis users who don't
> randomly assign partition or hash keys to Kinesis records. The existing
> assigner is probably better for them.
> I ran a simulation of the default shard assigner versus some alternatives,
> using shards taken from one of our Kinesis streams; results are attached. The
> measure I used, which I call 'overload', is how many times more shards the
> most heavily loaded subtask has than is necessary. (DEFAULT is the
> default assigner, Sha256 is similar to the default but with a stronger
> hashing function, ShardId extracts the shard number from the shardId and uses
> that, and HashKey is the one I describe above.)
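One possible reading of the 'overload' measure is sketched below. The description does not give an exact formula, so this sketch assumes that the "necessary" number of shards is the smallest possible maximum under a perfectly even split, that is, ceil(totalShards / numSubtasks). The class and method names are hypothetical.

```java
// Stand-alone sketch of one interpretation of the 'overload' measure.
final class OverloadMetric {

    // Ratio of the busiest subtask's shard count to the count it would have
    // under a perfectly even split. Assumption: "necessary" means
    // ceil(totalShards / numSubtasks). A value of 1.0 means no overload.
    static double overload(int[] shardsPerSubtask) {
        int total = 0;
        int max = 0;
        for (int count : shardsPerSubtask) {
            total += count;
            max = Math.max(max, count);
        }
        if (total == 0) {
            return 1.0; // no shards assigned, so nothing is overloaded
        }
        int numSubtasks = shardsPerSubtask.length;
        int necessary = (total + numSubtasks - 1) / numSubtasks; // ceiling division
        return (double) max / necessary;
    }

    public static void main(String[] args) {
        System.out.println(overload(new int[] {2, 2, 2, 2})); // even split
        System.out.println(overload(new int[] {5, 1, 1, 1})); // one hot subtask
    }
}
```

Under this reading, eight shards spread evenly over four subtasks give an overload of 1.0, while placing five of the eight on one subtask gives 2.5.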
> Patch is at:
> https://github.com/apache/flink/compare/master...john-karp:uniform-shard-assigner?expand=1