[ https://issues.apache.org/jira/browse/SPARK-31794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115080#comment-17115080 ]

Jungtaek Lim commented on SPARK-31794:
--------------------------------------

Please read through the docs of these methods, which explain why the 
distribution may not be even. Only `Dataset.repartition(numPartitions)` 
guarantees that rows are evenly distributed across partitions. This is 
neither a bug nor something that can be improved (unless we expose an 
interface for implementing a custom hash function).
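
To make the collision effect concrete: `repartition(numPartitions, col)` routes each row to `hash(col) pmod numPartitions` (Murmur3 in Spark SQL), and nothing guarantees that 24 distinct key values map onto 24 distinct buckets. A minimal Python sketch of the effect, using MD5 as a stand-in for Spark's Murmur3 (the exact counts will differ from Spark's output; the collision pattern is the point):

```python
import hashlib

num_parts = 24
# Same shape of data as the repro: vals 1..1001, key = val % 24.
keys = [v % num_parts for v in range(1, 1002)]

def bucket(key: int) -> int:
    """Stand-in for Spark's hash partitioning: hash(key) pmod numPartitions."""
    digest = hashlib.md5(str(key).encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_parts

counts = [0] * num_parts
for k in keys:
    counts[bucket(k)] += 1

# With only 24 distinct keys, distinct keys typically collide into the same
# bucket while other buckets stay empty - the 0 / 2x / 3x pattern in the report.
print(counts)
```

Whether a given bucket ends up empty depends on the hash, not on the data. `repartition(numPartitions)` without a column expression avoids this because it uses round-robin distribution instead of hashing.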

> Incorrect distribution with repartitionByRange and repartition column 
> expression
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-31794
>                 URL: https://issues.apache.org/jira/browse/SPARK-31794
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2, 2.4.5, 3.0.1
>         Environment: Sample code for obtaining the above test results.
> import java.io.PrintWriter
> val logfile = "/tmp/sparkdftest.log"
> val writer = new PrintWriter(logfile)
> writer.println("Spark Version " + sc.version)
> val df = Range(1, 1002).toDF("val")
> writer.println("Default Partition Length:" + df.rdd.partitions.length)
> writer.println("Default Partition getNumPartitions:" + df.rdd.getNumPartitions)
> writer.println("Default Partition groupBy spark_partition_id:" + df.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfcount = df.mapPartitions { part => Iterator(part.size) }
> writer.println("Default Partition:" + dfcount.collect().toList)
> val numparts = 24
> val dfparts_range = df.withColumn("partid", $"val" % numparts).repartitionByRange(numparts, $"partid")
> writer.println("repartitionByRange Length:" + dfparts_range.rdd.partitions.length)
> writer.println("repartitionByRange getNumPartitions:" + dfparts_range.rdd.getNumPartitions)
> writer.println("repartitionByRange groupBy spark_partition_id:" + dfparts_range.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfpartscount_range = dfparts_range.mapPartitions { part => Iterator(part.size) }
> writer.println("repartitionByRange: " + dfpartscount_range.collect().toList)
> val dfparts_expr = df.withColumn("partid", $"val" % numparts).repartition(numparts, $"partid")
> writer.println("repartition by column expr Length:" + dfparts_expr.rdd.partitions.length)
> writer.println("repartition by column expr getNumPartitions:" + dfparts_expr.rdd.getNumPartitions)
> writer.println("repartition by column expr groupBy spark_partition_id:" + dfparts_expr.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfpartscount_expr = dfparts_expr.mapPartitions { part => Iterator(part.size) }
> writer.println("repartition by column expr:" + dfpartscount_expr.collect().toList)
> writer.close()
>            Reporter: Ramesha Bhatta
>            Priority: Major
>              Labels: performance
>
> Both repartitionByRange and repartition(<num>, <column>) result in a skewed 
> distribution across the resulting partitions.
>  
> With range partitioning, one partition holds 2x the average volume and the 
> last one is empty. With repartition by column expression this is worse: some 
> partitions hold 4x or 2x the average and many partitions are empty.
>  
> This distribution imbalance can cause performance problems in a concurrent 
> environment.
> Details from testing in 3 different versions.
> |Version 2.3.2|Version 2.4.5|Version 3.0 Preview2|
> |Spark Version 2.3.2.3.1.4.0-315|Spark Version 2.4.5|Spark Version 3.0.0-preview2|
> |Default Partition Length:2|Default Partition Length:2|Default Partition Length:80|
> |Default Partition getNumPartitions:2|Default Partition getNumPartitions:2|Default Partition getNumPartitions:80|
> |Default Partition groupBy spark_partition_id:200|Default Partition groupBy spark_partition_id:200|Default Partition groupBy spark_partition_id:200|
> |repartitionByRange Length:24|repartitionByRange Length:24|repartitionByRange Length:24|
> |repartitionByRange getNumPartitions:24|repartitionByRange getNumPartitions:24|repartitionByRange getNumPartitions:24|
> |repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|
> |repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|
> |repartition by column expr Length:24|repartition by column expr Length:24|repartition by column expr Length:24|
> |repartition by column expr getNumPartitions:24|repartition by column expr getNumPartitions:24|repartition by column expr getNumPartitions:24|
> |repartition by column expr groupBy spark_partition_id:200|repartition by column expr groupBy spark_partition_id:200|repartition by column expr groupBy spark_partition_id:200|
> |repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|
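
A quick arithmetic check on the repartitionByRange row above (plain Python that only re-derives the repro's inputs; it does not model Spark's sampling-based range partitioner): 1001 rows over 24 keys gives 17 keys with 42 rows and 7 keys with 41, so the 83-row first range partition is exactly two adjacent keys (41 + 42) landing in one bucket, which in turn leaves a later bucket empty:

```python
# Recompute per-key row counts from the repro data: vals 1..1001, key = val % 24.
counts = {}
for v in range(1, 1002):
    key = v % 24
    counts[key] = counts.get(key, 0) + 1

assert sum(counts.values()) == 1001
# Keys 1..17 occur 42 times; keys 0 and 18..23 occur 41 times.
assert sorted(counts.values()) == [41] * 7 + [42] * 17
# The 83-row first partition reported for repartitionByRange is keys 0 and 1 combined.
assert counts[0] + counts[1] == 83  # 41 + 42
```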



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
