Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sergey Ivanychev
Hi Sean,

According to the plan I’m observing, this is what happens indeed. There’s 
exchange operation that sends data to a single partition/task in toPandas() + 
PyArrow enabled case.

> 12 нояб. 2021 г., в 16:31, Sean Owen  написал(а):
> 
> Yes, none of the responses are addressing your question.
> I do not think it's a bug necessarily; do you end up with one partition in 
> your execution somewhere?
> 
> On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev  <mailto:sergeyivanyc...@gmail.com>> wrote:
> Of course if I give 64G of ram to each executor they will work. But what’s 
> the point? Collecting results in the driver should cause a high RAM usage in 
> the driver and that’s what is happening in collect() case. In the case where 
> pyarrow serialization is enabled all the data is being collected on a single 
> executor, which is clearly a wrong way to collect the result on the driver.
> 
> I guess I’ll open an issue about it in Spark Jira. It clearly looks like a 
> bug.
> 
>> 12 нояб. 2021 г., в 11:59, Mich Talebzadeh > <mailto:mich.talebza...@gmail.com>> написал(а):
>> 
>> OK, your findings do not imply those settings are incorrect. Those settings 
>> will work if you set-up your k8s cluster in peer-to-peer mode with equal 
>> amounts of RAM for each node which is common practice.
>> 
>> HTH
>> 
>> 
>>view my Linkedin profile 
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> 
>> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev > <mailto:sergeyivanyc...@gmail.com>> wrote:
>> Yes, in fact those are the settings that cause this behaviour. If set to 
>> false, everything goes fine since the implementation in spark sources in 
>> this case is
>> 
>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>> 11 нояб. 2021 г., в 13:58, Mich Talebzadeh >> <mailto:mich.talebza...@gmail.com>> написал(а):
>>> 
>>> 
>>> Have you tried the following settings:
>>> 
>>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>> 
>>> HTH
>>> 
>>>view my Linkedin profile 
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 
>>> 
>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh >> <mailto:mich.talebza...@gmail.com>> wrote:
>>> Ok so it boils down on how spark does create toPandas() DF under the 
>>> bonnet. How many executors are involved in k8s cluster. In this model spark 
>>> will create executors = no of nodes - 1
>>> 
>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev >> <mailto:sergeyivanyc...@gmail.com>> wrote:
>>> > Just to confirm with Collect() alone, this is all on the driver?
>>> 
>>> I shared the screenshot with the plan in the first email. In the collect() 
>>> case the data gets fetched to the driver without problems.
>>> 
>>> Best regards,
>>> 
>>> 
>>> Sergey Ivanychev
>>> 
>>>> 4 нояб. 2021 г., в 20:37, Mich Talebzadeh >>> <mailto:mich.talebza...@gmail.com>> написал(а):
>>>> 
>>> 
>>>> Just to confirm with Collect() alone, this is all on the driver?
>>> -- 
>>> 
>>> 
>>>view my Linkedin profile 
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
> 



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sergey Ivanychev
Of course if I give 64G of ram to each executor they will work. But what’s the 
point? Collecting results in the driver should cause a high RAM usage in the 
driver and that’s what is happening in collect() case. In the case where 
pyarrow serialization is enabled all the data is being collected on a single 
executor, which is clearly a wrong way to collect the result on the driver.

I guess I’ll open an issue about it in Spark Jira. It clearly looks like a bug.

