[ 
https://issues.apache.org/jira/browse/SPARK-31794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17116470#comment-17116470
 ] 

Jungtaek Lim commented on SPARK-31794:
--------------------------------------

http://spark.apache.org/docs/3.0.0-preview2/api/scala/org/apache/spark/sql/Dataset.html
(The detailed explanation seems to have been added only in 3.0.0 - I hadn't 
made clear that it isn't present in the Spark 2.4.x docs. My bad. That's just a 
doc issue though; the behavior description is still valid for all of Spark 2.x.)

Please check the description of the "repartition*" methods - click on a method 
name to expand its description.

Given that Spark documents this limitation of the repartition methods, it is 
not a bug. Anyone is welcome to propose a better solution, but any new solution 
should also take the existing design considerations into account.
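For intuition on why this documented behavior yields empty partitions even with 
perfectly balanced keys: repartition(n, col) assigns each row to a partition by 
hashing the column value modulo n, and distinct key values can collide into the 
same bucket. A minimal sketch in plain Scala, using 
scala.util.hashing.MurmurHash3.stringHash as a stand-in for Spark's internal 
Murmur3 hash (the exact bucket assignments will differ from Spark's):

```scala
import scala.util.hashing.MurmurHash3

// 24 distinct partition keys hashed into 24 buckets: hash collisions mean
// some buckets receive several keys while others receive none, mirroring
// the 0-count and 2x/4x partitions reported in this ticket.
val numParts = 24
def bucketOf(k: Int): Int = {
  val h = MurmurHash3.stringHash(k.toString) % numParts
  if (h < 0) h + numParts else h  // normalize to [0, numParts)
}
val occupied = (0 until numParts).map(bucketOf).toSet
println(s"occupied buckets: ${occupied.size}, empty buckets: ${numParts - occupied.size}")
```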

If you fully understand your data distribution, you'll want to get your hands 
dirty with a custom partitioner - though that seems to be available only for 
RDDs.
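A sketch of what that could look like. The stand-in Partitioner trait below 
mirrors the two abstract members of org.apache.spark.Partitioner so the snippet 
compiles without Spark on the classpath; with Spark you would extend the real 
class and apply it via partitionBy on a keyed RDD, e.g. 
df.rdd.map(r => (r.getInt(0) % 24, r)).partitionBy(new ExactPartitioner(24)). 
All names here are illustrative, not from the ticket:

```scala
// Stand-in for org.apache.spark.Partitioner (same abstract members),
// so this sketch compiles without a Spark dependency.
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Hypothetical exact partitioner: assumes Int keys and sends key k to
// partition k mod numParts, so the 24 keys 0..23 land in exactly one
// partition each - no hashing, no collisions, no empty buckets.
class ExactPartitioner(numParts: Int) extends Partitioner {
  def numPartitions: Int = numParts
  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int] % numParts
    if (k < 0) k + numParts else k
  }
}

val p = new ExactPartitioner(24)
println((0 until 24).map(p.getPartition(_)).toList)  // List(0, 1, ..., 23)
```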

> Incorrect distribution with repartitionByRange and repartition column 
> expression
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-31794
>                 URL: https://issues.apache.org/jira/browse/SPARK-31794
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2, 2.4.5, 3.0.1
>         Environment: Sample code for obtaining the above test results.
> import java.io.File 
> import java.io.PrintWriter 
> val logfile="/tmp/sparkdftest.log"
> val writer = new PrintWriter(logfile) 
> writer.println("Spark Version " + sc.version)
> val df= Range(1, 1002).toDF("val")
> writer.println("Default Partition Length:" + df.rdd.partitions.length)
> writer.println("Default Partition getNumPartitions:" + 
> df.rdd.getNumPartitions)
> writer.println("Default Partition groupBy spark_partition_id:" + 
> df.groupBy(spark_partition_id).count().rdd.partitions.length)
> val dfcount = df.mapPartitions { part => Iterator(part.size) }
> writer.println("Default Partition:" + dfcount.collect().toList)
> val numparts=24
> val dfparts_range=df.withColumn("partid", $"val" % 
> numparts).repartitionByRange(numparts, $"partid")
> writer.println("repartitionByRange Length:" + 
> dfparts_range.rdd.partitions.length)
> writer.println("repartitionByRange getNumPartitions:" + 
> dfparts_range.rdd.getNumPartitions)
> writer.println("repartitionByRange groupBy spark_partition_id:" + 
> dfparts_range.groupBy(spark_partition_id).count().rdd.partitions.length)
> val dfpartscount = dfparts_range.mapPartitions { part => Iterator(part.size) }
> writer.println("repartitionByRange: " + dfpartscount.collect().toList)
> val dfparts_expr=df.withColumn("partid", $"val" % 
> numparts).repartition(numparts, $"partid")
> writer.println("repartition by column expr Length:" + 
> dfparts_expr.rdd.partitions.length)
> writer.println("repartition by column expr getNumPartitions:" + 
> dfparts_expr.rdd.getNumPartitions)
> writer.println("repartition by column expr groupBy spark_partition_id:" + 
> dfparts_expr.groupBy(spark_partition_id).count().rdd.partitions.length)
> val dfpartscount = dfparts_expr.mapPartitions { part => Iterator(part.size) }
> writer.println("repartition by column expr:" + dfpartscount.collect().toList)
> writer.close()
>            Reporter: Ramesha Bhatta
>            Priority: Major
>              Labels: performance
>
> Both repartitionByRange and repartition(<num>, <column>) result in a skewed 
> distribution across the resulting partitions.
>  
> With range partitioning, one of the partitions has 2x the average volume and 
> the last one is empty. With repartition this is worse: some partitions have 
> 4x or 2x the average volume and many partitions are empty.
>  
> This distribution imbalance can cause performance problems in a concurrent 
> environment.
> Details from testing in 3 different versions.
> |Version 2.3.2|Version 2.4.5|Version 3.0 Preview2|
> |Spark Version 2.3.2.3.1.4.0-315|Spark Version 2.4.5|Spark Version 
> 3.0.0-preview2|
> |Default Partition Length:2|Default Partition Length:2|Default Partition 
> Length:80|
> |Default Partition getNumPartitions:2|Default Partition 
> getNumPartitions:2|Default Partition getNumPartitions:80|
> |Default Partition groupBy spark_partition_id:200|Default Partition groupBy 
> spark_partition_id:200|Default Partition groupBy spark_partition_id:200|
> |repartitionByRange Length:24|repartitionByRange Length:24|repartitionByRange 
> Length:24|
> |repartitionByRange getNumPartitions:24|repartitionByRange 
> getNumPartitions:24|repartitionByRange getNumPartitions:24|
> |repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy 
> spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|
> |repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 
> 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 
> 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 
> 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 
> 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|
> |repartition by column expr Length:24|repartition by column expr 
> Length:24|repartition by column expr Length:24|
> |repartition by column expr getNumPartitions:24|repartition by column expr 
> getNumPartitions:24|repartition by column expr getNumPartitions:24|
> |repartition by column expr groupBy spark_partition_id:200|repartition by 
> column expr groupBy spark_partition_id:200|repartition by column expr groupBy 
> spark_partition_id:200|
> |repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 
> 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|repartition by column 
> expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 
> 84, 42, 0, 0, 0, 0)|repartition by column expr:List(83, 42, 0, 84, 0, 42, 
> 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
