Re: Spark shuffle and inevitability of writing to Disk

2023-05-17 Thread Mich Talebzadeh
OK, I did a bit of a test which, if my assertion is valid, shows that the
shuffle does spill to memory first and then to disk.

The sample code I wrote is as follows:

import sys
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
import time

def main():
    appName = "skew"
    spark = SparkSession.builder.appName(appName).getOrCreate()
    spark_context = SparkContext.getOrCreate()
    spark_context.setLogLevel("ERROR")

    df_uniform = spark.createDataFrame([i for i in range(1000)], IntegerType())
    df_uniform = df_uniform.withColumn("partitionId", spark_partition_id())
    print("Number of Partitions: " + str(df_uniform.rdd.getNumPartitions()))
    df_uniform.groupby([df_uniform.partitionId]).count().sort(df_uniform.partitionId).show()
    df_uniform.alias("left").join(df_uniform.alias("right"), "value", "inner").count()

    print(f"""Spark.sql.shuffle.partitions is {spark.conf.get("spark.sql.shuffle.partitions")}""")

    # build a deliberately skewed DataFrame: one partition with 998 rows, two with 1 row each
    df0 = spark.createDataFrame([0] * 998, IntegerType()).repartition(1)
    df1 = spark.createDataFrame([1], IntegerType()).repartition(1)
    df2 = spark.createDataFrame([2], IntegerType()).repartition(1)
    df_skew = df0.union(df1).union(df2)
    df_skew = df_skew.withColumn("partitionId", spark_partition_id())
    ## Applying the same aggregation again shows the one partition with much
    ## more data than the other two
    df_skew.groupby([df_skew.partitionId]).count().sort(df_skew.partitionId).show()

    ## simulate reading to first round-robin distribute the key
    # df_skew = df_skew.repartition(3)
    df_skew.join(df_uniform.select("value"), "value", "inner").count()

    # salt range is from 0 to spark.sql.shuffle.partitions - 1
    df_left = df_skew.withColumn(
        "salt", (rand() * int(spark.conf.get("spark.sql.shuffle.partitions"))).cast("int"))
    df_left.show(5)
    df_right = df_uniform.withColumn(
        "salt_temp",
        array([lit(i) for i in range(int(spark.conf.get("spark.sql.shuffle.partitions")))]))
    df_right.show(5)

    time.sleep(60)  # pause so that the Spark UI can be examined

if __name__ == "__main__":
    main()

The PySpark code file is attached.

There is a 60-second wait at the end to allow one to examine the Spark UI.
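
As an aside, while the job is paused the same Spill (Memory) and Spill (Disk)
figures shown in the UI can also be pulled from the monitoring REST API that
backs it. A minimal sketch, assuming the default UI port 4040 of a local-mode
run and that the requests package is available:

import requests

base = "http://localhost:4040/api/v1"  # default UI port for the first application
app_id = requests.get(f"{base}/applications").json()[0]["id"]
for stage in requests.get(f"{base}/applications/{app_id}/stages").json():
    # memoryBytesSpilled and diskBytesSpilled back the two Spill columns in the UI
    print(stage["stageId"], stage["memoryBytesSpilled"], stage["diskBytesSpilled"])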

Run it with meager memory and cores

bin/spark-submit --master local[1] --driver-memory 550M skew.py

These are the run results:

Number of Partitions: 1
+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0| 1000|
+-----------+-----+

Spark.sql.shuffle.partitions is 200




+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0|  998|
|          1|    1|
|          2|    1|
+-----------+-----+

+-----+-----------+----+
|value|partitionId|salt|
+-----+-----------+----+
|    0|          0|  89|
|    0|          0|  56|
|    0|          0| 169|
|    0|          0| 130|
|    0|          0|  94|
+-----+-----------+----+
only showing top 5 rows

+-----+-----------+--------------------+
|value|partitionId|           salt_temp|
+-----+-----------+--------------------+
|    0|          0|[0, 1, 2, 3, 4, 5...|
|    1|          0|[0, 1, 2, 3, 4, 5...|
|    2|          0|[0, 1, 2, 3, 4, 5...|
|    3|          0|[0, 1, 2, 3, 4, 5...|
|    4|          0|[0, 1, 2, 3, 4, 5...|
+-----+-----------+--------------------+
only showing top 5 rows
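
For what it is worth, the script above stops at producing the salt columns. A
hedged sketch of how the salted join itself could be completed (my own
continuation, not in the attached file): explode the salt_temp array on the
uniform side so that every key exists once per salt value, then join on both
the key and the salt.

df_left = df_skew.withColumn(
    "salt", (rand() * int(spark.conf.get("spark.sql.shuffle.partitions"))).cast("int"))
df_right = df_uniform.withColumn(
    "salt", explode(array([lit(i) for i in range(
        int(spark.conf.get("spark.sql.shuffle.partitions")))])))
# the skewed key 0 is now spread across up to 200 (key, salt) combinations
df_left.join(df_right.select("value", "salt"), ["value", "salt"], "inner").count()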

I have attached the screenshot from the Spark UI.


So we can see the skew as below:
+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0|  998|
|          1|    1|
|          2|    1|
+-----------+-----+
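
As an aside, on Spark 3.x one could presumably let AQE deal with this kind of
skew instead of salting by hand. A minimal sketch, assuming the join is planned
as a sort-merge join (these settings are my illustration, not part of the
attached script):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# a shuffle partition is considered skewed if it is this many times larger than the median
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
df_skew.join(df_uniform.select("value"), "value", "inner").count()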

Now I have attached the UI plot as well. The "Aggregated Metrics by Executor"
section shows the Spill (Memory) and Spill (Disk) columns, highlighted with a
yellow circle.

My deduction is that Spark will try to keep the shuffle data in memory if it
can, but will spill to disk if it has to. So it is not true that a Spark
shuffle will always end up writing to disk?
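
If that deduction holds, the point at which the spill moves from memory to
disk should be governed by the execution memory available to the task. A
hedged sketch of the knobs involved, using the documented defaults rather than
tuned values:

spark = (SparkSession.builder
         .appName(appName)
         .config("spark.memory.fraction", "0.6")         # share of the heap for execution + storage
         .config("spark.memory.storageFraction", "0.5")  # portion of that pool shielded for storage
         .getOrCreate())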

I stand corrected 🤔

Thanks

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 16 May 2023 at 18:07, Mich Talebzadeh 
wrote:

> Hi,
>
> On the issue of Spark shuffle, it is accepted that a shuffle *often involves*
> some, if not all, of the following:
>
>- Disk I/O
>- Data serialization and deserialization
>- Network I/O
>
> Excluding external shuffle service and without relying on the
>

Re: [spark-core] Can executors recover/reuse shuffle files upon failure?

2023-05-17 Thread vaquar khan
From the following link you will get all the required details:

https://aws.amazon.com/blogs/containers/best-practices-for-running-spark-on-amazon-eks/

Let me know if you require further information.


Regards,
Vaquar khan




On Mon, May 15, 2023, 10:14 PM Mich Talebzadeh 
wrote:

> A couple of points:
>
> Why use spot or pre-emptible instances when your application, as you stated,
> shuffles heavily?
> Have you looked at why you are having these shuffles? What is the cause of
> these large transformations ending up in a shuffle?
>
> Also on your point:
> "..then ideally we should expect that when an executor is killed/OOM'd
> and a new executor is spawned on the same host, the new executor registers
> the shuffle files to itself. Is that so?"
>
> What guarantee is there that the new executor with the inherited shuffle
> files will succeed?
>
> Also, OOM is often associated with some form of skewed data.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 15 May 2023 at 13:11, Faiz Halde 
> wrote:
>
>> Hello,
>>
>> We've been in touch with a few Spark specialists who suggested a potential
>> solution to improve the reliability of our jobs that are shuffle heavy.
>>
>> Here is what our setup looks like:
>>
>>- Spark version: 3.3.1
>>- Java version: 1.8
>>- We do not use external shuffle service
>>- We use spot instances
>>
>> We run Spark jobs on clusters that use Amazon EBS volumes. The
>> spark.local.dir is mounted on this EBS volume. One of the offerings from
>> the service we use is EBS migration, which basically means that if a host
>> is about to get evicted, a new host is created and the EBS volume is
>> attached to it.
>>
>> When Spark assigns a new executor to the newly created instance, it can
>> basically recover all the shuffle files that are already persisted on the
>> migrated EBS volume.
>>
>> Is this how it works? Do executors recover / re-register the shuffle
>> files that they found?
>>
>> So far I have not come across any recovery mechanism. I can only see
>> KubernetesLocalDiskShuffleDataIO, which has a pre-init step where it tries
>> to register the available shuffle files to itself.
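>>
>> As a rough sketch of my assumption (untested) of how that plugin gets wired
>> in, i.e. through the shuffle IO plugin config together with the PVC-reuse
>> options on Kubernetes:
>>
>> spark = (SparkSession.builder
>>          .config("spark.shuffle.sort.io.plugin.class",
>>                  "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO")
>>          .config("spark.kubernetes.driver.reusePersistentVolumeClaim", "true")
>>          .config("spark.kubernetes.driver.ownPersistentVolumeClaim", "true")
>>          .getOrCreate())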
>>
>> A natural follow-up on this,
>>
>> If what they claim is true, then ideally we should expect that when an
>> executor is killed/OOM'd and a new executor is spawned on the same host,
>> the new executor registers the shuffle files to itself. Is that so?
>>
>> Thanks
>>
>> --
>> Confidentiality note: This e-mail may contain confidential information
>> from Nu Holdings Ltd and/or its affiliates. If you have received it by
>> mistake, please let us know by e-mail reply and delete it from your system;
>> you may not copy this message or disclose its contents to anyone; for
>> details about what personal information we collect and why, please refer to
>> our privacy policy.
>>
>