When there is not enough memory to keep the cached data for your jobs across two 
different stages, the data might be read twice because Spark may have to evict 
the previous cache for other jobs. In those cases, a spill can be triggered when 
Spark writes your data from memory to disk.

One way to check is to look at the Spark UI. When Spark caches the data, you will 
see a little green dot connected to the blue rectangle in the Spark UI. If you 
see this green dot twice across your two stages, Spark likely spilled the data 
after your first job and read it again in the second run. You can also confirm 
this with other metrics in the Spark UI.
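
For example, here is a rough sketch (assuming the spark session, schema, and df1 
from the example further down this thread) of caching the DataFrame explicitly 
and checking its storage level before looking at the UI:

```
# Rough sketch: cache df1 explicitly so both the aggregation and the join
# reuse the in-memory data instead of rescanning the CSV.
df1 = spark.read.csv("./df1.csv", header=True, schema=schema)
df1.cache()    # marks the DataFrame for caching (materialized on the first action)
df1.count()    # forces materialization so later stages can hit the cache

print(df1.storageLevel)   # e.g. "Disk Memory Deserialized 1x Replicated"
# After this, the SQL tab should show an InMemoryTableScan instead of a second
# file scan; if memory is too small, check the stage's spill metrics instead.
```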

That is my personal understanding based on what I have read and seen in my own 
job runs. If there is any mistake, feel free to correct me.

Thank You & Best Regards
Winston Lai
________________________________
From: Nitin Siwach <nitinsiw...@gmail.com>
Sent: Sunday, May 7, 2023 12:22:32 PM
To: Vikas Kumar <vku...@etsy.com>
Cc: User <user@spark.apache.org>
Subject: Re: Does spark read the same file twice, if two stages are using the 
same DataFrame?

Thank you tons, Vikas :). That makes so much sense now

I'm in the learning phase and was just browsing through various concepts of Spark 
with small self-made examples.

It didn't make sense to me that the two physical plans should be different, but 
now I understand what you're saying.

Again, thank you for helping me out

On Sun, 7 May, 2023, 07:48 Vikas Kumar, 
<vku...@etsy.com<mailto:vku...@etsy.com>> wrote:

Spark came up with a plan, but that may or may not be the optimal plan given the 
system settings.
If you do df1.cache(), I am guessing Spark will not read df1 twice.
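Something like this, for example (just a sketch based on your example below, untested):

```
df1 = spark.read.csv("./df1.csv", header=True, schema=schema)
df1.cache()  # the CSV should now be scanned only once and reused from memory

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2, on="index")
df3.count()  # single action; the second branch should read from the cache
```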

Btw, why do you have spark.sql.adaptive.enabled set to false?

On Sat, May 6, 2023, 1:46 PM Nitin Siwach 
<nitinsiw...@gmail.com<mailto:nitinsiw...@gmail.com>> wrote:
I hope this email finds you well :)

The following code reads the same CSV twice even though only one action is 
called.

End-to-end runnable example:
```
import pandas as pd
import numpy as np

df1 = pd.DataFrame(np.arange(1_000).reshape(-1, 1))
df1.index = np.random.choice(range(10), size=1_000)
df1.to_csv("./df1.csv", index_label="index")
############################################################################

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField

spark = SparkSession.builder \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()

schema = StructType([StructField('index', StringType(), True),
                     StructField('0', StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema = schema)

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2,on='index')

df3.explain()
df3.count()
```

The SQL tab in the web UI shows the following:

[screencapture-localhost-4040-SQL-execution-2023-05-06-19_48_41.png]

As you can see, the df1 file is read twice. Is this the expected behaviour? Why 
is that happening? I have just one action, so the same part of the pipeline 
should not run multiple times.

I have read the answer 
[here][1]<https://stackoverflow.com/questions/37894099/does-spark-read-the-same-file-twice-if-two-stages-are-using-the-same-rdd>.
 That question is almost the same; the only difference is that it uses RDDs, 
whereas I am using DataFrames in the PySpark API. In that answer, it is suggested 
that the DataFrame API helps avoid multiple file scans, and that this is one of 
the reasons the DataFrame API exists in the first place.


However, as it turns out, I am facing the exact same issue with DataFrames as 
well. It seems rather odd for Spark, which is celebrated for its efficiency, to 
be this inefficient (most likely I am just missing something and this is not a 
valid criticism :)).
