Hi,

Did you get to read the excerpts from Dr. Zaharia's book?

Regards,
Gourav

On Thu, Nov 4, 2021 at 4:11 PM Sergey Ivanychev <sergeyivanyc...@gmail.com>
wrote:

> I’m sure that it’s running in client mode. I don’t want to have the same
> amount of RAM on drivers and executors since there’s no point in giving 64G
> of RAM to executors in my case.
>
> My question is why the collect and toPandas actions produce such different
> plans, which causes toPandas to fail on executors.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 15:17, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
> 
>
> From your notes: ".. IIUC, in the `toPandas` case all the data gets
> shuffled to a single executor that fails with OOM, which doesn’t happen in
> `collect` case. Does it work like that? How do I collect a large
> dataset that fits into memory of the driver?"
>
> The acid test would be to use pandas and ensure that all nodes have the
> same amount of RAM. The assumption here is that the master node has a
> larger amount of RAM that in theory should handle the work for Jupyter
> with pandas. You can easily find out which mode Spark is deployed in by
> looking at the Spark UI page.
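
As a complement to checking the UI, the master and deploy mode can be read
from the running session itself. The following is a minimal sketch, not
something taken from the thread: `describe_deployment` is a hypothetical
helper name, and it assumes it is handed the SparkSession that the notebook
created via getOrCreate.

```python
def describe_deployment(spark):
    """Return (master, deploy_mode) for a SparkSession-like object."""
    # getConf() returns the SparkConf of the running context.
    conf = spark.sparkContext.getConf()
    master = conf.get("spark.master", "unknown")
    # Assumption: an unset deploy mode is treated as Spark's default, "client".
    deploy_mode = conf.get("spark.submit.deployMode", "client")
    return master, deploy_mode
```

In a notebook, calling describe_deployment(spark) on the live session should
show a k8s:// master URL together with the deploy mode if the session talks
to a K8s API server as described in this thread.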
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev <sergeyivanyc...@gmail.com>
> wrote:
>
>> I will follow up with the output, but I suppose Jupyter runs in client
>> mode since it’s created via getOrCreate with a K8s API server as master.
>> Also note that I tried both “collect” and “toPandas” in the same
>> conditions (Jupyter client mode), so IIUC your theory doesn’t explain that
>> difference in execution plans.
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>> 
>> Do you have the output from the Spark UI for the executor that
>> eventually ends up with OOM?
>>
>> Also what does
>>
>> kubectl get pods -n $NAMESPACE
>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>>
>>
>> say?
>>
>>
>> My guess is that a Jupyter notebook, like a Zeppelin notebook, does a
>> two-stage spark-submit under the bonnet. The job starts on the driver,
>> where the Jupyter notebook runs, but the actual job runs on the cluster
>> itself in cluster mode. If your assertion is right (executors don't play
>> much of a role), just run the whole thing in local mode!
>>
>>
>> HTH
>>
>>
>>
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <sergeyivanyc...@gmail.com>
>> wrote:
>>
>>> I want to further clarify the use case I have: an ML engineer collects
>>> data so as to use it for training an ML model. The driver is created within
>>> a Jupyter notebook and has 64G of RAM for fetching the training set and
>>> feeding it to the model. Naturally, in this case executors shouldn’t be as
>>> big as the driver.
>>>
>>> Currently, the best solution I found is to write the dataframe to S3,
>>> and then read it via pd.read_parquet.
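
The workaround described above can be sketched roughly as follows. This is
an illustration under stated assumptions rather than the poster's actual
code: `to_pandas_via_parquet` and its parameters are hypothetical names, the
paths are placeholders, and reading s3:// paths with pandas assumes that
pyarrow and s3fs are installed on the driver.

```python
def to_pandas_via_parquet(df, spark_path, pandas_path=None, read_parquet=None):
    """Write a Spark DataFrame as Parquet, then load it with pandas."""
    # Executors write the Parquet part files in parallel, so this step does
    # not pull the whole dataset into a single process.
    df.write.mode("overwrite").parquet(spark_path)

    if read_parquet is None:
        # Imported lazily so the helper can be exercised with a stub reader.
        import pandas as pd
        read_parquet = pd.read_parquet

    # Assumption: Spark may address the bucket as s3a:// while pandas
    # expects s3://, hence the optional second path.
    return read_parquet(pandas_path or spark_path)
```

A call might look like to_pandas_via_parquet(df, "s3a://my-bucket/tmp/train",
"s3://my-bucket/tmp/train"), where the bucket name is made up. Only the
final read happens on the driver, which is where the 64G of RAM is available.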
>>>
>>> Best regards,
>>>
>>>
>>> Sergey Ivanychev
>>>
>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>> 
>>> Thanks for the clarification on the koalas case.
>>>
>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
>>> the data gets shuffled to a single executor that fails with OOM,....
>>>
>>> I still believe that this may be related to the way k8s handles
>>> shuffling. In a balanced k8s cluster this could be avoided, but that does
>>> not seem to be the case here, as the so-called driver node has 8 times
>>> more RAM than the other nodes.
>>>
>>> HTH
>>>
>>>
>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sro...@gmail.com> wrote:
>>>
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is
>>>> unrelated to toPandas() and to the question of how it differs from
>>>> collect().
>>>> Shuffle is also unrelated.
>>>>
>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As I understood it, in previous versions of Spark the data could not
>>>>> be processed and stored in pandas data frames in a distributed mode, as
>>>>> these data frames store data in RAM, which is the driver's in this case.
>>>>> However, I was under the impression that this limitation no longer
>>>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>>>> and 8GB of RAM for others, and PySpark running in cluster mode, how do you
>>>>> expect the process to confine itself to the master node? What will happen
>>>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>>>> cluster) and run the job again?
>>>>>
>>>>> Worth noting that the current Spark on k8s does not support external
>>>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>>>> These are
>>>>>
>>>>>  --conf spark.dynamicAllocation.enabled=true \
>>>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>
>>>>>
>>>>> The idea is to use dynamic resource allocation where the driver
>>>>> tracks the shuffle files and evicts only executors not storing active
>>>>> shuffle files. So in a nutshell these shuffle files are stored in the
>>>>> executors themselves in the absence of the external shuffle. The model
>>>>> works on the basis of the "one-container-per-Pod" model
>>>>> <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that
>>>>> one node of the cluster will run the driver and each remaining node
>>>>> will run one executor.
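
For a session built inside a notebook rather than launched with
spark-submit, the same two settings can be passed to the session builder.
The following is a minimal sketch: `DYNAMIC_ALLOCATION_CONF` and
`apply_conf` are hypothetical names, and the builder is assumed to be
`SparkSession.builder` or anything else with a chainable `config(key, value)`
method.

```python
# The two settings quoted above, as key/value pairs.
DYNAMIC_ALLOCATION_CONF = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
}


def apply_conf(builder, conf=DYNAMIC_ALLOCATION_CONF):
    """Apply each setting to a builder and return the configured builder."""
    for key, value in conf.items():
        # config() returns the builder, so the calls can be chained.
        builder = builder.config(key, value)
    return builder
```

With PySpark this would be used as
apply_conf(SparkSession.builder).getOrCreate(). These settings have to be in
place before the SparkContext is created, since they are not picked up by an
already running context.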
>>>>>
>>>>>
>>>>>
>>>>> HTH
