Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Nitin Siwach
Thank you for the help Mich :)

I have not started with a pandas DF. I only used pandas to create a dummy
.csv on disk, which I then use to showcase my pain point. The pandas code is
there so that an end-to-end runnable example is provided and the effort for
anyone trying to help me out is minimized.

I don't think Spark validating that the file exists qualifies as an action
in Spark parlance. Sure, there would be an AnalysisException if the file is
not found at the provided location; however, if you provide a schema and a
valid path, then no job shows up in the Spark UI, which (IMO) confirms that
no action has been taken (one action necessarily equals at least one job).
If you don't provide the schema, then a job is triggered (an action) to
infer the schema for subsequent logical planning.
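For illustration, here is a minimal sketch of the two read paths (it assumes
the same SparkSession spark and the ./df1.csv file from my runnable example;
the variable names are made up):

```
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("index", StringType(), True),
                     StructField("0", StringType(), True)])

# Explicit schema: only logical planning happens at read time, so no job
# should appear in the Spark UI until an action is called.
df_with_schema = spark.read.csv("./df1.csv", header=True, schema=schema)

# No schema provided: Spark samples the file to infer the column types,
# which shows up as a job in the UI before any action of mine is called.
df_inferred = spark.read.csv("./df1.csv", header=True, inferSchema=True)
```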

Since I am just demonstrating my lack of understanding, I have chosen local
mode. Otherwise, I do use Google buckets to host all the data.

That being said, I think my question is something entirely different: calling
one action (df3.count()) is reading the same CSV twice. I do not understand
that. So far, I have always thought that data needs to be persisted only when
a subset of the DAG is to be reused by several actions (a sketch of the
.cache() workaround follows below).
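A sketch of that .cache() workaround, assuming the spark session and schema
from my runnable example further down the thread (whether the double scan
actually disappears can be checked in the SQL tab):

```
from pyspark.sql import functions as F

# Cache the source once. Both inputs to the join -- df1 itself and the
# aggregate derived from df1 -- can then reuse the cached data instead of
# each branch re-scanning the CSV, even though only one action is called.
df1 = spark.read.csv("./df1.csv", header=True, schema=schema).cache()

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2, on="index")
df3.count()  # single action; the SQL tab should now show only one file scan
```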


On Sun, May 7, 2023 at 9:47 PM Mich Talebzadeh 
wrote:

> You have started with a pandas DF, which won't scale beyond the driver
> itself.
>
> Let us put that aside.
> df1.to_csv("./df1.csv",index_label = "index")  ## write the dataframe to
> the underlying file system
>
> starting with spark
>
> df1 = spark.read.csv("./df1.csv", header=True, schema = schema) ## read
> the dataframe from the underlying file system
>
> That is your first action because Spark needs to validate that the file
> exists and check the schema. What will happen if that file does not exist?
>
> csvlocation="/tmp/df1.csv"
> csvlocation2="/tmp/df5.csv"
> df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
> df1.index = np.random.choice(range(10),size=1000)
> df1.to_csv(csvlocation,index_label = "index")
> Schema = StructType([StructField('index', StringType(), True),
>  StructField('0', StringType(), True)])
>
> df1 = spark.read.csv(csvlocation2, header=True, schema =
> Schema).cache()  ## incorrect location
>
> df2 = df1.groupby("index").agg(F.mean("0"))
> df3 = df1.join(df2,on='index')
> df3.show()
> #df3.explain()
> df3.count()
>
>
> error
>
> pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND]
> Path does not exist: hdfs://rhes75:9000/tmp/df5.csv.
>
> In a distributed env, that CSV file has to be available to all Spark
> workers. Either you copy that file to all worker nodes or you put it in
> HDFS or S3 or gs:// locations to be available to all.
>
> It does not even get to df3.count()
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 7 May 2023 at 15:53, Nitin Siwach  wrote:
>
>> Thank you for your response, Sir.
>>
>> My understanding is that the final ```df3.count()``` is the only action
>> in the code I have attached. In fact, I tried running the rest of the code
>> (commenting out just the final df3.count()) and, as I expected, no
>> computations were triggered
>>
>> On Sun, 7 May, 2023, 20:16 Mich Talebzadeh, 
>> wrote:
>>
>>>
>>> ...However, In my case here I am calling just one action. ..
>>>
>>> OK, which line in your code calls that one action?
>>>
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sun, 7 May 2023 at 14:13, Nitin Siwach  wrote:
>>>
 @Vikas Kumar 
 I am sorry but I thought that you had answered the other question that
 I had raised to the same email address yesterday. It was around the SQL tab
 in web UI and the output of .explain showing different plans.

 I get how using .cache I can ensure that the data from a particular
 checkpoint is reused and the computations do not happen again.

Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Mich Talebzadeh
You have started with a pandas DF, which won't scale beyond the driver
itself.

Let us put that aside.
df1.to_csv("./df1.csv",index_label = "index")  ## write the dataframe to
the underlying file system

starting with spark

df1 = spark.read.csv("./df1.csv", header=True, schema = schema) ## read the
dataframe from the underlying file system

That is your first action because Spark needs to validate that the file
exists and check the schema. What will happen if that file does not exist?

csvlocation="/tmp/df1.csv"
csvlocation2="/tmp/df5.csv"
df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
df1.index = np.random.choice(range(10),size=1000)
df1.to_csv(csvlocation,index_label = "index")
Schema = StructType([StructField('index', StringType(), True),
 StructField('0', StringType(), True)])

df1 = spark.read.csv(csvlocation2, header=True, schema = Schema).cache()
## incorrect location

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2,on='index')
df3.show()
#df3.explain()
df3.count()


error

pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path
does not exist: hdfs://rhes75:9000/tmp/df5.csv.

In a distributed env, that CSV file has to be available to all Spark
workers. Either you copy that file to all worker nodes or you put it in HDFS
or S3 or gs:// locations to be available to all.

It does not even get to df3.count()
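As an illustrative sketch only (the HDFS and GCS paths below are made up;
any location that every executor can see will do):

```
# Hypothetical shared locations -- substitute your own cluster's paths.
# A local path such as ./df1.csv is only safe in local mode, where the
# driver and executors share one filesystem.
hdfs_path = "hdfs://namenode:9000/tmp/df1.csv"
gcs_path = "gs://my-bucket/tmp/df1.csv"

df1 = spark.read.csv(hdfs_path, header=True, schema=Schema)
```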

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 7 May 2023 at 15:53, Nitin Siwach  wrote:

> Thank you for your response, Sir.
>
> My understanding is that the final ```df3.count()``` is the only action in
> the code I have attached. In fact, I tried running the rest of the code
> (commenting out just the final df3.count()) and, as I expected, no
> computations were triggered
>
> On Sun, 7 May, 2023, 20:16 Mich Talebzadeh, 
> wrote:
>
>>
>> ...However, In my case here I am calling just one action. ..
>>
>> OK, which line in your code calls that one action?
>>
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 7 May 2023 at 14:13, Nitin Siwach  wrote:
>>
>>> @Vikas Kumar 
>>> I am sorry but I thought that you had answered the other question that I
>>> had raised to the same email address yesterday. It was around the SQL tab
>>> in web UI and the output of .explain showing different plans.
>>>
>>> I get how using .cache I can ensure that the data from a particular
>>> checkpoint is reused and the computations do not happen again.
>>>
>>> However, in my case here I am calling just one action. Within the purview
>>> of one action, Spark should not rerun the overlapping parts of the DAG,
>>> and I do not understand why the file scan is happening several times. I
>>> can easily mitigate the issue by using window functions and creating all
>>> the columns in one go, without having to use several joins later on. That
>>> being said, this particular behavior is what I am trying to understand.
>>> The golden rule that "overlapping parts of the DAG won't be run several
>>> times for one action" now seems to be apocryphal. If you can shed some
>>> light on this matter, I would appreciate it.
>>>
>>> @weiruanl...@gmail.com  My datasets are very
>>> small as you can see in the sample examples that I am creating as the first
>>> part of the code
>>>
>>> Really appreciate you guys helping me out with this :)
>>>
>>> On Sun, May 7, 2023 at 12:23 PM Winston Lai 
>>> wrote:
>>>
 When your memory is not sufficient to keep the cached data for your
 jobs in two different stages, it might be read twice because Spark might
 have to clear the previous cache for other jobs. In those cases, a spill
 may be triggered when Spark writes your data from memory to disk.

 One way to check is to read the Spark UI. When Spark caches the data, you
 will see a little green dot connected to the blue rectangle in the Spark
 UI. If you see this green dot twice on your two stages, Spark likely
 spilled the data after your first job and read it again in the second run.

Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Nitin Siwach
Thank you for your response, Sir.

My understanding is that the final ```df3.count()``` is the only action in
the code I have attached. In fact, I tried running the rest of the code
(commenting out just the final df3.count()) and, as I expected, no
computations were triggered

On Sun, 7 May, 2023, 20:16 Mich Talebzadeh, 
wrote:

>
> ...However, In my case here I am calling just one action. ..
>
> OK, which line in your code calls that one action?
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 7 May 2023 at 14:13, Nitin Siwach  wrote:
>
>> @Vikas Kumar 
>> I am sorry but I thought that you had answered the other question that I
>> had raised to the same email address yesterday. It was around the SQL tab
>> in web UI and the output of .explain showing different plans.
>>
>> I get how using .cache I can ensure that the data from a particular
>> checkpoint is reused and the computations do not happen again.
>>
>> However, in my case here I am calling just one action. Within the purview
>> of one action, Spark should not rerun the overlapping parts of the DAG, and
>> I do not understand why the file scan is happening several times. I can
>> easily mitigate the issue by using window functions and creating all the
>> columns in one go, without having to use several joins later on. That being
>> said, this particular behavior is what I am trying to understand. The golden
>> rule that "overlapping parts of the DAG won't be run several times for one
>> action" now seems to be apocryphal. If you can shed some light on this
>> matter, I would appreciate it.
>>
>> @weiruanl...@gmail.com  My datasets are very
>> small as you can see in the sample examples that I am creating as the first
>> part of the code
>>
>> Really appreciate you guys helping me out with this :)
>>
>> On Sun, May 7, 2023 at 12:23 PM Winston Lai 
>> wrote:
>>
>>> When your memory is not sufficient to keep the cached data for your jobs
>>> in two different stages, it might be read twice because Spark might have
>>> to clear the previous cache for other jobs. In those cases, a spill may be
>>> triggered when Spark writes your data from memory to disk.
>>>
>>> One way to check is to read the Spark UI. When Spark caches the data, you
>>> will see a little green dot connected to the blue rectangle in the Spark
>>> UI. If you see this green dot twice on your two stages, Spark likely
>>> spilled the data after your first job and read it again in the second run.
>>> You can also confirm it in other metrics from the Spark UI.
>>>
>>> That is my personal understanding based on what I have read and seen on
>>> my job runs. If there is any mistake, feel free to correct me.
>>>
>>> Thank You & Best Regards
>>> Winston Lai
>>> --
>>> *From:* Nitin Siwach 
>>> *Sent:* Sunday, May 7, 2023 12:22:32 PM
>>> *To:* Vikas Kumar 
>>> *Cc:* User 
>>> *Subject:* Re: Does spark read the same file twice, if two stages are
>>> using the same DataFrame?
>>>
>>> Thank you tons, Vikas :). That makes so much sense now
>>>
>>> I'm in learning phase and was just browsing through various concepts of
>>> spark with self made small examples.
>>>
>>> It didn't make sense to me that the two physical plans should be
>>> different. But, now I understand what you're saying.
>>>
>>> Again, thank you for helping me out
>>>
>>> On Sun, 7 May, 2023, 07:48 Vikas Kumar,  wrote:
>>>
>>>
>>> Spark came up with a plan, but that may or may not be the optimal plan
>>> given the system settings.
>>> If you do df1.cache(), I am guessing Spark will not read df1 twice.
>>>
>>> Btw, why do you have adaptive execution set to false?
>>>
>>> On Sat, May 6, 2023, 1:46 PM Nitin Siwach  wrote:
>>>
>>> I hope this email finds you well :)
>>>
>>> The following code reads the same csv twice even though only one action
>>> is called
>>>
>>> End to end runnable example:
>>> ```
>>> import pandas as pd
>>> import numpy as np
>>>
>>> df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
>>> df1.index = np.random.choice(range(10),size=1000)
>>> df1.to_csv("./df1.csv",index_label = "index")
>>>
>>> 
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import StructType, StringType, StructField
>>>
>>> spark =
>>> SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
>>> 

Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Mich Talebzadeh
...However, In my case here I am calling just one action. ..

OK, which line in your code calls that one action?


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 7 May 2023 at 14:13, Nitin Siwach  wrote:

> @Vikas Kumar 
> I am sorry but I thought that you had answered the other question that I
> had raised to the same email address yesterday. It was around the SQL tab
> in web UI and the output of .explain showing different plans.
>
> I get how using .cache I can ensure that the data from a particular
> checkpoint is reused and the computations do not happen again.
>
> However, in my case here I am calling just one action. Within the purview
> of one action, Spark should not rerun the overlapping parts of the DAG, and
> I do not understand why the file scan is happening several times. I can
> easily mitigate the issue by using window functions and creating all the
> columns in one go, without having to use several joins later on. That being
> said, this particular behavior is what I am trying to understand. The golden
> rule that "overlapping parts of the DAG won't be run several times for one
> action" now seems to be apocryphal. If you can shed some light on this
> matter, I would appreciate it.
>
> @weiruanl...@gmail.com  My datasets are very small
> as you can see in the sample examples that I am creating as the first part
> of the code
>
> Really appreciate you guys helping me out with this :)
>
> On Sun, May 7, 2023 at 12:23 PM Winston Lai  wrote:
>
>> When your memory is not sufficient to keep the cached data for your jobs
>> in two different stages, it might be read twice because Spark might have to
>> clear the previous cache for other jobs. In those cases, a spill may be
>> triggered when Spark writes your data from memory to disk.
>>
>> One way to check is to read the Spark UI. When Spark caches the data, you
>> will see a little green dot connected to the blue rectangle in the Spark
>> UI. If you see this green dot twice on your two stages, Spark likely
>> spilled the data after your first job and read it again in the second run.
>> You can also confirm it in other metrics from the Spark UI.
>>
>> That is my personal understanding based on what I have read and seen on
>> my job runs. If there is any mistake, feel free to correct me.
>>
>> Thank You & Best Regards
>> Winston Lai
>> --
>> *From:* Nitin Siwach 
>> *Sent:* Sunday, May 7, 2023 12:22:32 PM
>> *To:* Vikas Kumar 
>> *Cc:* User 
>> *Subject:* Re: Does spark read the same file twice, if two stages are
>> using the same DataFrame?
>>
>> Thank you tons, Vikas :). That makes so much sense now
>>
>> I'm in learning phase and was just browsing through various concepts of
>> spark with self made small examples.
>>
>> It didn't make sense to me that the two physical plans should be
>> different. But, now I understand what you're saying.
>>
>> Again, thank you for helping me out
>>
>> On Sun, 7 May, 2023, 07:48 Vikas Kumar,  wrote:
>>
>>
>> Spark came up with a plan, but that may or may not be the optimal plan
>> given the system settings.
>> If you do df1.cache(), I am guessing Spark will not read df1 twice.
>>
>> Btw, why do you have adaptive execution set to false?
>>
>> On Sat, May 6, 2023, 1:46 PM Nitin Siwach  wrote:
>>
>> I hope this email finds you well :)
>>
>> The following code reads the same csv twice even though only one action
>> is called
>>
>> End to end runnable example:
>> ```
>> import pandas as pd
>> import numpy as np
>>
>> df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
>> df1.index = np.random.choice(range(10),size=1000)
>> df1.to_csv("./df1.csv",index_label = "index")
>>
>> 
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql import functions as F
>> from pyspark.sql.types import StructType, StringType, StructField
>>
>> spark =
>> SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
>> config("spark.sql.adaptive.enabled","false").getOrCreate()
>>
>> schema = StructType([StructField('index', StringType(), True),
>>  StructField('0', StringType(), True)])
>>
>> df1 = spark.read.csv("./df1.csv", header=True, schema = schema)
>>
>> df2 = df1.groupby("index").agg(F.mean("0"))
>> df3 = df1.join(df2,on='index')
>>
>> df3.explain()
>> df3.count()
>> ```
>>
>> The sql tab in the web UI shows the following:
>>
>> [image:
>> 

Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Nitin Siwach
@Vikas Kumar 
I am sorry but I thought that you had answered the other question that I
had raised to the same email address yesterday. It was around the SQL tab
in web UI and the output of .explain showing different plans.

I get how using .cache I can ensure that the data from a particular
checkpoint is reused and the computations do not happen again.

However, in my case here I am calling just one action. Within the purview
of one action, Spark should not rerun the overlapping parts of the DAG, and I
do not understand why the file scan is happening several times. I can easily
mitigate the issue by using window functions and creating all the columns
in one go, without having to use several joins later on (see the sketch after
this paragraph). That being said, this particular behavior is what I am trying
to understand. The golden rule that "overlapping parts of the DAG won't be run
several times for one action" now seems to be apocryphal. If you can shed some
light on this matter, I would appreciate it.
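A sketch of that window-function rewrite (it assumes the df1 from my runnable
example; df3_alt and mean_0 are made-up names):

```
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Attach the per-index mean as a column in a single pass over df1 instead
# of building a separate aggregate and joining it back. The resulting plan
# has one branch, so the CSV should be scanned only once.
w = Window.partitionBy("index")
df3_alt = df1.withColumn("mean_0", F.mean("0").over(w))
df3_alt.count()
```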

@weiruanl...@gmail.com  My datasets are very small
as you can see in the sample examples that I am creating as the first part
of the code

Really appreciate you guys helping me out with this :)

On Sun, May 7, 2023 at 12:23 PM Winston Lai  wrote:

> When your memory is not sufficient to keep the cached data for your jobs
> in two different stages, it might be read twice because Spark might have to
> clear the previous cache for other jobs. In those cases, a spill may be
> triggered when Spark writes your data from memory to disk.
>
> One way to check is to read the Spark UI. When Spark caches the data, you
> will see a little green dot connected to the blue rectangle in the Spark
> UI. If you see this green dot twice on your two stages, Spark likely
> spilled the data after your first job and read it again in the second run.
> You can also confirm it in other metrics from the Spark UI.
>
> That is my personal understanding based on what I have read and seen on my
> job runs. If there is any mistake, feel free to correct me.
>
> Thank You & Best Regards
> Winston Lai
> --
> *From:* Nitin Siwach 
> *Sent:* Sunday, May 7, 2023 12:22:32 PM
> *To:* Vikas Kumar 
> *Cc:* User 
> *Subject:* Re: Does spark read the same file twice, if two stages are
> using the same DataFrame?
>
> Thank you tons, Vikas :). That makes so much sense now
>
> I'm in learning phase and was just browsing through various concepts of
> spark with self made small examples.
>
> It didn't make sense to me that the two physical plans should be
> different. But, now I understand what you're saying.
>
> Again, thank you for helping me out
>
> On Sun, 7 May, 2023, 07:48 Vikas Kumar,  wrote:
>
>
> Spark came up with a plan, but that may or may not be the optimal plan
> given the system settings.
> If you do df1.cache(), I am guessing Spark will not read df1 twice.
>
> Btw, why do you have adaptive execution set to false?
>
> On Sat, May 6, 2023, 1:46 PM Nitin Siwach  wrote:
>
> I hope this email finds you well :)
>
> The following code reads the same csv twice even though only one action is
> called
>
> End to end runnable example:
> ```
> import pandas as pd
> import numpy as np
>
> df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
> df1.index = np.random.choice(range(10),size=1000)
> df1.to_csv("./df1.csv",index_label = "index")
>
> 
>
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StructType, StringType, StructField
>
> spark =
> SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
> config("spark.sql.adaptive.enabled","false").getOrCreate()
>
> schema = StructType([StructField('index', StringType(), True),
>  StructField('0', StringType(), True)])
>
> df1 = spark.read.csv("./df1.csv", header=True, schema = schema)
>
> df2 = df1.groupby("index").agg(F.mean("0"))
> df3 = df1.join(df2,on='index')
>
> df3.explain()
> df3.count()
> ```
>
> The sql tab in the web UI shows the following:
>
> [image: screencapture-localhost-4040-SQL-execution-2023-05-06-19_48_41.png]
>
> As you can see, the df1 file is read twice. Is this the expected
> behaviour? Why is that happening? I have just one action so the same part
> of the pipeline should not run multiple times.
>
> I have read the answer [here][1]. The question is almost the same; it is
> just that in that question RDDs are used, whereas I am using the DataFrame
> API in PySpark. In that question, it is suggested that if multiple file
> scans are to be avoided then the DataFrame API would help, and that this is
> the reason why the DataFrame API exists in the first place.
>
>
> However, as it turns out, I am facing the exact same issue with the
> DataFrames as well. It seems rather weird of Spark, which is celebrated
> for its efficiency, to be this inefficient (mostly I am just missing
> something and that is not a valid criticism :))

Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Winston Lai
When your memory is not sufficient to keep the cached data for your jobs in two
different stages, it might be read twice because Spark might have to clear the
previous cache for other jobs. In those cases, a spill may be triggered when
Spark writes your data from memory to disk.

One way to check is to read the Spark UI. When Spark caches the data, you will
see a little green dot connected to the blue rectangle in the Spark UI. If you
see this green dot twice on your two stages, Spark likely spilled the data after
your first job and read it again in the second run. You can also confirm it in
other metrics from the Spark UI.
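Besides the UI, a quick programmatic check is also possible (a sketch; it
assumes a DataFrame named df1 and uses standard PySpark DataFrame attributes):

```
# True once cache()/persist() has been called on the DataFrame
print(df1.is_cached)

# The storage level (memory/disk flags) currently associated with it
print(df1.storageLevel)
```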

That is my personal understanding based on what I have read and seen on my job 
runs. If there is any mistake, feel free to correct me.

Thank You & Best Regards
Winston Lai

From: Nitin Siwach 
Sent: Sunday, May 7, 2023 12:22:32 PM
To: Vikas Kumar 
Cc: User 
Subject: Re: Does spark read the same file twice, if two stages are using the 
same DataFrame?

Thank you tons, Vikas :). That makes so much sense now

I'm in learning phase and was just browsing through various concepts of spark 
with self made small examples.

It didn't make sense to me that the two physical plans should be different. 
But, now I understand what you're saying.

Again, thank you for helping me out

On Sun, 7 May, 2023, 07:48 Vikas Kumar, <vku...@etsy.com> wrote:

Spark came up with a plan, but that may or may not be the optimal plan given
the system settings.
If you do df1.cache(), I am guessing Spark will not read df1 twice.

Btw, why do you have adaptive execution set to false?

On Sat, May 6, 2023, 1:46 PM Nitin Siwach <nitinsiw...@gmail.com> wrote:
I hope this email finds you well :)

The following code reads the same csv twice even though only one action is 
called

End to end runnable example:
```
import pandas as pd
import numpy as np

df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
df1.index = np.random.choice(range(10),size=1000)
df1.to_csv("./df1.csv",index_label = "index")


from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField

spark = 
SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
config("spark.sql.adaptive.enabled","false").getOrCreate()

schema = StructType([StructField('index', StringType(), True),
 StructField('0', StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema = schema)

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2,on='index')

df3.explain()
df3.count()
```

The sql tab in the web UI shows the following:

[screencapture-localhost-4040-SQL-execution-2023-05-06-19_48_41.png]

As you can see, the df1 file is read twice. Is this the expected behaviour? Why 
is that happening? I have just one action so the same part of the pipeline 
should not run multiple times.

I have read the answer [here][1]. The question is almost the same; it is just
that in that question RDDs are used, whereas I am using the DataFrame API in
PySpark. In that question, it is suggested that if multiple file scans are to
be avoided then the DataFrame API would help, and that this is the reason why
the DataFrame API exists in the first place.


However, as it turns out, I am facing the exact same issue with the DataFrames 
as well. It seems rather weird of Spark, which is celebrated for its
efficiency, to be this inefficient (mostly I am just missing something and that
is not a valid criticism :))