[ https://issues.apache.org/jira/browse/SPARK-31794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115080#comment-17115080 ]
Jungtaek Lim commented on SPARK-31794:
--------------------------------------

Please read through the docs of these methods, which explain why the distribution may not be even. Only `Dataset.repartition(numPartitions)` guarantees that rows are evenly distributed across partitions. It's neither a bug nor something that can be improved (unless we expose an interface to implement a custom hash function).

> Incorrect distribution with repartitionByRange and repartition column expression
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-31794
>                 URL: https://issues.apache.org/jira/browse/SPARK-31794
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2, 2.4.5, 3.0.1
>         Environment: Sample code for obtaining the test results below:
>
> import java.io.PrintWriter
> import org.apache.spark.sql.functions.spark_partition_id
>
> val logfile = "/tmp/sparkdftest.log"
> val writer = new PrintWriter(logfile)
> writer.println("Spark Version " + sc.version)
>
> val df = Range(1, 1002).toDF("val")
> writer.println("Default Partition Length:" + df.rdd.partitions.length)
> writer.println("Default Partition getNumPartitions:" + df.rdd.getNumPartitions)
> writer.println("Default Partition groupBy spark_partition_id:" +
>   df.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfcount = df.mapPartitions { part => Iterator(part.size) }
> writer.println("Default Partition:" + dfcount.collect().toList)
>
> val numparts = 24
> val dfparts_range = df.withColumn("partid", $"val" % numparts).repartitionByRange(numparts, $"partid")
> writer.println("repartitionByRange Length:" + dfparts_range.rdd.partitions.length)
> writer.println("repartitionByRange getNumPartitions:" + dfparts_range.rdd.getNumPartitions)
> writer.println("repartitionByRange groupBy spark_partition_id:" +
>   dfparts_range.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfparts_range_count = dfparts_range.mapPartitions { part => Iterator(part.size) }
> writer.println("repartitionByRange: " + dfparts_range_count.collect().toList)
>
> val dfparts_expr = df.withColumn("partid", $"val" % numparts).repartition(numparts, $"partid")
> writer.println("repartition by column expr Length:" + dfparts_expr.rdd.partitions.length)
> writer.println("repartition by column expr getNumPartitions:" + dfparts_expr.rdd.getNumPartitions)
> writer.println("repartition by column expr groupBy spark_partition_id:" +
>   dfparts_expr.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfparts_expr_count = dfparts_expr.mapPartitions { part => Iterator(part.size) }
> writer.println("repartition by column expr:" + dfparts_expr_count.collect().toList)
> writer.close()
>
>             Reporter: Ramesha Bhatta
>             Priority: Major
>               Labels: performance
>
> Both repartitionByRange and repartition(<num>, <column>) result in a skewed distribution across the resulting partitions.
>
> With repartitionByRange, one partition has about 2x the average volume and the last one has zero. With repartition this is more problematic: some partitions have 4x or 2x the average and many partitions have zero rows.
>
> This distribution imbalance can cause performance problems in a concurrent environment.
>
> Details from testing in 3 different versions:
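The hash-based skew in the report can be illustrated outside Spark: `repartition(numparts, $"partid")` routes each row to partition `hash(partid) % numparts`, so with only 24 distinct keys some buckets receive several keys' worth of rows while others stay empty, whereas `repartition(numPartitions)` deals rows out round-robin. The sketch below is an assumption-laden model, not Spark's implementation: MD5 stands in for Spark's Murmur3 hash, so the exact bucket sizes will differ from the `List(...)` values in the report, but the qualitative collision/empty-bucket behavior is the same.

```python
import hashlib

NUM_PARTS = 24
vals = range(1, 1002)                      # mirrors Range(1, 1002).toDF("val")
partids = [v % NUM_PARTS for v in vals]    # the "partid" column: 24 distinct keys

def bucket(key: int, n: int) -> int:
    # Stand-in for Spark's Murmur3-based HashPartitioning (NOT the real hash).
    digest = hashlib.md5(str(key).encode()).hexdigest()
    return int(digest, 16) % n

# Hash distribution: 24 distinct keys hashed into 24 buckets -> collisions
# pile two or more keys into one partition and leave others empty.
hash_sizes = [0] * NUM_PARTS
for pid in partids:
    hash_sizes[bucket(pid, NUM_PARTS)] += 1

# Round-robin distribution, modeling repartition(numPartitions): even by design.
rr_sizes = [0] * NUM_PARTS
for i, _ in enumerate(vals):
    rr_sizes[i % NUM_PARTS] += 1

print("hash buckets:", sorted(hash_sizes, reverse=True))
print("round robin: ", sorted(rr_sizes, reverse=True))
```

The round-robin sizes differ by at most one row (42s and 41s, matching the even `repartitionByRange` middle partitions), while the hashed sizes are lumpy because nothing forces 24 hash values to land in 24 distinct buckets.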
> |Version 2.3.2|Version 2.4.5|Version 3.0 Preview2|
> |Spark Version 2.3.2.3.1.4.0-315|Spark Version 2.4.5|Spark Version 3.0.0-preview2|
> |Default Partition Length:2|Default Partition Length:2|Default Partition Length:80|
> |Default Partition getNumPartitions:2|Default Partition getNumPartitions:2|Default Partition getNumPartitions:80|
> |Default Partition groupBy spark_partition_id:200|Default Partition groupBy spark_partition_id:200|Default Partition groupBy spark_partition_id:200|
> |repartitionByRange Length:24|repartitionByRange Length:24|repartitionByRange Length:24|
> |repartitionByRange getNumPartitions:24|repartitionByRange getNumPartitions:24|repartitionByRange getNumPartitions:24|
> |repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|
> |repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|
> |repartition by column expr Length:24|repartition by column expr Length:24|repartition by column expr Length:24|
> |repartition by column expr getNumPartitions:24|repartition by column expr getNumPartitions:24|repartition by column expr getNumPartitions:24|
> |repartition by column expr groupBy spark_partition_id:200|repartition by column expr groupBy spark_partition_id:200|repartition by column expr groupBy spark_partition_id:200|
> |repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|

--
This message was sent by Atlassian Jira
(v8.3.4#803005)