OK, I did a bit of a test that, if my reasoning is valid, shows that the
shuffle spills to memory first and then to disk.

The sample code I wrote is as follows:

import sys
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
import time

def main():
    appName = "skew"
    spark = SparkSession.builder.appName(appName).getOrCreate()
    spark_context = SparkContext.getOrCreate()
    spark_context.setLogLevel("ERROR")
    df_uniform = spark.createDataFrame([i for i in range(10000000)], IntegerType())
    df_uniform = df_uniform.withColumn("partitionId", spark_partition_id())
    print("Number of Partitions: " + str(df_uniform.rdd.getNumPartitions()))
    df_uniform.groupby([df_uniform.partitionId]).count().sort(df_uniform.partitionId).show()
    # self-join on the uniform key to force a large shuffle
    df_uniform.alias("left").join(df_uniform.alias("right"), "value", "inner").count()

    print(f"""Spark.sql.shuffle.partitions is {spark.conf.get("spark.sql.shuffle.partitions")}""")
    df0 = spark.createDataFrame([0] * 9999998, IntegerType()).repartition(1)
    df1 = spark.createDataFrame([1], IntegerType()).repartition(1)
    df2 = spark.createDataFrame([2], IntegerType()).repartition(1)
    df_skew = df0.union(df1).union(df2)
    df_skew = df_skew.withColumn("partitionId", spark_partition_id())
    ## If we apply the same function call again, we see the one partition with much more data than the other two.
    df_skew.groupby([df_skew.partitionId]).count().sort(df_skew.partitionId).show()
    ## simulate a read that first round-robin distributes the keys
    #df_skew = df_skew.repartition(3)
    df_skew.join(df_uniform.select("value"), "value", "inner").count()
    # salt values range from 0 to spark.sql.shuffle.partitions - 1
    # note: .show() returns None, so df_left and df_right here are only for display
    df_left = df_skew.withColumn("salt", (rand() * int(spark.conf.get("spark.sql.shuffle.partitions"))).cast("int")).show()
    df_right = df_uniform.withColumn("salt_temp", array([lit(i) for i in range(int(spark.conf.get("spark.sql.shuffle.partitions")))])).show()
    time.sleep(60)  # pause so the Spark UI can be examined

if __name__ == "__main__":
    main()

The PySpark code file is attached

There is a 60-second wait at the end to allow one to examine the Spark UI
(http://localhost:4040 by default in local mode).

Run it with meager memory and a single core:

bin/spark-submit --master local[1] --driver-memory 550M skew.py

These are the run results:

Number of Partitions: 1
+-----------+--------+
|partitionId|   count|
+-----------+--------+
|          0|10000000|
+-----------+--------+

Spark.sql.shuffle.partitions is 200




+-----------+-------+
|partitionId|  count|
+-----------+-------+
|          0|9999998|
|          1|      1|
|          2|      1|
+-----------+-------+

+-----+-----------+----+
|value|partitionId|salt|
+-----+-----------+----+
|    0|          0|  89|
|    0|          0|  56|
|    0|          0| 169|
|    0|          0| 130|
|    0|          0|  94|
+-----+-----------+----+
only showing top 5 rows

+-----+-----------+--------------------+
|value|partitionId|           salt_temp|
+-----+-----------+--------------------+
|    0|          0|[0, 1, 2, 3, 4, 5...|
|    1|          0|[0, 1, 2, 3, 4, 5...|
|    2|          0|[0, 1, 2, 3, 4, 5...|
|    3|          0|[0, 1, 2, 3, 4, 5...|
|    4|          0|[0, 1, 2, 3, 4, 5...|
+-----+-----------+--------------------+
only showing top 5 rows
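
For what it is worth, the script only creates the salt columns and displays
them; it never runs the salted join itself. Below is a minimal sketch of how
that join could be completed, assuming df_left and df_right are assigned
before .show() is called (as written, the trailing .show() leaves the
variables holding None):

from pyspark.sql.functions import explode, col

# exploding salt_temp replicates each uniform-side row once per salt value,
# so every (value, salt) pair produced on the skewed side finds a match
df_right_exploded = df_right.withColumn("salt", explode(col("salt_temp"))).drop("salt_temp")

# join on (value, salt) instead of value alone, spreading the hot key 0
# across spark.sql.shuffle.partitions buckets instead of a single one
df_salted = df_left.select("value", "salt").join(
    df_right_exploded.select("value", "salt"), ["value", "salt"], "inner")
print(df_salted.count())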

I have attached the screenshot from the Spark UI.


So we can see the skew as below
+-----------+-------+
|partitionId|  count|
+-----------+-------+
|          0|9999998|
|          1|      1|
|          2|      1|
+-----------+-------+

Now I have attached the UI plot as well. The "Aggregated Metrics by Executor"
section shows the Spill (Memory) and Spill (Disk) columns, highlighted with a
yellow circle.
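
Rather than eyeballing the UI, the same spill counters can be pulled from
Spark's monitoring REST API during the 60-second pause. This is not part of
the attached script; it assumes the default UI port 4040 and the requests
package, and would be added just before the time.sleep(60) call:

import requests

# stage-level metrics from the monitoring REST API (the same data the UI renders)
app_id = spark.sparkContext.applicationId
stages = requests.get(f"http://localhost:4040/api/v1/applications/{app_id}/stages").json()
for s in stages:
    print(s["stageId"], s["memoryBytesSpilled"], s["diskBytesSpilled"])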

My deduction is that Spark will try to keep shuffle data in memory if it can,
but will spill to disk when it has to. So it is not necessarily true that a
Spark shuffle always ends up writing to disk?
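
If one wants to move the point at which the spill happens, rather than just
starving the driver, the standard memory settings can be set on the session.
These are not set in the test above and the values shown are Spark's
defaults; this is only a sketch:

spark = (SparkSession.builder
         .appName("skew")
         .config("spark.memory.fraction", "0.6")         # share of the heap available for execution + storage
         .config("spark.memory.storageFraction", "0.5")  # portion of that shielded from eviction by execution
         .config("spark.local.dir", "/tmp")              # where shuffle and spill files land on disk
         .getOrCreate())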

I stand corrected 🤔

Thanks

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 16 May 2023 at 18:07, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> On the issue of the Spark shuffle, it is accepted that a shuffle *often
> involves* some, if not all, of the following:
>
>    - Disk I/O
>    - Data serialization and deserialization
>    - Network I/O
>
> Excluding the external shuffle service, and without relying on the
> configuration options Spark provides for shuffle, does the operation always
> involve disk usage (any HCFS-compatible file system), or will it use the
> available memory if it can?
>
> Thanks
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>