> 12 нояб. 2021 г., в 11:59, Mich Talebzadeh  
> написал(а):
> 
> OK, your findings do not imply those settings are incorrect. Those settings 
> will work if you set-up your k8s cluster in peer-to-peer mode with equal 
> amounts of RAM for each node which is common practice.
> 
> HTH
> 
> 
>view my Linkedin profile 
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev  <mailto:sergeyivanyc...@gmail.com>> wrote:
> Yes, in fact those are the settings that cause this behaviour. If set to 
> false, everything goes fine since the implementation in spark sources in this 
> case is
> 
> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
> 
> Best regards,
> 
> 
> Sergey Ivanychev
> 
>> 11 нояб. 2021 г., в 13:58, Mich Talebzadeh > <mailto:mich.talebza...@gmail.com>> написал(а):
>> 
>> 
>> Have you tried the following settings:
>> 
>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>> 
>> HTH
>> 
>>view my Linkedin profile 
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction. 
>>  
>> 
>> 
>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh > <mailto:mich.talebza...@gmail.com>> wrote:
>> Ok so it boils down on how spark does create toPandas() DF under the bonnet. 
>> How many executors are involved in k8s cluster. In this model spark will 
>> create executors = no of nodes - 1
>> 
>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev > <mailto:sergeyivanyc...@gmail.com>> wrote:
>> > Just to confirm with Collect() alone, this is all on the driver?
>> 
>> I shared the screenshot with the plan in the first email. In the collect() 
>> case the data gets fetched to the driver without problems.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>> 4 нояб. 2021 г., в 20:37, Mich Talebzadeh >> <mailto:mich.talebza...@gmail.com>> написал(а):
>>> 
>> 
>>> Just to confirm with Collect() alone, this is all on the driver?
>> -- 
>> 
>> 
>>view my Linkedin profile 
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Sergey Ivanychev
Hi Gourav,

Please, read my question thoroughly. My problem is with the plan of the 
execution and with the fact that toPandas collects all the data not on the 
driver but on an executor, not with the fact that there’s some memory overhead.

I don’t understand how your excerpts answer my question. The chapters you’ve 
shared describe that serialization is costly, that workers can fail due to the 
memory constraints and inter-language serialization.

This is irrelevant to my question — building pandas DataFrame using Spark’s 
collect() works fine and this operation itself involves much deserialization of 
Row objects.

Best regards,


Sergey Ivanychev

> 12 нояб. 2021 г., в 05:05, Gourav Sengupta  
> написал(а):
> 
> Hi Sergey,
> 
> Please read the excerpts from the book of Dr. Zaharia that I had sent, they 
> explain these fundamentals clearly.
> 
> Regards,
> Gourav Sengupta
> 
> On Thu, Nov 11, 2021 at 9:40 PM Sergey Ivanychev  
> wrote:
>> Yes, in fact those are the settings that cause this behaviour. If set to 
>> false, everything goes fine since the implementation in spark sources in 
>> this case is
>> 
>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>> 11 нояб. 2021 г., в 13:58, Mich Talebzadeh  
>>> написал(а):
>>> 
>>> Have you tried the following settings:
>>> 
>>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>> 
>>> HTH
>>> 
>>>view my Linkedin profile
>>> 
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 
>>> 
>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh  
>>> wrote:
>>>> Ok so it boils down on how spark does create toPandas() DF under the 
>>>> bonnet. How many executors are involved in k8s cluster. In this model 
>>>> spark will create executors = no of nodes - 1
>>>> 
>>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev  
>>>> wrote:
>>>>> > Just to confirm with Collect() alone, this is all on the driver?
>>>>> 
>>>>> I shared the screenshot with the plan in the first email. In the 
>>>>> collect() case the data gets fetched to the driver without problems.
>>>>> 
>>>>> Best regards,
>>>>> 
>>>>> 
>>>>> Sergey Ivanychev
>>>>> 
>>>>>> 4 нояб. 2021 г., в 20:37, Mich Talebzadeh  
>>>>>> написал(а):
>>>>> 
>>>>>> Just to confirm with Collect() alone, this is all on the driver?
>>>> -- 
>>>> 
>>>> 
>>>>view my Linkedin profile
>>>> 
>>>>  
>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>>> loss, damage or destruction of data or any other property which may arise 
>>>> from relying on this email's technical content is explicitly disclaimed. 
>>>> The author will in no case be liable for any monetary damages arising from 
>>>> such loss, damage or destruction.


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Sergey Ivanychev
Yes, in fact those are the settings that cause this behaviour. If set to false, 
everything goes fine since the implementation in spark sources in this case is

pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)

Best regards,


Sergey Ivanychev

