[jira] [Commented] (SPARK-31794) Incorrect distribution with repartitionByRange and repartition column expression
[ https://issues.apache.org/jira/browse/SPARK-31794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810476#comment-17810476 ] Sean R. Owen commented on SPARK-31794:
--
Not that it helps, but I observe the same behavior, and it must be a bug. The result is exactly as expected, except that the first partition is 2x the size of the others, and the last partition is empty. I tried tweaks to identify the problem (e.g. what if I increased the desired partition count by 1? added 1 to my indices?) but they didn't help. I don't have any fix or further insight; just adding that yes, this seems to be a problem.

> Incorrect distribution with repartitionByRange and repartition column expression
>
> Key: SPARK-31794
> URL: https://issues.apache.org/jira/browse/SPARK-31794
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.2, 2.4.5, 3.0.1
> Environment: Sample code for obtaining the test results below.
>
> import java.io.PrintWriter
> val logfile = "/tmp/sparkdftest.log"
> val writer = new PrintWriter(logfile)
> writer.println("Spark Version " + sc.version)
> val df = Range(1, 1002).toDF("val")
> writer.println("Default Partition Length:" + df.rdd.partitions.length)
> writer.println("Default Partition getNumPartitions:" + df.rdd.getNumPartitions)
> writer.println("Default Partition groupBy spark_partition_id:" + df.groupBy(spark_partition_id).count().rdd.partitions.length)
> val dfcount = df.mapPartitions{part => Iterator(part.size)}
> writer.println("Default Partition:" + dfcount.collect().toList)
> val numparts = 24
> val dfparts_range = df.withColumn("partid", $"val" % numparts).repartitionByRange(numparts, $"partid")
> writer.println("repartitionByRange Length:" + dfparts_range.rdd.partitions.length)
> writer.println("repartitionByRange getNumPartitions:" + dfparts_range.rdd.getNumPartitions)
> writer.println("repartitionByRange groupBy spark_partition_id:" + dfparts_range.groupBy(spark_partition_id).count().rdd.partitions.length)
> val dfpartscount_range = dfparts_range.mapPartitions{part => Iterator(part.size)}
> writer.println("repartitionByRange: " + dfpartscount_range.collect().toList)
> val dfparts_expr = df.withColumn("partid", $"val" % numparts).repartition(numparts, $"partid")
> writer.println("repartition by column expr Length:" + dfparts_expr.rdd.partitions.length)
> writer.println("repartition by column expr getNumPartitions:" + dfparts_expr.rdd.getNumPartitions)
> writer.println("repartition by column expr groupBy spark_partition_id:" + dfparts_expr.groupBy(spark_partition_id).count().rdd.partitions.length)
> val dfpartscount_expr = dfparts_expr.mapPartitions{part => Iterator(part.size)}
> writer.println("repartition by column expr:" + dfpartscount_expr.collect().toList)
> writer.close()
>
> Reporter: Ramesha Bhatta
> Priority: Major
> Labels: performance
>
> Both repartitionByRange and repartition with a column expression result in a skewed distribution across the resulting partitions.
>
> With range partitioning, one partition holds 2x the average volume and the last one is empty. With repartition by column expression it is worse: some partitions hold 4x or 2x the average and many partitions are empty.
>
> This distribution imbalance can cause performance problems in a concurrent environment.
>
> Details from testing on 3 different versions:
>
> |Version 2.3.2|Version 2.4.5|Version 3.0 Preview2|
> |Spark Version 2.3.2.3.1.4.0-315|Spark Version 2.4.5|Spark Version 3.0.0-preview2|
> |Default Partition Length:2|Default Partition Length:2|Default Partition Length:80|
> |Default Partition getNumPartitions:2|Default Partition getNumPartitions:2|Default Partition getNumPartitions:80|
> |Default Partition groupBy spark_partition_id:200|Default Partition groupBy spark_partition_id:200|Default Partition groupBy spark_partition_id:200|
> |repartitionByRange Length:24|repartitionByRange Length:24|repartitionByRange Length:24|
> |repartitionByRange getNumPartitions:24|repartitionByRange getNumPartitions:24|repartitionByRange getNumPartitions:24|
> |repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|
> |repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|
> |repartition by column expr Length:24|repartition by column expr Length:24|repartition by column expr Length:24|
> |repartition by column
[ https://issues.apache.org/jira/browse/SPARK-31794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116470#comment-17116470 ] Jungtaek Lim commented on SPARK-31794:
--
http://spark.apache.org/docs/3.0.0-preview2/api/scala/org/apache/spark/sql/Dataset.html

(The detailed explanation seems to have been added only in the 3.0.0 docs - I hadn't indicated that it's not in the Spark 2.4.x docs. My bad. That's just a doc issue, though; the behavior is still the same for all of Spark 2.x.)

Please check the description of the "repartition*" methods - click on a method name to expand its description. Given that Spark documents this limitation of the repartition methods, it is not a bug. Anyone is welcome to propose a better solution, but a new solution should also take the existing considerations into account. If you fully understand your data distribution, you may want to get your hands dirty with a custom partitioner - though that seems to be available only for RDDs.
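The custom-partitioner route mentioned in the comment above could look roughly like the following on the RDD API. This is only a hedged sketch, not a tested fix: `ExactPartitioner` is a hypothetical name, it assumes the precomputed `partid` column from the reproduction code, and it requires a running Spark session.

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner: route each row to the partition equal to its
// precomputed "partid" value, so every distinct key gets its own partition.
class ExactPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    ((k % parts) + parts) % parts  // non-negative result in [0, parts)
  }
}

// Usage sketch (RDD API only, as noted above):
// val pairs = df.withColumn("partid", $"val" % numparts)
//               .rdd.map(r => (r.getAs[Int]("partid"), r))
// val exact = pairs.partitionBy(new ExactPartitioner(numparts))
// exact.mapPartitions(it => Iterator(it.size)).collect()
```

Unlike hash or range partitioning, this maps keys to partition indices deterministically, at the cost of dropping back to the RDD API.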
[ https://issues.apache.org/jira/browse/SPARK-31794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17115638#comment-17115638 ] Ramesha Bhatta commented on SPARK-31794:
--
I understand repartitionByRange does sampling to estimate the distinct values in the range; however, on this small dataset I tried increasing the sample size to 2000, which is ~199% of the data, and the behavior is still the same, indicating a bug. Note that the first partition includes all the data matching remainders 0 and 1, and the last one has no data.

This is not a question of even distribution between partitions. Suppose we partition/classify data by country and apply country-specific logic: the result above effectively mixes two different countries' data in one partition. For performance, the executor processing that partition has to do double the work while the rest are done, so overall runtime depends on the partition with 2x the volume. Hash partitioning is not relevant for this kind of usage.

Also noted: if the data is in random order, the output is correct; the problem occurs with sequential/ordered data.

The below is the only documentation I can find. Is there any more detailed documentation available?

def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is range partitioned. At least one partition-by expression must be specified. When no explicit sort order is specified, "ascending nulls first" is assumed. Note, the rows are not sorted in each partition of the resulting Dataset.
Annotations: @varargs()
Since: 2.3.0
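As a sanity check on the reported numbers: with val in 1..1001 and partid = val % 24, residues 1..17 each occur 42 times while residues 0 and 18..23 each occur 41 times, so the observed repartitionByRange sizes List(83, 42, ..., 41, ..., 0) are exactly what results when keys 0 and 1 share the first range partition (41 + 42 = 83), leaving the last partition empty. A plain-Scala check of that arithmetic (no Spark needed):

```scala
// Recompute per-key row counts for the reproduction data:
// val in 1..1001, partid = val % 24.
val counts: Map[Int, Int] =
  (1 to 1001).groupBy(_ % 24).map { case (k, vs) => k -> vs.size }

// Residues 1..17 appear 42 times; residues 0 and 18..23 appear 41 times.
assert(counts(0) == 41 && counts(1) == 42)
// Keys 0 and 1 merged into one partition account for the observed 83.
assert(counts(0) + counts(1) == 83)
assert(counts.values.sum == 1001)
```

This confirms the skew is a boundary-placement effect (two adjacent keys in one range), not lost or duplicated rows.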
[ https://issues.apache.org/jira/browse/SPARK-31794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17115080#comment-17115080 ] Jungtaek Lim commented on SPARK-31794:
--
Please read through the docs of these methods, which explain why the distribution may not be even. Only `Dataset.repartition(numPartitions)` guarantees that rows are evenly distributed across partitions. It is neither a bug nor something that can be improved (unless we expose an interface to plug in a custom hash function).
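For completeness, the one variant the comment above says does guarantee even sizes is repartition with only a partition count (round-robin), at the cost of losing key-based co-location. A hedged usage sketch, reusing df and numparts from the reproduction code and assuming a Spark shell session:

```scala
// Round-robin repartition: even partition sizes, but rows sharing the same
// partid value are no longer co-located in a single partition.
val dfeven = df.repartition(numparts)
val sizes = dfeven.mapPartitions(it => Iterator(it.size)).collect().toList
// Each of the 24 partitions should hold roughly 41-42 of the 1001 rows.
```

Whether this is usable depends on the workload: it fixes the imbalance but cannot replace repartitionByRange or repartition(n, col) when downstream logic needs all rows of a key in one partition.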