I will follow up with the output, but I suppose Jupyter runs in client mode, since the session is created via getOrCreate() with a K8s API server as master. Also note that I tried both collect() and toPandas() under the same conditions (Jupyter client mode), so IIUC your theory doesn’t explain the difference in their execution plans.

Best regards,
Sergey Ivanychev
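For reference, a minimal sketch of the two code paths under discussion; the master URL and data source below are illustrative placeholders, not the actual notebook code:

    from pyspark.sql import SparkSession

    # Client-mode session against a K8s API server (placeholder URL).
    spark = (SparkSession.builder
             .master("k8s://https://<k8s-api-server>:443")
             .getOrCreate())

    df = spark.read.parquet("s3a://bucket/training-set")  # hypothetical source

    rows = df.collect()   # materializes a list of Row objects on the driver
    pdf = df.toPandas()   # same data as a pandas DataFrame; converted via Arrow
                          # batches when spark.sql.execution.arrow.pyspark.enabled
                          # is set to true

Both calls pull the full dataset to the driver; they differ in how the result is assembled there, which is where the execution plans can diverge.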
> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> Do you have the output for the executors from the Spark GUI, the one that eventually ends up with OOM?
>
> Also, what does
>
> kubectl get pods -n $NAMESPACE
> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE | grep driver | awk '{print $1}'`
> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
> kubectl logs $EXECUTOR_WITH_OOM -n $NAMESPACE
>
> say?
>
> My guess is that a Jupyter notebook, like a Zeppelin notebook, does a two-stage spark-submit under the bonnet. The job starts on the driver, where the Jupyter notebook is, but the actual job runs on the cluster itself in cluster mode. If your assertion is right (executors don't play much of a role), just run the whole thing in local mode!
>
> HTH
>
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <sergeyivanyc...@gmail.com> wrote:
>>
>> I want to further clarify the use case I have: an ML engineer collects data so as to use it for training an ML model. The driver is created within a Jupyter notebook and has 64G of RAM for fetching the training set and feeding it to the model. Naturally, in this case the executors shouldn’t be as big as the driver.
>>
>> Currently, the best solution I have found is to write the dataframe to S3 and then read it via pd.read_parquet.
>>
>> Best regards,
>> Sergey Ivanychev
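A minimal sketch of that write-to-S3 workaround, assuming df is the Spark dataframe in question; the bucket and path are placeholders, and pandas needs s3fs (or a similar filesystem library) installed to read from S3:

    import pandas as pd

    # Write the training set from the executors straight to S3
    # (bucket and path are placeholders).
    df.write.mode("overwrite").parquet("s3a://bucket/tmp/training-set")

    # Read it back into plain pandas on the driver, bypassing
    # Spark's collect/toPandas path entirely.
    pdf = pd.read_parquet("s3://bucket/tmp/training-set")

The write is distributed across the executors, so only the pandas read at the end is bound by driver memory.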
>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>> Thanks for the clarification on the koalas case.
>>>
>>> The thread owner states, and I quote: "IIUC, in the `toPandas` case all the data gets shuffled to a single executor that fails with OOM".
>>>
>>> I still believe that this may be related to the way k8s handles shuffling. In a balanced k8s cluster this could be avoided, which does not seem to be the case here, as the so-called driver node has 8 times more RAM than the other nodes.
>>>
>>> HTH
>>>
>>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sro...@gmail.com> wrote:
>>>>
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is unrelated to toPandas(), nor to the question of how it differs from collect(). Shuffle is also unrelated.
>>>>
>>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> As I understood it, in previous versions of Spark data could not be processed and stored in pandas DataFrames in a distributed mode, because a pandas DataFrame holds its data in RAM on a single machine, the driver in this case. However, I was under the impression that this limitation no longer exists in 3.2?
>>>>>
>>>>> So if you have a k8s cluster with 64GB of RAM on one node and 8GB of RAM on the others, and PySpark running in cluster mode, how do you expect the process to confine itself to the master node? What will happen if you increase the executor nodes' RAM to 64GB temporarily (a balanced k8s cluster) and run the job again?
>>>>>
>>>>> Worth noting that the current Spark on k8s does not support an external shuffle service. For now we have two parameters for dynamic resource allocation. These are
>>>>>
>>>>> --conf spark.dynamicAllocation.enabled=true \
>>>>> --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>
>>>>> The idea is to use dynamic resource allocation where the driver tracks the shuffle files and evicts only those executors not storing active shuffle files. So, in a nutshell, in the absence of an external shuffle service these shuffle files are stored on the executors themselves. The setup follows the "one-container-per-Pod" model: one node runs the driver and each remaining node runs one executor.
>>>>>
>>>>> HTH
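For illustration, a sketch of a session carrying those two flags when building the session in Python rather than on spark-submit; the master URL and the executor cap are placeholders, not recommended values:

    from pyspark.sql import SparkSession

    # Placeholder master URL; the two dynamicAllocation flags are the
    # ones discussed above, the maxExecutors cap is hypothetical.
    spark = (SparkSession.builder
             .master("k8s://https://<k8s-api-server>:443")
             .config("spark.dynamicAllocation.enabled", "true")
             .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
             .config("spark.dynamicAllocation.maxExecutors", "4")
             .getOrCreate())

With shuffle tracking enabled, the driver will only evict executors whose shuffle files are no longer needed, which is what substitutes for the missing external shuffle service on k8s.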