> 11 нояб. 2021 г., в 13:58, Mich Talebzadeh  
> написал(а):
> 
> 
> Have you tried the following settings:
> 
> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
> 
> HTH
> 
>view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh  
>> wrote:
>> Ok so it boils down on how spark does create toPandas() DF under the bonnet. 
>> How many executors are involved in k8s cluster. In this model spark will 
>> create executors = no of nodes - 1
>> 
>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev  
>>> wrote:
>>> > Just to confirm with Collect() alone, this is all on the driver?
>>> 
>>> I shared the screenshot with the plan in the first email. In the collect() 
>>> case the data gets fetched to the driver without problems.
>>> 
>>> Best regards,
>>> 
>>> 
>>> Sergey Ivanychev
>>> 
>>>> 4 нояб. 2021 г., в 20:37, Mich Talebzadeh  
>>>> написал(а):
>>>> 
>>> 
>>>> Just to confirm with Collect() alone, this is all on the driver?
>> -- 
>> 
>> 
>>view my Linkedin profile
>> 
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> did you get to read the excerpts from the book of Dr. Zaharia?

I read what you have shared but didn’t manage to get your point.

Best regards,


Sergey Ivanychev

> 4 нояб. 2021 г., в 20:38, Gourav Sengupta  
> написал(а):
> 
> did you get to read the excerpts from the book of Dr. Zaharia?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> Just to confirm with Collect() alone, this is all on the driver?

I shared the screenshot with the plan in the first email. In the collect() case 
the data gets fetched to the driver without problems.

Best regards,


Sergey Ivanychev

> 4 нояб. 2021 г., в 20:37, Mich Talebzadeh  
> написал(а):
> 
> Just to confirm with Collect() alone, this is all on the driver?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I’m sure that its running in client mode. I don’t want to have the same amount 
of RAM on drivers and executors since there’s no point in giving 64G of ram to 
executors in my case.

My question is why collect and toPandas actions produce so different plans, 
which cause toPandas to fail on executors.

Best regards,


Sergey Ivanychev

> 4 нояб. 2021 г., в 15:17, Mich Talebzadeh  
> написал(а):
> 
> 
> 
> From your notes ".. IIUC, in the `toPandas` case all the data gets shuffled 
> to a single executor that fails with OOM, which doesn’t happen in `collect` 
> case. This does it work like that? How do I collect a large dataset that fits 
> into memory of the driver?.
> 
> The acid test would be to use pandas and ensure that all nodes have the same 
> amount of RAM. The assumption here is that the master node has a larger 
> amount of RAM that in theory should handle the work. for Jupiter with Pandas. 
> You can easily find out which mode Spark is deploying by looking at Spark GUI 
> page.
> 
> HTH
> 
> 
> 
>view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev  
>> wrote:
>> I will follow up with the output, but I suppose Jupyter runs in client mode 
>> since it’s created via getOrCreate with a K8s api server as master.
>> Also note that I tried both “collect” and “toPandas” in the same conditions 
>> (Jupyter client mode), so IIUC your theory doesn’t explain that difference 
>> in execution plans.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>>> 4 нояб. 2021 г., в 13:12, Mich Talebzadeh  
>>>> написал(а):
>>>> 
>>> 
>>> Do you have the output for executors from spark GUI, the one that 
>>> eventually ends up with OOM?
>>> 
>>> Also what does 
>>> 
>>> kubectl get pods -n $NAMESPACE 
>>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print 
>>> $1}'`
>>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>>> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>>> 
>>> say?
>>> 
>>> My guess is that Jupyter notebook like Zeppelin notebook does a two stage 
>>> spark-submit under the bonnet. The job starts on the driver where the 
>>> Jupyter notebook is on but the actual job runs on the cluster itself in 
>>> cluster mode. If your assertion is right (executors don't play much of a 
>>> role), just run the whole thing in local mode!
>>> 
>>> HTH
>>> 
>>>view my Linkedin profile
>>> 
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 
>>> 
>>>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev  
>>>> wrote:
>>>> I want to further clarify the use case I have: an ML engineer collects 
>>>> data so as to use it for training an ML model. The driver is created 
>>>> within Jupiter notebook and has 64G of ram for fetching the training set 
>>>> and feeding it to the model. Naturally, in this case executors shouldn’t 
>>>> be as big as the driver.
>>>> 
>>>> Currently, the best solution I found is to write the dataframe to S3, and 
>>>> then read it via pd.read_parquet.
>>>> 
>>>> Best regards,
>>>> 
>>>> 
>>>> Sergey Ivanychev
>>>> 
>>>>>> 4 нояб. 2021 г., в 00:18, Mich Talebzadeh  
>>>>>> написал(а):
>>>>>> 
>>>>> 
>>>>> Thanks for clarification on the koalas case.
>>>>> 
>>>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all 
>>>>> the data gets shuffled to a single executor that fails with OOM, 
>>>>> 
>>>>> I still believe that this may be related to the way k8s handles 
>>>>> shuffling. In a balan

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I will follow up with the output, but I suppose Jupyter runs in client mode 
since it’s created via getOrCreate with a K8s api server as master.
Also note that I tried both “collect” and “toPandas” in the same conditions 
(Jupyter client mode), so IIUC your theory doesn’t explain that difference in 
execution plans.

