@Vikas Kumar <vku...@etsy.com>
I am sorry, but I thought that you had answered the other question that I
had raised to the same email address yesterday. It was about the SQL tab
in the web UI and the output of .explain showing different plans.

I get how, by using .cache, I can ensure that the data from a particular
checkpoint is reused and the computations do not happen again.
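
For instance, on the toy example further down this thread, I understand a
rough sketch like the following (just to illustrate, not something I have
run here) would keep df1 in memory so that both branches of the plan reuse
it:

```
# Rough sketch only: cache df1 so that the aggregation and the join both
# reuse the in-memory data instead of scanning ./df1.csv a second time
df1 = spark.read.csv("./df1.csv", header=True, schema=schema)
df1.cache()

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2, on="index")
df3.count()  # the first action materialises the cache
```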

However, in my case here I am calling just one action. Within the purview
of one action, Spark should not rerun the overlapping parts of the DAG, so I
do not understand why the file scan is happening several times. I can easily
mitigate the issue by using window functions and creating all the columns
in one go, without having to use several joins later on. That being said,
this particular behavior is what I am trying to understand. The golden rule
that "overlapping parts of the DAG won't run several times for one action"
does not seem to hold here. If you can shed some light on this matter, I
would appreciate it.
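
To be concrete, the window-function workaround I have in mind is roughly
this sketch (same toy data; the new column name is just illustrative):

```
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sketch: compute the per-index mean as an extra column in a single pass,
# so no self-join is needed and the csv should only be scanned once
w = Window.partitionBy("index")
df3 = df1.withColumn("avg_0", F.mean("0").over(w))
df3.count()
```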

@weiruanl...@gmail.com <weiruanl...@gmail.com> My datasets are very small,
as you can see in the sample example that I create in the first part of
the code.

Really appreciate you guys helping me out with this :)

On Sun, May 7, 2023 at 12:23 PM Winston Lai <weiruanl...@gmail.com> wrote:

> When your memory is not sufficient to keep the cached data for your jobs
> in two different stages, it might be read twice because Spark might have to
> clear the previous cache for other jobs. In those cases, a spill may be
> triggered when Spark writes your data from memory to disk.
>
> One way to check is to read the Spark UI. When Spark caches the data, you
> will see a little green dot connected to the blue rectangle in the Spark
> UI. If you see this green dot twice on your two stages, Spark likely
> spilled the data after your first job and read it again in the second run.
> You can also confirm it in other metrics from the Spark UI.
>
> That is my personal understanding based on what I have read and seen on my
> job runs. If there is any mistake, feel free to correct me.
>
> Thank You & Best Regards
> Winston Lai
> ------------------------------
> *From:* Nitin Siwach <nitinsiw...@gmail.com>
> *Sent:* Sunday, May 7, 2023 12:22:32 PM
> *To:* Vikas Kumar <vku...@etsy.com>
> *Cc:* User <user@spark.apache.org>
> *Subject:* Re: Does spark read the same file twice, if two stages are
> using the same DataFrame?
>
> Thank you tons, Vikas :). That makes so much sense now
>
> I'm in the learning phase and was just browsing through various concepts
> of Spark with small self-made examples.
>
> It didn't make sense to me that the two physical plans should be
> different. But now I understand what you're saying.
>
> Again, thank you for helping me out
>
> On Sun, 7 May, 2023, 07:48 Vikas Kumar, <vku...@etsy.com> wrote:
>
>
> Spark came up with a plan, but that may or may not be the optimal plan
> given the system settings.
> If you do df1.cache(), I am guessing Spark will not read df1 twice.
>
> Btw, why do you have spark.sql.adaptive.enabled set to false?
>
> On Sat, May 6, 2023, 1:46 PM Nitin Siwach <nitinsiw...@gmail.com> wrote:
>
> I hope this email finds you well :)
>
> The following code reads the same CSV twice even though only one action
> is called.
>
> End-to-end runnable example:
> ```
> import pandas as pd
> import numpy as np
>
> # Build a small toy CSV with 1,000 rows and a repeated "index" column
> df1 = pd.DataFrame(np.arange(1_000).reshape(-1, 1))
> df1.index = np.random.choice(range(10), size=1000)
> df1.to_csv("./df1.csv", index_label="index")
>
> ############################################################################
>
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StructType, StringType, StructField
>
> spark = SparkSession.builder \
>     .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
>     .config("spark.sql.adaptive.enabled", "false") \
>     .getOrCreate()
>
> schema = StructType([StructField('index', StringType(), True),
>                      StructField('0', StringType(), True)])
>
> df1 = spark.read.csv("./df1.csv", header=True, schema=schema)
>
> df2 = df1.groupby("index").agg(F.mean("0"))
> df3 = df1.join(df2, on="index")
>
> df3.explain()
> df3.count()
> ```
>
> The SQL tab in the web UI shows the following:
>
> [image: screencapture-localhost-4040-SQL-execution-2023-05-06-19_48_41.png]
>
> As you can see, the df1 file is read twice. Is this the expected
> behaviour? Why is that happening? I have just one action so the same part
> of the pipeline should not run multiple times.
>
> I have read the answer [here][1]
> <https://stackoverflow.com/questions/37894099/does-spark-read-the-same-file-twice-if-two-stages-are-using-the-same-rdd>.
> The question is almost the same; it is just that that question uses RDDs,
> while I am using DataFrames in the PySpark API. In that answer, it is
> suggested that the DataFrame API would help avoid multiple file scans, and
> that this is the reason the DataFrame API exists in the first place.
>
>
> However, as it turns out, I am facing the exact same issue with
> DataFrames as well. It seems rather weird of Spark, which is celebrated
> for its efficiency, to be this inefficient (most likely I am just missing
> something, so this is not a valid criticism :)).
>
>
