This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 5fe895a65a4 [SPARK-40660][SQL][3.3] Switch to XORShiftRandom to distribute elements
5fe895a65a4 is described below

commit 5fe895a65a4a9d65f81d43af473b5e3a855ed8c8
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Thu Oct 6 13:02:22 2022 +0800

[SPARK-40660][SQL][3.3] Switch to XORShiftRandom to distribute elements

### What changes were proposed in this pull request?

Cherry-picked from #38106, with its changes to RDD.scala reverted: https://github.com/apache/spark/blob/d2952b671a3579759ad9ce326ed8389f5270fd9f/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L507

### Why are the changes needed?

The number of output files has changed since SPARK-40407. [Some downstream projects](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java#L578-L579) use repartition to determine the number of output files in their tests.

```
bin/spark-shell --master "local[2]"
spark.range(10).repartition(10).write.mode("overwrite").parquet("/tmp/spark/repartition")
```

With SPARK-40407 applied but without this PR, the number of output files is 8. With this PR (or before SPARK-40407), the number of output files is 10.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #38110 from wangyum/branch-3.3-SPARK-40660.
Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 5 ++--- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 14 +++++++++++++- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 ++-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 9800a781402..964f1d6d518 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -17,11 +17,9 @@ package org.apache.spark.sql.execution.exchange -import java.util.Random import java.util.function.Supplier import scala.concurrent.Future -import scala.util.hashing import org.apache.spark._ import org.apache.spark.internal.config @@ -40,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.util.MutablePair import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator} +import org.apache.spark.util.random.XORShiftRandom /** * Common trait for all shuffle exchange implementations to facilitate pattern matching. @@ -314,7 +313,7 @@ object ShuffleExchangeExec { // end up being almost the same regardless of the index. substantially scrambling the // seed by hashing will help. Refer to SPARK-21782 for more details. 
val partitionId = TaskContext.get().partitionId() - var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions) + var position = new XORShiftRandom(partitionId).nextInt(numPartitions) (row: InternalRow) => { // The HashPartitioner will handle the `mod` by the number of partitions position += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index c65ae966ef6..f5e736621eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} +import org.apache.hadoop.fs.{Path, PathFilter} import org.scalatest.Assertions._ import org.scalatest.exceptions.TestFailedException import org.scalatest.prop.TableDrivenPropertyChecks._ @@ -2138,7 +2139,18 @@ class DatasetSuite extends QueryTest test("SPARK-40407: repartition should not result in severe data skew") { val df = spark.range(0, 100, 1, 50).repartition(4) val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect() - assert(result.sorted.toSeq === Seq(19, 25, 25, 31)) + assert(result.sorted.toSeq === Seq(23, 25, 25, 27)) + } + + test("SPARK-40660: Switch to XORShiftRandom to distribute elements") { + withTempDir { dir => + spark.range(10).repartition(10).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + val fs = new Path(dir.getAbsolutePath).getFileSystem(spark.sessionState.newHadoopConf()) + val parquetFiles = fs.listStatus(new Path(dir.getAbsolutePath), new PathFilter { + override def accept(path: Path): Boolean = path.getName.endsWith("parquet") + }) + assert(parquetFiles.size === 10) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 8aaafd05217..0055b94fa06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -2090,8 +2090,8 @@ class AdaptiveQueryExecSuite withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") { // partition size [0,258,72,72,72] checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4) - // partition size [144,72,144,216,144] - checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 2, 6) + // partition size [144,72,144,72,72,144,72] + checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7) } // no skewed partition should be optimized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org