[ https://issues.apache.org/jira/browse/FLINK-24639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17435061#comment-17435061 ]

Danny Cranmer commented on FLINK-24639:
---------------------------------------

Thanks [~jkarp], your analysis is great! I have been wondering how to better
distribute shards across subtasks and had not considered this approach. I would
be inclined to add this implementation to the connector code base and let users
optionally choose it instead of the default. What are your thoughts?
Would you be willing to open a PR for it? I am happy to review and merge it for
you. Thanks

> Improve assignment of Kinesis shards to subtasks
> ------------------------------------------------
>
>                 Key: FLINK-24639
>                 URL: https://issues.apache.org/jira/browse/FLINK-24639
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>            Reporter: John Karp
>            Priority: Major
>         Attachments: Screen Shot 2021-10-25 at 5.11.29 PM.png
>
>
> The default assigner of Kinesis shards to Flink subtasks simply takes the
> hashCode() of the StreamShardHandle (an integer) and interprets it modulo the
> number of subtasks. This yields a pseudo-random but deterministic assignment
> of shards to subtasks.
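A minimal sketch of the hash-modulo behaviour just described, for illustration only. `HashModuloAssignment` and its methods are hypothetical names, `String.hashCode()` of a shard-id string stands in for `StreamShardHandle.hashCode()`, and `Math.floorMod` is one assumed way to keep the index non-negative; the connector's own code may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of hash-modulo shard assignment. A shard-id String stands in for the
// StreamShardHandle; this is an illustration, not the connector's own code.
class HashModuloAssignment {

    // hashCode() modulo the subtask count; Math.floorMod keeps the result in
    // [0, numSubtasks) even when hashCode() is negative.
    static int assign(String shardKey, int numSubtasks) {
        return Math.floorMod(shardKey.hashCode(), numSubtasks);
    }

    // Number of shards that land on each subtask.
    static int[] countPerSubtask(List<String> shardKeys, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (String key : shardKeys) {
            counts[assign(key, numSubtasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> shards = new ArrayList<>();
        for (int i = 0; i < 16; i++) {
            shards.add(String.format("shardId-%012d", i)); // Kinesis-style shard ids
        }
        System.out.println(Arrays.toString(countPerSubtask(shards, 8)));
    }
}
```

Because the subtask index depends only on the hash value, nothing prevents several shards from colliding on the same subtask; the spread is only as even as the hash happens to make it.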
> However, this can leave some subtasks with several times as many shards as
> others. To keep those unlucky subtasks from being overloaded, the overall
> Flink cluster must be over-provisioned so that each subtask has enough
> headroom to absorb any over-assignment of shards.
> We can do better here, at least when Kinesis is used in the common way, with
> records spread randomly across the hash-key space. Each record sent to a
> Kinesis stream has a hash key in the range [0, 2^128), which determines the
> shard it goes to; each shard owns a contiguous range of hash keys. By default
> Kinesis assigns each shard an equal fraction of the hash-key space, and when
> you scale up or down with UpdateShardCount, it tries to keep the fractions as
> equal as possible. Also, a shard's hash-key range is fixed at creation; the
> shard can only be replaced by new shards, either by splitting it or by merging
> it with an adjacent shard.
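The hash-key model above can be sketched as follows. Kinesis documents that a partition key is mapped to a 128-bit hash key with MD5; the rest is a simplification assumed for illustration: the space is cut into exactly equal ranges, and the boundaries computed here may not match the exact ones Kinesis chooses. `HashKeySpace`, `equalRange` and `rangeIndex` are hypothetical names.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of the hash-key model above, assuming n equal-width shard ranges.
// Class and method names are hypothetical.
class HashKeySpace {
    // Size of the hash-key space, 2^128.
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // Hash key of a partition key: its MD5 digest read as an unsigned 128-bit integer.
    static BigInteger hashKeyOf(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 => non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 unavailable", e);
        }
    }

    // ceil(a / b) for non-negative a and positive b.
    private static BigInteger ceilDiv(BigInteger a, BigInteger b) {
        return a.add(b).subtract(BigInteger.ONE).divide(b);
    }

    // Inclusive [start, end] of range i when [0, 2^128) is cut into n equal parts.
    static BigInteger[] equalRange(int i, int n) {
        BigInteger nBig = BigInteger.valueOf(n);
        BigInteger start = ceilDiv(KEY_SPACE.multiply(BigInteger.valueOf(i)), nBig);
        BigInteger end = ceilDiv(KEY_SPACE.multiply(BigInteger.valueOf(i + 1)), nBig)
                .subtract(BigInteger.ONE);
        return new BigInteger[] {start, end};
    }

    // Index of the equal-width range containing hashKey: floor(hashKey * n / 2^128).
    static int rangeIndex(BigInteger hashKey, int n) {
        return hashKey.multiply(BigInteger.valueOf(n)).divide(KEY_SPACE).intValueExact();
    }

    public static void main(String[] args) {
        BigInteger key = hashKeyOf("user-42");
        System.out.println(key + " -> range " + rangeIndex(key, 4) + " of 4");
    }
}
```

Using ceiling division for the range boundaries keeps `equalRange` and `rangeIndex` consistent: a key lies in range i exactly when `rangeIndex` returns i.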
> Given the above, one way to assign shards to subtasks is a linear mapping from
> hash keys in [0, 2^128) to subtask indices in [0, nSubtasks). As the
> 'coordinate' of each shard we use the midpoint of its hash-key range, so that
> neither subtask 0 nor subtask (n-1) is assigned too many shards.
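A sketch of the midpoint mapping just described, assuming each shard is given by the inclusive StartingHashKey/EndingHashKey decimal strings that Kinesis reports in a shard's HashKeyRange. `MidpointAssignment` is a hypothetical name; this is not the code from the linked patch.

```java
import java.math.BigInteger;

// Sketch of the hash-key assigner described above: map the midpoint of a shard's
// hash-key range linearly onto [0, numSubtasks). Names are hypothetical.
class MidpointAssignment {
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // startingHashKey and endingHashKey are inclusive decimal strings.
    static int assign(String startingHashKey, String endingHashKey, int numSubtasks) {
        BigInteger start = new BigInteger(startingHashKey);
        BigInteger end = new BigInteger(endingHashKey);
        BigInteger mid = start.add(end).shiftRight(1); // floor((start + end) / 2)
        // floor(mid * numSubtasks / 2^128); mid < 2^128 keeps this below numSubtasks.
        return mid.multiply(BigInteger.valueOf(numSubtasks)).divide(KEY_SPACE).intValueExact();
    }

    public static void main(String[] args) {
        // Four equal shards spread over two subtasks.
        BigInteger quarter = BigInteger.ONE.shiftLeft(126);
        for (int i = 0; i < 4; i++) {
            BigInteger start = quarter.multiply(BigInteger.valueOf(i));
            BigInteger end = start.add(quarter).subtract(BigInteger.ONE);
            System.out.println("shard " + i + " -> subtask "
                    + assign(start.toString(), end.toString(), 2));
        }
    }
}
```

With four equal shards and two subtasks, `main` maps shards 0 and 1 to subtask 0 and shards 2 and 3 to subtask 1. Unlike the hash-modulo approach, the result depends only on where a shard sits in the key space, so equal-width shards spread evenly.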
> However, this will probably not help Kinesis users who do not assign
> partition or hash keys randomly to their records; the existing assigner is
> probably better for them.
> I ran a simulation of the default shard assigner versus some alternatives,
> using the shards of one of our Kinesis streams; results are attached. The
> measure I used, which I call 'overload', is how many times more shards the
> most heavily loaded subtask has than necessary. (DEFAULT is the default
> assigner; Sha256 is similar to the default but uses a stronger hash function;
> ShardId extracts the shard number from the shardId and uses that; and HashKey
> is the one described above.)
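One possible reading of the 'overload' measure, sketched for illustration. The interpretation of "necessary" as the smallest achievable maximum, ceil(numShards / numSubtasks), is an assumption, not taken from the attached results; `Overload` is a hypothetical name.

```java
// One reading of the 'overload' measure: the busiest subtask's shard count divided by
// the lowest achievable maximum, ceil(numShards / numSubtasks). Names are hypothetical.
class Overload {
    // assignments[s] is the subtask index chosen for shard s.
    static double overload(int[] assignments, int numSubtasks) {
        if (assignments.length == 0) {
            throw new IllegalArgumentException("need at least one shard");
        }
        int[] counts = new int[numSubtasks];
        for (int subtask : assignments) {
            counts[subtask]++;
        }
        int max = 0;
        for (int c : counts) {
            max = Math.max(max, c);
        }
        // Smallest possible maximum load: ceil(numShards / numSubtasks).
        int ideal = (assignments.length + numSubtasks - 1) / numSubtasks;
        return (double) max / ideal;
    }

    public static void main(String[] args) {
        // 8 shards on 4 subtasks; one subtask gets 4 of them.
        int[] skewed = {0, 0, 0, 0, 1, 2, 2, 3};
        System.out.println(overload(skewed, 4)); // prints 2.0
    }
}
```

Under this reading, a perfectly balanced assignment scores 1.0, and a score of 2.0 means the busiest subtask carries twice the shards it would need to.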
> Patch is at: 
> https://github.com/apache/flink/compare/master...john-karp:uniform-shard-assigner?expand=1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