Best regards,


Sergey Ivanychev

> 4 нояб. 2021 г., в 13:12, Mich Talebzadeh  
> написал(а):
> 
> 
> Do you have the output for executors from spark GUI, the one that eventually 
> ends up with OOM?
> 
> Also what does 
> 
> kubectl get pods -n $NAMESPACE 
> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
> 
> say?
> 
> My guess is that Jupyter notebook like Zeppelin notebook does a two stage 
> spark-submit under the bonnet. The job starts on the driver where the Jupyter 
> notebook is on but the actual job runs on the cluster itself in cluster mode. 
> If your assertion is right (executors don't play much of a role), just run 
> the whole thing in local mode!
> 
> HTH
> 
>view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev  
>> wrote:
>> I want to further clarify the use case I have: an ML engineer collects data 
>> so as to use it for training an ML model. The driver is created within 
>> Jupiter notebook and has 64G of ram for fetching the training set and 
>> feeding it to the model. Naturally, in this case executors shouldn’t be as 
>> big as the driver.
>> 
>> Currently, the best solution I found is to write the dataframe to S3, and 
>> then read it via pd.read_parquet.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>>> 4 нояб. 2021 г., в 00:18, Mich Talebzadeh  
>>>> написал(а):
>>>> 
>>> 
>>> Thanks for clarification on the koalas case.
>>> 
>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all 
>>> the data gets shuffled to a single executor that fails with OOM, 
>>> 
>>> I still believe that this may be related to the way k8s handles shuffling. 
>>> In a balanced k8s cluster this could be avoided which does not seem to be 
>>> the case here as the so called driver node has 8 times more RAM than the 
>>> other nodes. 
>>> 
>>> HTH
>>> 
>>>view my Linkedin profile
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 
>>> 
>>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is 
>>>> unrelated to toPandas(), nor to the question of how it differs from 
>>>> collect().
>>>> Shuffle is also unrelated.
>>>> 
>>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
>>>>>  wrote:
>>>>> Hi,
>>>>> 
>>>>> As I understood in the previous versions of Spark the data could not be 
>>>>> processed and stored in Pandas data frames in a distributed mode as these 
>>>>> data frames store data in RAM which is the driver in this case. 
>>>>> However, I was under the impression that this limitation no longer exists 
>>>>> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 
>>>>> 8GB of RAM for others, and PySpark running in cluster mode,  how do you 
>>>>> expect the process to confine itself to the master node? What will happen 
>>>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s 
>>>>> cluster) and run the job again?
>>>>> 
>>>>> Worth noting that the current Spark on k8s  does not support external 
>>>>> shuffle. For now we have two parameters for Dynamic Resource Allocation. 
>>>>&

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I want to further clarify the use case I have: an ML engineer collects data so 
as to use it for training an ML model. The driver is created within Jupiter 
notebook and has 64G of ram for fetching the training set and feeding it to the 
model. Naturally, in this case executors shouldn’t be as big as the driver.

