Hi,

Did you get a chance to read the excerpts from Dr. Zaharia's book?

Regards,
Gourav

On Thu, Nov 4, 2021 at 4:11 PM Sergey Ivanychev <sergeyivanyc...@gmail.com> wrote:

> I'm sure that it's running in client mode. I don't want to have the same
> amount of RAM on drivers and executors, since there's no point in giving
> 64G of RAM to executors in my case.
>
> My question is why the collect and toPandas actions produce such different
> plans, which causes toPandas to fail on executors.
>
> Best regards,
> Sergey Ivanychev
>
> On 4 Nov 2021, at 15:17, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> From your notes: ".. IIUC, in the `toPandas` case all the data gets
> shuffled to a single executor that fails with OOM, which doesn't happen in
> the `collect` case. Does it work like that? How do I collect a large
> dataset that fits into memory of the driver?"
>
> The acid test would be to use pandas and ensure that all nodes have the
> same amount of RAM. The assumption here is that the master node has a
> larger amount of RAM, which in theory should handle the work for Jupyter
> with pandas. You can easily find out which mode Spark is deployed in by
> looking at the Spark GUI page.
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev <sergeyivanyc...@gmail.com>
> wrote:
>
>> I will follow up with the output, but I suppose Jupyter runs in client
>> mode since it's created via getOrCreate with a K8s API server as master.
>> Also note that I tried both "collect" and "toPandas" in the same
>> conditions (Jupyter client mode), so IIUC your theory doesn't explain that
>> difference in execution plans.
>>
>> Best regards,
>> Sergey Ivanychev
>>
>> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>> Do you have the output for executors from the Spark GUI, for the one that
>> eventually ends up with OOM?
>>
>> Also, what do
>>
>> kubectl get pods -n $NAMESPACE
>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>> kubectl logs $EXECUTOR_WITH_OOM -n $NAMESPACE
>>
>> say?
>>
>> My guess is that a Jupyter notebook, like a Zeppelin notebook, does a
>> two-stage spark-submit under the bonnet. The job starts on the driver where
>> the Jupyter notebook is, but the actual job runs on the cluster itself in
>> cluster mode. If your assertion is right (executors don't play much of a
>> role), just run the whole thing in local mode!
>>
>> HTH
>>
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <sergeyivanyc...@gmail.com>
>> wrote:
>>
>>> I want to further clarify the use case I have: an ML engineer collects
>>> data so as to use it for training an ML model. The driver is created
>>> within a Jupyter notebook and has 64G of RAM for fetching the training set
>>> and feeding it to the model. Naturally, in this case executors shouldn't
>>> be as big as the driver.
>>>
>>> Currently, the best solution I found is to write the dataframe to S3,
>>> and then read it via pd.read_parquet.
>>>
>>> Best regards,
>>> Sergey Ivanychev
>>>
>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>> Thanks for the clarification on the koalas case.
>>>
>>> The thread owner states, and I quote: ".. IIUC, in the `toPandas` case all
>>> the data gets shuffled to a single executor that fails with OOM, ...."
>>>
>>> I still believe that this may be related to the way k8s handles
>>> shuffling. In a balanced k8s cluster this could be avoided, which does not
>>> seem to be the case here, as the so-called driver node has 8 times more
>>> RAM than the other nodes.
>>>
>>> HTH
>>>
>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sro...@gmail.com> wrote:
>>>
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is
>>>> unrelated to toPandas(), and to the question of how it differs from
>>>> collect().
>>>> Shuffle is also unrelated.
>>>>
>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As I understood it, in previous versions of Spark the data could not
>>>>> be processed and stored in pandas data frames in a distributed mode, as
>>>>> these data frames store data in RAM, which is the driver's in this case.
>>>>> However, I was under the impression that this limitation no longer
>>>>> exists in 3.2?
>>>>> So if you have a k8s cluster with 64GB of RAM for one node
>>>>> and 8GB of RAM for the others, and PySpark running in cluster mode, how
>>>>> do you expect the process to confine itself to the master node? What
>>>>> will happen if you increase executor node(s) RAM to 64GB temporarily
>>>>> (balanced k8s cluster) and run the job again?
>>>>>
>>>>> Worth noting that the current Spark on k8s does not support external
>>>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>>>> These are
>>>>>
>>>>> --conf spark.dynamicAllocation.enabled=true \
>>>>> --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>
>>>>> The idea is to use dynamic resource allocation where the driver
>>>>> tracks the shuffle files and evicts only executors not storing active
>>>>> shuffle files. So in a nutshell these shuffle files are stored in the
>>>>> executors themselves in the absence of the external shuffle. The model
>>>>> works on the basis of the "one-container-per-Pod" model
>>>>> <https://kubernetes.io/docs/concepts/workloads/pods/>, meaning that
>>>>> one node of the cluster runs the driver and each remaining node runs
>>>>> one executor.
>>>>>
>>>>> HTH
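
The two dynamic allocation settings quoted in the oldest message above are passed as repeated `--conf key=value` flags. As a minimal sketch of how such settings could be kept in one place and rendered into a command line, the Python below builds the flag list from a dict. The helper names `conf_to_flags` and `render_command` and the application name `job.py` are hypothetical and only illustrate the idea; the command is printed, not executed, and nothing here comes from the posters' actual setup.

```python
import shlex

# The two settings quoted in the thread; values are strings, as on the CLI.
DYNAMIC_ALLOCATION_CONF = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
}


def conf_to_flags(conf):
    """Hypothetical helper: turn {key: value} into ['--conf', 'key=value', ...]."""
    flags = []
    for key, value in conf.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


def render_command(app, conf):
    """Return a shell-quoted spark-submit command line (built only, never run)."""
    return shlex.join(["spark-submit", *conf_to_flags(conf), app])


if __name__ == "__main__":
    # 'job.py' is a placeholder application name, not taken from the thread.
    print(render_command("job.py", DYNAMIC_ALLOCATION_CONF))
```

Keeping the settings in a dict makes it easy to add or drop a property without hand-editing the backslash-continued command line.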