I will follow up with the output, but I suppose Jupyter runs in client mode 
since it’s created via getOrCreate with a K8s api server as master.
Also note that I tried both “collect” and “toPandas” under the same conditions 
(Jupyter, client mode), so IIUC your theory doesn’t explain the difference in 
execution plans between them.

Best regards,


Sergey Ivanychev
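
For reference, the notebook setup described above (a session created via getOrCreate with a K8s API server as master) could be sketched as follows. This is only an illustration under assumptions: the helper names client_mode_conf and get_session and the API server URL are hypothetical placeholders, not taken from the thread.

```python
def client_mode_conf(k8s_api_url):
    """Return Spark settings for a driver that lives in the notebook process."""
    return {
        # The K8s API server acts as the Spark master.
        "spark.master": "k8s://" + k8s_api_url,
        # Client mode: the driver runs where getOrCreate() is called.
        "spark.submit.deployMode": "client",
    }


def get_session(conf):
    """Create (or reuse) a SparkSession with the given settings."""
    # Imported lazily so the helper above can be inspected without PySpark.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Placeholder URL, for illustration only.
conf = client_mode_conf("https://kubernetes.example:6443")
print(conf["spark.master"])
```

In a notebook, calling spark = get_session(conf) and then inspecting spark.sparkContext.master and spark.sparkContext.getConf().get("spark.submit.deployMode") should show which master and deploy mode the session actually ended up with.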

> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> 
> Do you have the output from the Spark GUI for the executor that eventually 
> ends up with OOM?
> 
> Also what does 
> 
> kubectl get pods -n $NAMESPACE 
> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
> 
> say?
> 
> My guess is that a Jupyter notebook, like a Zeppelin notebook, does a two-stage 
> spark-submit under the bonnet. The job starts on the driver, where the Jupyter 
> notebook is, but the actual job runs on the cluster itself in cluster mode. 
> If your assertion is right (executors don't play much of a role), just run 
> the whole thing in local mode!
> 
> HTH
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <sergeyivanyc...@gmail.com> 
>> wrote:
>> I want to further clarify my use case: an ML engineer collects data 
>> in order to train an ML model on it. The driver is created within a 
>> Jupyter notebook and has 64 GB of RAM for fetching the training set and 
>> feeding it to the model. Naturally, in this case executors shouldn’t be as 
>> big as the driver.
>> 
>> Currently, the best solution I found is to write the dataframe to S3, and 
>> then read it via pd.read_parquet.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mich.talebza...@gmail.com> 
>>>> wrote:
>>>> 
>>> 
>>> Thanks for clarification on the koalas case.
>>> 
>>> The thread owner states, and I quote: "IIUC, in the `toPandas` case all 
>>> the data gets shuffled to a single executor that fails with OOM ..."
>>> 
>>> I still believe that this may be related to the way k8s handles shuffling. 
>>> In a balanced k8s cluster this could be avoided, but that does not seem to be 
>>> the case here, as the so-called driver node has 8 times more RAM than the 
>>> other nodes.
>>> 
>>> HTH
>>> 
>>> 
>>> 
>>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sro...@gmail.com> wrote:
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is 
>>>> unrelated both to toPandas() and to the question of how it differs from 
>>>> collect().
>>>> Shuffle is also unrelated.
>>>> 
>>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
>>>>> <mich.talebza...@gmail.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> As I understood it, in previous versions of Spark data could not be 
>>>>> processed and stored in Pandas data frames in a distributed mode, as these 
>>>>> data frames store data in RAM, which is the driver's RAM in this case. 
>>>>> However, I was under the impression that this limitation no longer exists 
>>>>> in 3.2. So if you have a k8s cluster with 64GB of RAM for one node and 
>>>>> 8GB of RAM for the others, and PySpark running in cluster mode, how do you 
>>>>> expect the process to confine itself to the master node? What will happen 
>>>>> if you temporarily increase the executor nodes' RAM to 64GB (a balanced k8s 
>>>>> cluster) and run the job again?
>>>>> 
>>>>> Worth noting that the current Spark on k8s does not support external 
>>>>> shuffle. For now we have two parameters for Dynamic Resource Allocation. 
>>>>> These are:
>>>>> 
>>>>>  --conf spark.dynamicAllocation.enabled=true \
>>>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>  
>>>>> The idea is to use dynamic resource allocation where the driver tracks 
>>>>> the shuffle files and evicts only executors that are not storing active 
>>>>> shuffle files. So in a nutshell, in the absence of external shuffle these 
>>>>> shuffle files are stored in the executors themselves. The model works on the 
>>>>> basis of "one-container-per-Pod", meaning that one node of the cluster 
>>>>> runs the driver and each remaining node runs one executor.
>>>>> 
>>>>> 
>>>>> HTH
>>>>> 
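
The workaround mentioned earlier in the thread (write the dataframe to S3, then read it via pd.read_parquet) might look roughly like the sketch below. The function names, the example bucket path, and the s3a-to-s3 scheme rewrite are assumptions for illustration only; whether such a rewrite is needed at all depends on how the Hadoop S3 connector and the pandas S3 backend are set up in a given environment.

```python
def to_pandas_uri(spark_uri):
    """Map a Spark-style s3a:// URI to the s3:// form (assumed pandas setup)."""
    prefix = "s3a://"
    if spark_uri.startswith(prefix):
        return "s3://" + spark_uri[len(prefix):]
    return spark_uri


def fetch_via_parquet(df, spark_uri):
    """Write `df` from the executors, then read the files back with pandas."""
    # Lazy import; reading from S3 also needs an S3 backend such as s3fs.
    import pandas as pd

    # Executors write Parquet part files in parallel.
    df.write.mode("overwrite").parquet(spark_uri)
    # The notebook process loads the written files into one pandas DataFrame.
    return pd.read_parquet(to_pandas_uri(spark_uri))


# Hypothetical location, for illustration only.
print(to_pandas_uri("s3a://example-bucket/training_set"))
```

In a notebook this would be called as pdf = fetch_via_parquet(spark_df, "s3a://example-bucket/training_set"). With this approach the notebook process only reads finished Parquet files, so the result is not gathered through toPandas() or collect().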
