Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Hyukjin Kwon
To add some more: if the number of rows to collect is large, DataFrame.collect can be slower because it launches multiple Spark jobs sequentially. Given that DataFrame.toPandas does not take a number of rows to collect, it's debatable whether the same optimization from DataFrame.collect should be applied here.

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Hyukjin Kwon
Thanks for pinging me, Sean. Yes, there's an optimization in DataFrame.collect which tries to collect the first few partitions, checks whether the requested number of rows has been found, and repeats with more partitions if not. DataFrame.toPandas does not have such an optimization. I suspect that the shuffle isn't an actual shuffle but just collects
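The incremental strategy described here can be illustrated with a plain-Python sketch. This is a hypothetical simulation of the idea (scan a small batch of partitions, check the row count, scale up and retry), not Spark's actual implementation; the function name and scale-up factor are invented for illustration.

```python
def incremental_take(partitions, n, scale_up_factor=4):
    """Collect up to n rows, scanning partitions in growing batches.

    Simulates the optimization described above: start with one partition,
    and if not enough rows were found, launch another pass over a larger
    batch of partitions.
    """
    rows, scanned = [], 0
    batch = 1  # start by scanning a single partition
    while len(rows) < n and scanned < len(partitions):
        for part in partitions[scanned:scanned + batch]:
            rows.extend(part)
        scanned += batch
        batch *= scale_up_factor  # scan more partitions on the next pass
    return rows[:n]

parts = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
print(incremental_take(parts, 3))  # → [1, 2, 3]
```

The first pass over one partition yields only 2 rows, so a second, larger pass is launched — which is why, for large result sets, this sequential-jobs approach can end up slower than a single full collect.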

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sean Owen
Hyukjin can you weigh in? Is this exchange due to something in your operations or clearly unique to the toPandas operation? I didn't think it worked that way, but maybe there is some good reason it does. On Fri, Nov 12, 2021 at 7:34 AM Sergey Ivanychev wrote: > Hi Sean, > > According to the

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sergey Ivanychev
Hi Sean, According to the plan I’m observing, this is what happens indeed. There’s an exchange operation that sends data to a single partition/task in the toPandas() + PyArrow enabled case. > On Nov 12, 2021, at 16:31, Sean Owen wrote: > > Yes, none of the responses are addressing your

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sean Owen
Yes, none of the responses are addressing your question. I do not think it's a bug necessarily; do you end up with one partition in your execution somewhere? On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev wrote: > Of course if I give 64G of ram to each executor they will work. But what’s >

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sergey Ivanychev
Of course if I give 64G of RAM to each executor they will work. But what’s the point? Collecting results in the driver should cause high RAM usage in the driver, and that’s what happens in the collect() case. In the case where PyArrow serialization is enabled, all the data is being collected

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Mich Talebzadeh
OK, your findings do not imply those settings are incorrect. Those settings will work if you set up your k8s cluster in peer-to-peer mode with equal amounts of RAM for each node, which is common practice. HTH

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Gourav Sengupta
Hi, Sorry Regards, Gourav Sengupta On Fri, Nov 12, 2021 at 6:48 AM Sergey Ivanychev wrote: > Hi Gourav, > > Please, read my question thoroughly. My problem is with the plan of the > execution and with the fact that toPandas collects all the data not on the > driver but on an executor, not

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Georg Heiler
https://stackoverflow.com/questions/46832394/spark-access-first-n-rows-take-vs-limit might be related. Best, Georg On Fri, Nov 12, 2021 at 07:48, Sergey Ivanychev < sergeyivanyc...@gmail.com> wrote: > Hi Gourav, > > Please, read my question thoroughly. My problem is with the plan of the >

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Sergey Ivanychev
Hi Gourav, Please, read my question thoroughly. My problem is with the plan of the execution and with the fact that toPandas collects all the data not on the driver but on an executor, not with the fact that there’s some memory overhead. I don’t understand how your excerpts answer my question.

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Gourav Sengupta
Hi Sergey, Please read the excerpts from the book of Dr. Zaharia that I had sent, they explain these fundamentals clearly. Regards, Gourav Sengupta On Thu, Nov 11, 2021 at 9:40 PM Sergey Ivanychev wrote: > Yes, in fact those are the settings that cause this behaviour. If set to > false,

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Sergey Ivanychev
Yes, in fact those are the settings that cause this behaviour. If set to false, everything goes fine, since the implementation in the Spark sources in this case is pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) Best regards, Sergey Ivanychev > On Nov 11, 2021, at 13:58,
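The line quoted from the Spark sources shows the non-Arrow data path: self.collect() first brings every Row to the driver as local objects, and only then is a local frame built from them. The sketch below mimics that shape using only the standard library — a plain dict-of-lists stands in for the pandas DataFrame so the example is self-contained; the helper name and sample rows are invented for illustration.

```python
def from_records(records, columns):
    """Build a column-oriented table from row tuples,
    loosely mimicking pd.DataFrame.from_records."""
    return {col: [row[i] for row in records] for i, col in enumerate(columns)}

# Stand-in for self.collect(): all rows already materialized on the driver.
collected = [(1, "a"), (2, "b"), (3, "c")]
table = from_records(collected, columns=["id", "label"])
print(table)  # → {'id': [1, 2, 3], 'label': ['a', 'b', 'c']}
```

The key point is ordering: in this fallback path the driver holds the full collected result before any frame is constructed, which matches the driver-side memory pressure described for the collect() case.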

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Mich Talebzadeh
Have you tried the following settings: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true") HTH
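For reference, the same Arrow settings can also be set cluster-wide in spark-defaults.conf rather than per session (note the key spelling, spark.sql.execution.arrow.pyspark.*):

```
spark.sql.execution.arrow.pyspark.enabled           true
spark.sql.execution.arrow.pyspark.fallback.enabled  true
```

With the fallback flag enabled, PySpark silently falls back to the non-Arrow collect path when Arrow conversion fails, which can mask which code path actually ran.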

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
OK, so it boils down to how Spark creates the toPandas() DataFrame under the bonnet, and how many executors are involved in the k8s cluster. In this model Spark will create executors = number of nodes - 1. On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev wrote: > > Just to confirm with Collect() alone, this is all on

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> did you get to read the excerpts from the book of Dr. Zaharia? I read what you have shared but didn’t manage to get your point. Best regards, Sergey Ivanychev > On Nov 4, 2021, at 20:38, Gourav Sengupta wrote: > > did you get to read the excerpts from the book of Dr. Zaharia?

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> Just to confirm with Collect() alone, this is all on the driver? I shared the screenshot with the plan in the first email. In the collect() case the data gets fetched to the driver without problems. Best regards, Sergey Ivanychev > On Nov 4, 2021, at 20:37, Mich Talebzadeh wrote:

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Gourav Sengupta
Hi, did you get to read the excerpts from the book of Dr. Zaharia? Regards, Gourav On Thu, Nov 4, 2021 at 4:11 PM Sergey Ivanychev wrote: > I’m sure that its running in client mode. I don’t want to have the same > amount of RAM on drivers and executors since there’s no point in giving 64G >

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
Well, evidently the indication is that this is happening on the executor and not on the driver node as assumed. Just to confirm with Collect() alone, this is all on the driver? HTH On Thu, 4 Nov 2021 at 16:10, Sergey Ivanychev wrote: > I’m sure that it's running in client mode. I don’t want

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I’m sure that it's running in client mode. I don’t want to have the same amount of RAM on drivers and executors, since there’s no point in giving 64G of RAM to executors in my case. My question is why the collect and toPandas actions produce such different plans, which causes toPandas to fail on

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
From your notes: ".. IIUC, in the `toPandas` case all the data gets shuffled to a single executor that fails with OOM, which doesn’t happen in the `collect` case. Does it work like that? How do I collect a large dataset that fits into the memory of the driver?" The acid test would be to use pandas

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I will follow up with the output, but I suppose Jupyter runs in client mode, since it’s created via getOrCreate with a K8s API server as master. Also note that I tried both “collect” and “toPandas” in the same conditions (Jupyter client mode), so IIUC your theory doesn’t explain that difference

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
Do you have the output for the executors from the Spark GUI, the one that eventually ends up with OOM? Also, what does kubectl get pods -n $NAMESPACE show? DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'` kubectl logs $DRIVER_POD_NAME -n $NAMESPACE kubectl logs $EXECUTOR_WITH_OOM

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Gourav Sengupta
Hi, I am copying Dr. Zaharia in this email as I am quoting from his book (once again I may be wrong): Chapter 5: Basic Structured Operations >> Creating Rows You can create rows by manually instantiating a Row object with the values that belong in each column. It’s important to note that only

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I want to further clarify the use case I have: an ML engineer collects data so as to use it for training an ML model. The driver is created within a Jupyter notebook and has 64G of RAM for fetching the training set and feeding it to the model. Naturally, in this case executors shouldn’t be as big

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Mich Talebzadeh
Thanks for the clarification on the koalas case. The thread owner states, and I quote: ".. IIUC, in the `toPandas` case all the data gets shuffled to a single executor that fails with OOM." I still believe that this may be related to the way k8s handles shuffling. In a balanced k8s cluster this

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sean Owen
I think you're talking about koalas, which is in Spark 3.2, but that is unrelated to toPandas(), nor to the question of how it differs from collect(). Shuffle is also unrelated. On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh wrote: > Hi, > > As I understood in the previous versions of Spark the

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I’m pretty sure. WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 20) (10.20.167.28 executor 2): java.lang.OutOfMemoryError at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source) If you look at the “toPandas” plan you can see the exchange stage that doesn’t occur in the