OK, I ran a small test which, if my reading of it is right, shows that the shuffle spills to memory first and then to disk.
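As a back-of-the-envelope check of why the spill is forced: assuming Spark's unified memory manager with its fixed 300 MiB reserved heap and the default spark.memory.fraction of 0.6 (both assumptions on my part, not measured from this run), the 550M driver heap used below leaves only about 150 MiB shared between execution and storage, which a 10-million-row shuffle easily exceeds:

```python
# Rough unified-memory budget (assumptions: UnifiedMemoryManager's fixed
# 300 MiB reservation and the default spark.memory.fraction = 0.6).
MiB = 1024 * 1024
driver_memory = 550 * MiB      # --driver-memory 550M
reserved = 300 * MiB           # fixed heap reservation
memory_fraction = 0.6          # default spark.memory.fraction

unified_pool = (driver_memory - reserved) * memory_fraction
print(f"execution + storage pool ~ {unified_pool / MiB:.0f} MiB")  # ~ 150 MiB
```

With so little execution memory available, the sort/aggregate buffers behind the shuffle cannot stay resident and the spill counters in the UI light up.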
The sample code I wrote is as follows:

import time
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import spark_partition_id, rand, array, lit
from pyspark.sql.types import IntegerType

def main():
    appName = "skew"
    spark = SparkSession.builder.appName(appName).getOrCreate()
    spark_context = SparkContext.getOrCreate()
    spark_context.setLogLevel("ERROR")

    df_uniform = spark.createDataFrame([i for i in range(10000000)], IntegerType())
    df_uniform = df_uniform.withColumn("partitionId", spark_partition_id())
    print("Number of Partitions: " + str(df_uniform.rdd.getNumPartitions()))
    df_uniform.groupby([df_uniform.partitionId]).count().sort(df_uniform.partitionId).show()
    df_uniform.alias("left").join(df_uniform.alias("right"), "value", "inner").count()

    shuffle_partitions = int(spark.conf.get("spark.sql.shuffle.partitions"))
    print(f"spark.sql.shuffle.partitions is {shuffle_partitions}")

    df0 = spark.createDataFrame([0] * 9999998, IntegerType()).repartition(1)
    df1 = spark.createDataFrame([1], IntegerType()).repartition(1)
    df2 = spark.createDataFrame([2], IntegerType()).repartition(1)
    df_skew = df0.union(df1).union(df2)
    df_skew = df_skew.withColumn("partitionId", spark_partition_id())
    ## The same groupby now shows one partition with far more data than the other two
    df_skew.groupby([df_skew.partitionId]).count().sort(df_skew.partitionId).show()

    ## simulate reading to first round robin distribute the key
    #df_skew = df_skew.repartition(3)
    df_skew.join(df_uniform.select("value"), "value", "inner").count()

    # salt range is 0 to spark.sql.shuffle.partitions - 1
    df_left = df_skew.withColumn("salt", (rand() * shuffle_partitions).cast("int"))
    df_left.show(5)
    df_right = df_uniform.withColumn("salt_temp", array([lit(i) for i in range(shuffle_partitions)]))
    df_right.show(5)

    time.sleep(60)  # pause so the Spark UI can be examined

if __name__ == "__main__":
    main()

The PySpark code file is attached. There is a 60-second wait at the end to allow one to examine the Spark UI. Run it with meagre memory and cores:

bin/spark-submit --master local[1] --driver-memory 550M skew.py

These are the run results:

Number of Partitions: 1
+-----------+--------+
|partitionId|   count|
+-----------+--------+
|          0|10000000|
+-----------+--------+

spark.sql.shuffle.partitions is 200

+-----------+-------+
|partitionId|  count|
+-----------+-------+
|          0|9999998|
|          1|      1|
|          2|      1|
+-----------+-------+

+-----+-----------+----+
|value|partitionId|salt|
+-----+-----------+----+
|    0|          0|  89|
|    0|          0|  56|
|    0|          0| 169|
|    0|          0| 130|
|    0|          0|  94|
+-----+-----------+----+
only showing top 5 rows

+-----+-----------+--------------------+
|value|partitionId|           salt_temp|
+-----+-----------+--------------------+
|    0|          0|[0, 1, 2, 3, 4, 5...|
|    1|          0|[0, 1, 2, 3, 4, 5...|
|    2|          0|[0, 1, 2, 3, 4, 5...|
|    3|          0|[0, 1, 2, 3, 4, 5...|
|    4|          0|[0, 1, 2, 3, 4, 5...|
+-----+-----------+--------------------+
only showing top 5 rows

So we can see the skew above: partition 0 holds 9,999,998 rows while the other two hold one row each. I have attached the screenshot from the Spark UI as well. The section "Aggregated Metrics by Executor" shows the Spill (Memory) and Spill (Disk) columns, highlighted with a yellow circle.
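The code above stops after creating the salt columns; to complete the technique one would explode salt_temp on the right side and join on both value and salt. As a plain-Python sketch of why this rebalances the shuffle (the salt count of 8 and the toy row counts are my own illustrative choices, not from the test above):

```python
import random
from collections import Counter

NUM_SALTS = 8   # illustrative stand-in for spark.sql.shuffle.partitions

# Skewed left side: one hot key (0) plus two singleton keys, as in the test.
left = [0] * 9998 + [1, 2]
# Uniform right side: the distinct keys the left side can contain.
right = [0, 1, 2]

random.seed(0)
# Left: attach a random salt to every row, widening the join key.
salted_left = [(v, random.randrange(NUM_SALTS)) for v in left]
# Right: replicate each row once per salt value (what exploding salt_temp does).
salted_right = {(v, s) for v in right for s in range(NUM_SALTS)}

# Join on the composite key (value, salt); cardinality is unchanged because
# every salted left row finds exactly one partner on the right.
joined = [k for k in salted_left if k in salted_right]

# The hot key's 9,998 rows now spread over NUM_SALTS distinct join keys
# instead of all hashing into a single shuffle partition.
spread = Counter(s for v, s in salted_left if v == 0)
print(len(joined), len(spread))  # 10000 8
```

The join result is the same size as the unsalted join; only the distribution of rows per shuffle partition changes.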
My deduction is that Spark will use memory for the shuffle if it can, but will fall back to disk when it has to. So it is not always true that a Spark shuffle ends up using disk. I stand corrected 🤔

Thanks

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Tue, 16 May 2023 at 18:07, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> On the issue of the Spark shuffle, it is accepted that a shuffle *often involves* some or all of the following:
>
> - Disk I/O
> - Data serialization and deserialization
> - Network I/O
>
> Excluding the external shuffle service, and without relying on the configuration options Spark provides for shuffle, does the operation always involve disk usage (any HCFS-compatible file system), or will it use the existing persistent memory if it can?
>
> Thanks
>
> Mich Talebzadeh
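PS. Beyond eyeballing the UI, the same spill counters can be pulled from the Spark UI's REST API: the stages resource at /api/v1/applications/<app-id>/stages exposes memoryBytesSpilled and diskBytesSpilled per stage. A small sketch against a canned response (the total_spill helper and the sample figures are my own, for illustration only):

```python
import json

# Hypothetical helper: sum the spill counters from the JSON returned by the
# Spark UI REST endpoint /api/v1/applications/<app-id>/stages.  The field
# names memoryBytesSpilled / diskBytesSpilled come from that API; the
# numbers in the canned sample below are made up.
def total_spill(stages_json):
    stages = json.loads(stages_json)
    mem = sum(s.get("memoryBytesSpilled", 0) for s in stages)
    disk = sum(s.get("diskBytesSpilled", 0) for s in stages)
    return mem, disk

sample = json.dumps([
    {"stageId": 3, "memoryBytesSpilled": 4_600_000_000, "diskBytesSpilled": 190_000_000},
    {"stageId": 4, "memoryBytesSpilled": 0, "diskBytesSpilled": 0},
])
mem, disk = total_spill(sample)
print(f"spill: {mem / 1e9:.1f} GB memory, {disk / 1e6:.0f} MB disk")
```

In a live run one would fetch the JSON from the running application's UI (port 4040 by default in local mode) instead of the canned sample.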