Well evidently the indication is that this is happening on the executor and
not on the driver node as assumed. Just to confirm with Collect() alone,
this is all on the driver?

HTH

On Thu, 4 Nov 2021 at 16:10, Sergey Ivanychev <sergeyivanyc...@gmail.com>
wrote:

> I’m sure that its running in clientele mode. I don’t want to have the same
> amount of RAM on drivers and executors since there’s no point in giving 64G
> of ram to executors in my case.
> My question is why collect and toPandas actions produce so different
> plans, which cause toPandas to fail on executors.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> 4 нояб. 2021 г., в 15:17, Mich Talebzadeh <mich.talebza...@gmail.com>
> написал(а):
>
> 
>
>
> From your notes ".. IIUC, in the `toPandas` case all the data gets
> shuffled to a single executor that fails with OOM, which doesn’t happen in
> `collect` case. This does it work like that? How do I collect a large
> dataset that fits into memory of the driver?.
>
> The acid test would be to use pandas and ensure that all nodes have the
> same amount of RAM. The assumption here is that the master node has a
> larger amount of RAM that in theory should handle the work. for Jupiter
> with Pandas. You can easily find out which mode Spark is deploying by
> looking at Spark GUI page.
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev <sergeyivanyc...@gmail.com>
> wrote:
>
>> I will follow up with the output, but I suppose Jupyter runs in client
>> mode since it’s created via getOrCreate with a K8s api server as master.
>> Also note that I tried both “collect” and “toPandas” in the same
>> conditions (Jupyter client mode), so IIUC your theory doesn’t explain that
>> difference in execution plans.
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> 4 нояб. 2021 г., в 13:12, Mich Talebzadeh <mich.talebza...@gmail.com>
>> написал(а):
>>
>> 
>> Do you have the output for executors from spark GUI, the one that
>> eventually ends up with OOM?
>>
>> Also what does
>>
>> kubectl get pods -n $NAMESPACE
>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print
>> $1}'`
>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>>
>>
>> say?
>>
>>
>> My guess is that Jupyter notebook like Zeppelin notebook does a two stage
>> spark-submit under the bonnet. The job starts on the driver where the
>> Jupyter notebook is on but the actual job runs on the cluster itself in
>> cluster mode. If your assertion is right (executors don't play much of a
>> role), just run the whole thing in local mode!
>>
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <sergeyivanyc...@gmail.com>
>> wrote:
>>
>>> I want to further clarify the use case I have: an ML engineer collects
>>> data so as to use it for training an ML model. The driver is created within
>>> Jupiter notebook and has 64G of ram for fetching the training set and
>>> feeding it to the model. Naturally, in this case executors shouldn’t be as
>>> big as the driver.
>>>
>>> Currently, the best solution I found is to write the dataframe to S3,
>>> and then read it via pd.read_parquet.
>>>
>>> Best regards,
>>>
>>>
>>> Sergey Ivanychev
>>>
>>> 4 нояб. 2021 г., в 00:18, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> написал(а):
>>>
>>> 
>>> Thanks for clarification on the koalas case.
>>>
>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
>>> the data gets shuffled to a single executor that fails with OOM,....
>>>
>>> I still believe that this may be related to the way k8s handles
>>> shuffling. In a balanced k8s cluster this could be avoided which does not
>>> seem to be the case here as the so called driver node has 8 times more
>>> RAM than the other nodes.
>>>
>>> HTH
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sro...@gmail.com> wrote:
>>>
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is
>>>> unrelated to toPandas(), nor to the question of how it differs from
>>>> collect().
>>>> Shuffle is also unrelated.
>>>>
>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As I understood in the previous versions of Spark the data could not
>>>>> be processed and stored in Pandas data frames in a distributed mode as
>>>>> these data frames store data in RAM which is the driver in this case.
>>>>> However, I was under the impression that this limitation no longer
>>>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>>>> and 8GB of RAM for others, and PySpark running in cluster mode,  how do 
>>>>> you
>>>>> expect the process to confine itself to the master node? What will happen
>>>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>>>> cluster) and run the job again?
>>>>>
>>>>> Worth noting that the current Spark on k8s  does not support external
>>>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>>>> These are
>>>>>
>>>>>  --conf spark.dynamicAllocation.enabled=true \
>>>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>
>>>>>
>>>>> The idea is to use dynamic resource allocation where the driver
>>>>> tracks the shuffle files and evicts only executors not storing active
>>>>> shuffle files. So in a nutshell these shuffle files are stored in the
>>>>> executors themselves in the absence of the external shuffle. The model
>>>>> works on the basis of the "one-container-per-Pod" model
>>>>> <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that
>>>>> for each node of the cluster there will be one node running the driver and
>>>>> each remaining node running one executor each.
>>>>>
>>>>>
>>>>>
>>>>> HTH
>>>>> ,
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>> --



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Reply via email to