Currently, the best solution I found is to write the dataframe to S3, and then 
read it via pd.read_parquet.

Best regards,


Sergey Ivanychev

> 4 нояб. 2021 г., в 00:18, Mich Talebzadeh  
> написал(а):
> 
> 
> Thanks for clarification on the koalas case.
> 
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all the 
> data gets shuffled to a single executor that fails with OOM, 
> 
> I still believe that this may be related to the way k8s handles shuffling. In 
> a balanced k8s cluster this could be avoided which does not seem to be the 
> case here as the so called driver node has 8 times more RAM than the other 
> nodes. 
> 
> HTH
> 
>view my Linkedin profile
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>> I think you're talking about koalas, which is in Spark 3.2, but that is 
>> unrelated to toPandas(), nor to the question of how it differs from 
>> collect().
>> Shuffle is also unrelated.
>> 
>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh  
>>> wrote:
>>> Hi,
>>> 
>>> As I understood in the previous versions of Spark the data could not be 
>>> processed and stored in Pandas data frames in a distributed mode as these 
>>> data frames store data in RAM which is the driver in this case. 
>>> However, I was under the impression that this limitation no longer exists 
>>> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB 
>>> of RAM for others, and PySpark running in cluster mode,  how do you expect 
>>> the process to confine itself to the master node? What will happen if you 
>>> increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster) 
>>> and run the job again?
>>> 
>>> Worth noting that the current Spark on k8s  does not support external 
>>> shuffle. For now we have two parameters for Dynamic Resource Allocation. 
>>> These are 
>>> 
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>  
>>> The idea is to use dynamic resource allocation where the driver tracks the 
>>> shuffle files and evicts only executors not storing active shuffle files. 
>>> So in a nutshell these shuffle files are stored in the executors themselves 
>>> in the absence of the external shuffle. The model works on the basis of the 
>>> "one-container-per-Pod" model  meaning that for each node of the cluster 
>>> there will be one node running the driver and each remaining node running 
>>> one executor each. 
>>> 
>>> 
>>> HTH
>>> , 
>>>view my Linkedin profile
>>> 
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I’m pretty sure

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 20) (10.20.167.28 executor 
2): java.lang.OutOfMemoryError
at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
If you look at the «toPandas» you can see the exchange stage that doesn’t occur 
in the «collect» example. I suppose it tries to pull all the data to a single 
executor for some reason.

> 3 нояб. 2021 г., в 21:34, Sean Owen  написал(а):
> 
> No, you can collect() a DataFrame. You get Rows. collect() must create an 
> in-memory representation - how else could it work? Those aren't differences.
> Are you sure it's the executor that's OOM, not the driver? 
> 
> On Wed, Nov 3, 2021 at 1:32 PM Gourav Sengupta  <mailto:gourav.sengu...@gmail.com>> wrote:
> Hi,
> 
> I might be wrong but toPandas() works with dataframes, where as collect works 
> at RDD. Also toPandas() converts to Python objects in memory I do not think 
> that collect does it.
> 
> Regards,
> Gourav
> 
> On Wed, Nov 3, 2021 at 2:24 PM Sergey Ivanychev  <mailto:sergeyivanyc...@gmail.com>> wrote:
> Hi, 
> 
> Spark 3.1.2 K8s.
> 
> I encountered OOM error while trying to create a Pandas DataFrame from Spark 
> DataFrame. My Spark driver has 60G of ram, but the executors are tiny 
> compared to that (8G)
> 
> If I do `spark.table(…).limit(100).collect()` I get the following plan
> 
> <2021-11-03_17-07-55.png>
> 
> 
> If I do `spark.table(…).limit(100).toPandas()` I get a more complicated 
> plan with an extra shuffle
> 
> <2021-11-03_17-08-31.png>
> 
> IIUC, in the `toPandas` case all the data gets shuffled to a single executor 
> that fails with OOM, which doesn’t happen in `collect` case. This does it 
> work like that? How do I collect a large dataset that fits into memory of the 
> driver?