[ 
https://issues.apache.org/jira/browse/FLINK-21548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337814#comment-17337814
 ] 

Kenneth William Krugler commented on FLINK-21548:
-------------------------------------------------

Hi [~izeigerman] - it would be good if you closed this issue as "Not a bug". If 
you're still looking for input from the community on how best to solve this 
issue, I'd suggest asking the question on the Flink user mailing list, thanks.

> keyBy operation produces skewed record distribution for low-cardinality keys
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-21548
>                 URL: https://issues.apache.org/jira/browse/FLINK-21548
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 1.11.0, 1.12.1
>            Reporter: Iaroslav Zeigerman
>            Priority: Minor
>              Labels: auto-deprioritized-major
>         Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot 
> 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png
>
>
> When the cardinality of keys matches the operator parallelism, not all subtasks of the downstream operator receive records, and even the ones that do are not utilized evenly.
> For example, with 500 unique keys in the range [0, 500), only 313 downstream tasks (out of 500) receive any records at all.
> *NOTE*: all examples below use 1 million record instances.
> This behavior can easily be reproduced with the following test case:
> {code:scala}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment
>
> object Test {
>   val parallelism = 500
>   val recordsNum  = 1000000
>
>   def run(): Unit = {
>     // Inclusive range, so key 0 occurs one extra time (hence the 2001 in the output below).
>     val recordIds = (0 to recordsNum).map(_ % parallelism)
>     val tasks     = recordIds.map(selectTask)
>
>     println(s"Total unique keys: ${recordIds.toSet.size}")
>     println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>     println("=======================")
>     println(s"Tasks involved: ${tasks.toSet.size}")
>     println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>   }
>
>   // Uses the parallelism as the max parallelism, so each key group maps to exactly one subtask.
>   def selectTask(key: Int): Int =
>     KeyGroupRangeAssignment.assignToKeyGroup(key, parallelism)
>
>   def main(args: Array[String]): Unit = run()
> }
> {code}
> Which produces the following results:
> {noformat}
> Total unique keys: 500
> Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
> =======================
> Tasks involved: 313
> Record distribution by task: Vector((147,10000), (248,10000), ..., 
> (232,2000), (100,2000))
> {noformat}
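> As a rough sanity check (a model, not proof): if the hash spread the 500 keys uniformly at random over 500 key groups, the expected fraction of non-empty groups would be 1 - (1 - 1/500)^500 ≈ 1 - 1/e ≈ 63.2%, i.e. about 316 of 500 tasks, which is very close to the observed 313. A minimal sketch of that comparison, using `scala.util.Random` as a stand-in for an "ideal" hash function:
> {code:scala}
> import scala.util.Random
>
> object UniformHashSketch {
>   def main(args: Array[String]): Unit = {
>     val parallelism = 500
>     val rnd         = new Random(42)
>
>     // Assign each of the 500 keys to a bucket chosen uniformly at random
>     // and count how many buckets end up occupied.
>     val occupied = (0 until parallelism).map(_ => rnd.nextInt(parallelism)).toSet.size
>
>     val expected = 1.0 - math.pow(1.0 - 1.0 / parallelism, parallelism)
>     println(s"Occupied buckets (simulated): $occupied / $parallelism")
>     println(f"Expected fraction (1 - (1 - 1/n)^n): ${expected * 100}%.1f%%")
>   }
> }
> {code}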
> Record distribution visualized:
>  !Screen Shot 2021-03-01 at 10.52.31 AM.png!
> I have determined that, in order to utilize all tasks, the number of unique keys should be at least 5 times the parallelism value.
> The relation between the number of unique keys and the fraction of utilized tasks appears to be exponential:
>  !Screen Shot 2021-03-01 at 10.54.42 AM.png!
> But even with 5x the number of keys the skew is still quite significant:
> !Screen Shot 2021-03-01 at 10.57.33 AM.png!
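> This is consistent with the same uniform-hash assumption as the sketch above: with k unique keys and n = 500 tasks, the expected fraction of utilized tasks is roughly 1 - (1 - 1/n)^k ≈ 1 - e^(-k/n), which is ~63% at k = n and only reaches ~99% around k = 5n, hence the apparently exponential shape of the curve. A quick way to tabulate this model:
> {code:scala}
> object ExpectedUtilization {
>   def main(args: Array[String]): Unit = {
>     val n = 500.0
>     // Expected fraction of utilized tasks for k = 1x .. 5x the parallelism,
>     // assuming the hash behaves like a uniform random function.
>     for (m <- 1 to 5) {
>       val k = m * n
>       val fraction = 1.0 - math.pow(1.0 - 1.0 / n, k)
>       println(f"keys = ${m}x parallelism -> expected utilization ${fraction * 100}%.1f%%")
>     }
>   }
> }
> {code}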
> Given that the keys used in my test are integer values, for which `hashCode` returns the value itself, I tend to believe that the skew is caused by Flink's murmur hash implementation, which is used
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].
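> For completeness, the reproduction above only exercises the key-group step. If I read the code correctly, the runtime maps a key to a subtask in two steps: `assignToKeyGroup(key, maxParallelism)` (murmur hash of `hashCode` modulo the max parallelism), followed by `computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)`; `assignKeyToParallelOperator` combines both. A sketch of the same experiment against that full mapping, with an explicitly chosen max parallelism of 1024 (an assumption for illustration, not necessarily what a real job would use):
> {code:scala}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment
>
> object FullAssignmentSketch {
>   def main(args: Array[String]): Unit = {
>     val parallelism    = 500
>     val maxParallelism = 1024 // assumed value, just to separate key groups from subtasks
>     val recordsNum     = 1000000
>
>     val recordIds = (0 until recordsNum).map(_ % parallelism)
>     // Full key -> key group -> subtask mapping used by the runtime.
>     val tasks = recordIds.map { key =>
>       KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)
>     }
>
>     println(s"Tasks involved: ${tasks.toSet.size} of $parallelism")
>     println(s"Top tasks by record count: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2).take(10)} ...")
>   }
> }
> {code}
> With more key groups than subtasks, each subtask covers a small range of key groups, which raises the chance it receives at least one key, but the collisions introduced at the hash step are still not rebalanced, at least in this model.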



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
