This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6bebb66651 [SPARK-40660][CORE][SQL] Switch to XORShiftRandom to distribute elements
e6bebb66651 is described below

commit e6bebb66651a1ff06f821bd4ee2b7b52bd532c01
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Wed Oct 5 14:00:55 2022 +0800

    [SPARK-40660][CORE][SQL] Switch to XORShiftRandom to distribute elements
    
    ### What changes were proposed in this pull request?
    
    This PR replaces `Random(hashing.byteswap32(index))` with `XORShiftRandom(index)` to distribute elements evenly across output partitions.
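
    In essence, the seeding changes as follows (an illustrative sketch, not new code in this PR; `index` and `numPartitions` stand in for the real partition index and partition count):
    ```scala
    import java.util.Random
    import scala.util.hashing
    import org.apache.spark.util.random.XORShiftRandom

    val index = 3          // illustrative partition index
    val numPartitions = 10 // illustrative partition count

    // Before: scramble the index with byteswap32, then seed java.util.Random with it
    val before = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    // After: seed XORShiftRandom with the raw index; it scrambles the seed internally
    val after = new XORShiftRandom(index).nextInt(numPartitions)
    ```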
    
    ### Why are the changes needed?
    
    The distribution produced by `XORShiftRandom` is more even. For example:
    
    1. The number of output files has changed since SPARK-40407. [Some downstream projects](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java#L578-L579) use `repartition` to determine the expected number of output files in their tests.
       ```
       bin/spark-shell --master "local[2]"
       spark.range(10).repartition(10).write.mode("overwrite").parquet("/tmp/spark/repartition")
       ```
       Before this PR (and after SPARK-40407), the number of output files is 8. After this PR (or before SPARK-40407), the number of output files is 10.
    
    2. The distribution produced by `XORShiftRandom` is more even than that of the current `byteswap32`-seeded `Random`:
       ```scala
       import java.util.Random
       import org.apache.spark.util.random.XORShiftRandom
       import scala.util.hashing
    
       def distribution(count: Int, partition: Int) = {
         // 1) java.util.Random seeded directly with the partition index
         println((1 to count).map(partitionId => new Random(partitionId).nextInt(partition))
           .groupBy(f => f)
           .map(_._2.size).mkString(". "))

         // 2) java.util.Random seeded with byteswap32(index): the current approach
         println((1 to count).map(partitionId => new Random(hashing.byteswap32(partitionId)).nextInt(partition))
           .groupBy(f => f)
           .map(_._2.size).mkString(". "))

         // 3) XORShiftRandom seeded with the partition index: the approach in this PR
         println((1 to count).map(partitionId => new XORShiftRandom(partitionId).nextInt(partition))
           .groupBy(f => f)
           .map(_._2.size).mkString(". "))
       }
    
       distribution(200, 4)
       ```
       The output (one line per approach above):
       ```
       200
       50. 60. 46. 44
       55. 48. 43. 54
       ```
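
       A quick way to quantify the spread of the last two lines (the `spread` helper below is illustrative, not part of this PR):
       ```scala
       // Ratio of the largest to the smallest bucket; closer to 1.0 means a more even distribution.
       def spread(sizes: Seq[Int]): Double = sizes.max.toDouble / sizes.min

       spread(Seq(50, 60, 46, 44))  // byteswap32-seeded Random: ~1.36
       spread(Seq(55, 48, 43, 54))  // XORShiftRandom: ~1.28
       ```
       The first output line (`200`) shows that seeding `java.util.Random` directly with consecutive indices puts every element into a single bucket, which is why the seed needs to be scrambled at all.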
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #38106 from wangyum/SPARK-40660.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala         |  5 ++---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala |  5 ++---
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 14 +++++++++++++-
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala    |  4 ++--
 4 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index d12804fc12b..18f3f87f30f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,7 +25,6 @@ import scala.io.Codec
 import scala.language.implicitConversions
 import scala.ref.WeakReference
 import scala.reflect.{classTag, ClassTag}
-import scala.util.hashing
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -50,7 +49,7 @@ import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
   Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
-  SamplingUtils}
+  SamplingUtils, XORShiftRandom}
 
 /**
  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -505,7 +504,7 @@ abstract class RDD[T: ClassTag](
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
       val distributePartition = (index: Int, items: Iterator[T]) => {
-        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
+        var position = new XORShiftRandom(index).nextInt(numPartitions)
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 6f287028f74..806a048b244 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.sql.execution.exchange
 
-import java.util.Random
 import java.util.function.Supplier
 
 import scala.concurrent.Future
-import scala.util.hashing
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -40,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
+import org.apache.spark.util.random.XORShiftRandom
 
 /**
  * Common trait for all shuffle exchange implementations to facilitate pattern matching.
@@ -307,7 +306,7 @@ object ShuffleExchangeExec {
         // end up being almost the same regardless of the index. substantially scrambling the
         // seed by hashing will help. Refer to SPARK-21782 for more details.
         val partitionId = TaskContext.get().partitionId()
-        var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions)
+        var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
         (row: InternalRow) => {
           // The HashPartitioner will handle the `mod` by the number of partitions
           position += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 7dc44b76e03..48b434fbb3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
 
+import org.apache.hadoop.fs.{Path, PathFilter}
 import org.scalatest.Assertions._
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.TableDrivenPropertyChecks._
@@ -2169,7 +2170,18 @@ class DatasetSuite extends QueryTest
   test("SPARK-40407: repartition should not result in severe data skew") {
     val df = spark.range(0, 100, 1, 50).repartition(4)
     val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
-    assert(result.sorted.toSeq === Seq(19, 25, 25, 31))
+    assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
+  }
+
+  test("SPARK-40660: Switch to XORShiftRandom to distribute elements") {
+    withTempDir { dir =>
+      spark.range(10).repartition(10).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(spark.sessionState.newHadoopConf())
+      val parquetFiles = fs.listStatus(new Path(dir.getAbsolutePath), new PathFilter {
+        override def accept(path: Path): Boolean = path.getName.endsWith("parquet")
+      })
+      assert(parquetFiles.size === 10)
+    }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index fbaa6fea60b..81bce35a584 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2127,8 +2127,8 @@ class AdaptiveQueryExecSuite
         withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
           // partition size [0,258,72,72,72]
           checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
-          // partition size [144,72,144,216,144]
-          checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 2, 6)
+          // partition size [144,72,144,72,72,144,72]
+          checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7)
         }
 
         // no skewed partition should be optimized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
