Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Hyukjin Kwon
To add some more: if the number of rows to collect is large,
DataFrame.collect can be slower because it launches multiple Spark jobs
sequentially.

Given that DataFrame.toPandas does not take a parameter for how many rows to
collect, it is debatable whether the same optimization used in
DataFrame.collect should be applied there.

We could add a configuration to enable and disable it, but implementing this
in DataFrame.toPandas would be complicated by existing optimizations such as
Arrow. I haven't taken a deeper look, but my gut says it's not worthwhile.
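
To make the contrast concrete, a hedged sketch at the API level (`df` is a
hypothetical DataFrame; the exact number of jobs depends on the data layout):

# With a limit, collect() can launch several sequential Spark jobs,
# scanning progressively more partitions until enough rows are found.
rows = df.limit(1_000_000).collect()

# toPandas() takes no row-count argument, so there is no obvious way to
# apply the same incremental strategy to it.
pdf = df.toPandas()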

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Hyukjin Kwon
Thanks for pinging me, Sean.

Yes, there's an optimization in DataFrame.collect which tries to collect the
first few partitions, checks whether the requested number of rows has been
found, and repeats over more partitions if not.

DataFrame.toPandas does not have such an optimization.

I suspect that the shuffle isn't an actual shuffle but just collects the
local limits from the executors onto one executor to calculate the global
limit.
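
A rough sketch of that collect-side optimization (illustrative Python, not
Spark's actual code; in Spark the logic lives around SparkPlan.executeTake,
and the scale-up factor is configurable):

# Run a job over a few partitions, check how many rows came back, and
# scan more partitions (scaled up each round) until the limit is met.
def incremental_collect(run_job_on, num_partitions, limit, scale_up=4):
    rows = []
    start, batch = 0, 1
    while len(rows) < limit and start < num_partitions:
        end = min(start + batch, num_partitions)
        rows.extend(run_job_on(range(start, end)))  # one sequential Spark job
        start, batch = end, batch * scale_up
    return rows[:limit]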


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sean Owen
Hyukjin, can you weigh in?
Is this exchange due to something in your operations, or is it clearly unique
to the toPandas operation?
I didn't think it worked that way, but maybe there is some good reason it
does.


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sergey Ivanychev
Hi Sean,

According to the plan I’m observing, this is what happens indeed. There’s an
exchange operation that sends the data to a single partition/task in the
toPandas() + PyArrow-enabled case.
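
One way to reproduce the comparison (a sketch assuming `df` is the DataFrame
in question; the config key is the standard PyArrow toggle):

# Non-Arrow path: toPandas() falls back to a plain collect() on the driver.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
pdf_plain = df.toPandas()

# Arrow path: the case where the plan shows an exchange into a single
# partition/task (visible in the SQL tab of the Spark UI).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf_arrow = df.toPandas()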


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sean Owen
Yes, none of the responses are addressing your question.
I do not think it's a bug necessarily; do you end up with one partition in
your execution somewhere?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sergey Ivanychev
Of course, if I give 64G of RAM to each executor it will work. But what’s the
point? Collecting results on the driver should cause high RAM usage on the
driver, and that’s what happens in the collect() case. In the case where
PyArrow serialization is enabled, all the data is collected on a single
executor, which is clearly the wrong way to collect the result on the driver.

I guess I’ll open an issue about it in the Spark Jira. It clearly looks like a
bug.



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Mich Talebzadeh
OK, your findings do not imply those settings are incorrect. Those settings
will work if you set up your k8s cluster in peer-to-peer mode with equal
amounts of RAM for each node, which is common practice.

HTH




Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Gourav Sengupta
Hi,

Sorry

Regards,
Gourav Sengupta


On Fri, Nov 12, 2021 at 6:48 AM Sergey Ivanychev 
wrote:

> Hi Gourav,
>
> Please, read my question thoroughly. My problem is with the plan of the
> execution and with the fact that toPandas collects all the data not on the
> driver but on an executor, not with the fact that there’s some memory
> overhead.
>
> I don’t understand how your excerpts answer my question. The chapters
> you’ve shared describe that serialization is costly, that workers can fail
> due to the memory constraints and inter-language serialization.
>
> This is irrelevant to my question — building pandas DataFrame using
> Spark’s collect() works fine and this operation itself involves much
> deserialization of Row objects.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> 12 нояб. 2021 г., в 05:05, Gourav Sengupta 
> написал(а):
>
> 
> Hi Sergey,
>
> Please read the excerpts from the book of Dr. Zaharia that I had sent,
> they explain these fundamentals clearly.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Nov 11, 2021 at 9:40 PM Sergey Ivanychev <
> sergeyivanyc...@gmail.com> wrote:
>
>> Yes, in fact those are the settings that cause this behaviour. If set to
>> false, everything goes fine since the implementation in spark sources in
>> this case is
>>
>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> 11 нояб. 2021 г., в 13:58, Mich Talebzadeh 
>> написал(а):
>>
>> 
>> Have you tried the following settings:
>>
>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>>
>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh 
>> wrote:
>>
>>> Ok so it boils down on how spark does create toPandas() DF under the
>>> bonnet. How many executors are involved in k8s cluster. In this model spark
>>> will create executors = no of nodes - 1
>>>
>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev 
>>> wrote:
>>>
 > Just to confirm with Collect() alone, this is all on the driver?

 I shared the screenshot with the plan in the first email. In the
 collect() case the data gets fetched to the driver without problems.

 Best regards,


 Sergey Ivanychev

 4 нояб. 2021 г., в 20:37, Mich Talebzadeh 
 написал(а):

 Just to confirm with Collect() alone, this is all on the driver?

 --
>>>
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Georg Heiler
https://stackoverflow.com/questions/46832394/spark-access-first-n-rows-take-vs-limit
might be related

Best,
Georg


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Sergey Ivanychev
Hi Gourav,

Please read my question thoroughly. My problem is with the execution plan and
with the fact that toPandas collects all the data not on the driver but on an
executor, not with the fact that there’s some memory overhead.

I don’t understand how your excerpts answer my question. The chapters you’ve
shared describe that serialization is costly, that workers can fail due to
memory constraints, and inter-language serialization.

This is irrelevant to my question: building a pandas DataFrame using Spark’s
collect() works fine, and that operation itself involves much deserialization
of Row objects.

Best regards,


Sergey Ivanychev



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Gourav Sengupta
Hi Sergey,

Please read the excerpts from the book of Dr. Zaharia that I sent; they
explain these fundamentals clearly.

Regards,
Gourav Sengupta


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Sergey Ivanychev
Yes, in fact those are the settings that cause this behaviour. If set to
false, everything goes fine, since the implementation in the Spark sources in
this case is:

pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)

Best regards,


Sergey Ivanychev
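
In other words, with Arrow disabled, toPandas() is roughly equivalent to this
driver-side sketch (`df` stands for any hypothetical Spark DataFrame):

import pandas as pd

# Non-Arrow fallback: pull every row to the driver as Row objects, then
# build the pandas DataFrame locally from those records.
rows = df.collect()  # list of pyspark.sql.Row held in driver memory
pdf = pd.DataFrame.from_records(rows, columns=df.columns)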



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Mich Talebzadeh
Have you tried the following settings:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

HTH
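
The same settings can also be applied when the session is built; a minimal
sketch (the app name is illustrative):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("arrow-toPandas-test")  # hypothetical app name
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # Fall back to the non-Arrow conversion if the Arrow path fails:
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
    .getOrCreate()
)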



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
OK, so it boils down to how Spark creates the toPandas() DataFrame under the
bonnet, and how many executors are involved in the k8s cluster. In this model
Spark will create executors = number of nodes - 1.



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> did you get to read the excerpts from the book of Dr. Zaharia?

I read what you have shared but didn’t manage to get your point.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 20:38, Gourav Sengupta wrote:
> 
> did you get to read the excerpts from the book of Dr. Zaharia?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> Just to confirm with Collect() alone, this is all on the driver?

I shared the screenshot with the plan in the first email. In the collect() case 
the data gets fetched to the driver without problems.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 20:37, Mich Talebzadeh wrote:
> 
> Just to confirm with Collect() alone, this is all on the driver?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Gourav Sengupta
Hi,

did you get to read the excerpts from the book of Dr. Zaharia?

Regards,
Gourav


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
Well, evidently the indication is that this is happening on the executor and
not on the driver node as assumed. Just to confirm: with collect() alone, this
is all on the driver?

HTH


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I’m sure that it’s running in client mode. I don’t want to have the same
amount of RAM on drivers and executors, since there’s no point in giving 64G
of RAM to executors in my case.

My question is why the collect and toPandas actions produce such different
plans, which causes toPandas to fail on the executors.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 15:17, Mich Talebzadeh wrote:
> 
> 
> 
> From your notes: ".. IIUC, in the `toPandas` case all the data gets shuffled 
> to a single executor that fails with OOM, which doesn’t happen in the `collect` 
> case. Does it work like that? How do I collect a large dataset that fits 
> into memory of the driver?"
> 
> The acid test would be to use pandas and ensure that all nodes have the same 
> amount of RAM. The assumption here is that the master node has a larger 
> amount of RAM that in theory should handle the work for Jupyter with pandas. 
> You can easily find out which mode Spark is deploying by looking at the Spark 
> GUI page.
> 
> HTH
> 
> 
> 
>> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev  
>> wrote:
>> I will follow up with the output, but I suppose Jupyter runs in client mode 
>> since it’s created via getOrCreate with a K8s api server as master.
>> Also note that I tried both “collect” and “toPandas” in the same conditions 
>> (Jupyter client mode), so IIUC your theory doesn’t explain that difference 
>> in execution plans.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
 On 4 Nov 2021, at 13:12, Mich Talebzadeh wrote:
 
>>> 
>>> Do you have the output for executors from spark GUI, the one that 
>>> eventually ends up with OOM?
>>> 
>>> Also what does 
>>> 
>>> kubectl get pods -n $NAMESPACE 
>>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print 
>>> $1}'`
>>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>>> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>>> 
>>> say?
>>> 
>>> My guess is that Jupyter notebook like Zeppelin notebook does a two stage 
>>> spark-submit under the bonnet. The job starts on the driver where the 
>>> Jupyter notebook is on but the actual job runs on the cluster itself in 
>>> cluster mode. If your assertion is right (executors don't play much of a 
>>> role), just run the whole thing in local mode!
>>> 
>>> HTH
>>> 
 On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev  
 wrote:
 I want to further clarify the use case I have: an ML engineer collects 
 data so as to use it for training an ML model. The driver is created 
 within Jupiter notebook and has 64G of ram for fetching the training set 
 and feeding it to the model. Naturally, in this case executors shouldn’t 
 be as big as the driver.
 
 Currently, the best solution I found is to write the dataframe to S3, and 
 then read it via pd.read_parquet.
 
 Best regards,
 
 
 Sergey Ivanychev
 
>> On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
>> 
> 
> Thanks for clarification on the koalas case.
> 
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all 
> the data gets shuffled to a single executor that fails with OOM, 
> 
> I still believe that this may be related to the way k8s handles 
> shuffling. In a balanced k8s cluster this could be avoided which does not 
> seem to be the case here as the so called driver node has 8 times more 
> RAM than the other nodes. 
> 
> HTH
> 
>> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>> I think you're talking about koalas, which is in Spark 3.2, but that is 
>> unrelated to toPandas(), nor to the question of how it differs from 
>> collect().
>> Shuffle is also unrelated.
>> 
>>> On 

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
From your notes: ".. IIUC, in the `toPandas` case all the data gets shuffled
to a single executor that fails with OOM, which doesn’t happen in the `collect`
case. Does it really work like that? How do I collect a large dataset that
fits into the memory of the driver?"

The acid test would be to use pandas and ensure that all nodes have the
same amount of RAM. The assumption here is that the master node has a
larger amount of RAM that, in theory, should handle the work for Jupyter
with Pandas. You can easily find out which mode Spark is deploying by
looking at the Spark GUI page.

HTH
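
For what it's worth, the deploy mode can also be checked from the running session
itself; a small sketch, assuming an existing SparkSession named `spark`:

    sc = spark.sparkContext
    print(sc.master)                                              # e.g. k8s://https://<api-server>:443
    print(sc.getConf().get("spark.submit.deployMode", "client"))  # "client" or "cluster"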







On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev 
wrote:

> I will follow up with the output, but I suppose Jupyter runs in client
> mode since it’s created via getOrCreate with a K8s api server as master.
> Also note that I tried both “collect” and “toPandas” in the same
> conditions (Jupyter client mode), so IIUC your theory doesn’t explain that
> difference in execution plans.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 13:12, Mich Talebzadeh wrote:
>
> 
> Do you have the output for executors from spark GUI, the one that
> eventually ends up with OOM?
>
> Also what does
>
> kubectl get pods -n $NAMESPACE
> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print
> $1}'`
> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>
>
> say?
>
>
> My guess is that Jupyter notebook like Zeppelin notebook does a two stage
> spark-submit under the bonnet. The job starts on the driver where the
> Jupyter notebook is on but the actual job runs on the cluster itself in
> cluster mode. If your assertion is right (executors don't play much of a
> role), just run the whole thing in local mode!
>
>
> HTH
>
>
>
>
>
>
> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev 
> wrote:
>
>> I want to further clarify the use case I have: an ML engineer collects
>> data so as to use it for training an ML model. The driver is created within
>> Jupiter notebook and has 64G of ram for fetching the training set and
>> feeding it to the model. Naturally, in this case executors shouldn’t be as
>> big as the driver.
>>
>> Currently, the best solution I found is to write the dataframe to S3, and
>> then read it via pd.read_parquet.
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
>>
>> 
>> Thanks for clarification on the koalas case.
>>
>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
>> the data gets shuffled to a single executor that fails with OOM,
>>
>> I still believe that this may be related to the way k8s handles
>> shuffling. In a balanced k8s cluster this could be avoided which does not
>> seem to be the case here as the so called driver node has 8 times more
>> RAM than the other nodes.
>>
>> HTH
>>
>>
>>
>>
>>
>> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>>
>>> I think you're talking about koalas, which is in Spark 3.2, but that is
>>> unrelated to toPandas(), nor to the question of how it differs from
>>> collect().
>>> Shuffle is also unrelated.
>>>
>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Hi,

 As I understood in the previous versions of Spark the data could not
 be processed and stored in Pandas data frames in a distributed mode as
 these data frames store data in RAM which is the driver in this case.
 However, I was under the impression that this limitation no longer
 exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
 and 8GB of RAM for others, and PySpark running in cluster mode,  how do 

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I will follow up with the output, but I suppose Jupyter runs in client mode 
since it’s created via getOrCreate with a K8s API server as the master.
Also note that I tried both “collect” and “toPandas” in the same conditions 
(Jupyter client mode), so IIUC your theory doesn’t explain that difference in 
execution plans.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 13:12, Mich Talebzadeh wrote:
> 
> 
> Do you have the output for executors from spark GUI, the one that eventually 
> ends up with OOM?
> 
> Also what does 
> 
> kubectl get pods -n $NAMESPACE 
> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
> 
> say?
> 
> My guess is that Jupyter notebook like Zeppelin notebook does a two stage 
> spark-submit under the bonnet. The job starts on the driver where the Jupyter 
> notebook is on but the actual job runs on the cluster itself in cluster mode. 
> If your assertion is right (executors don't play much of a role), just run 
> the whole thing in local mode!
> 
> HTH
> 
> 
> 
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev  
>> wrote:
>> I want to further clarify the use case I have: an ML engineer collects data 
>> so as to use it for training an ML model. The driver is created within 
>> Jupiter notebook and has 64G of ram for fetching the training set and 
>> feeding it to the model. Naturally, in this case executors shouldn’t be as 
>> big as the driver.
>> 
>> Currently, the best solution I found is to write the dataframe to S3, and 
>> then read it via pd.read_parquet.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
 On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
 
>>> 
>>> Thanks for clarification on the koalas case.
>>> 
>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all 
>>> the data gets shuffled to a single executor that fails with OOM, 
>>> 
>>> I still believe that this may be related to the way k8s handles shuffling. 
>>> In a balanced k8s cluster this could be avoided which does not seem to be 
>>> the case here as the so called driver node has 8 times more RAM than the 
>>> other nodes. 
>>> 
>>> HTH
>>> 
>>> 
>>> 
 On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
 I think you're talking about koalas, which is in Spark 3.2, but that is 
 unrelated to toPandas(), nor to the question of how it differs from 
 collect().
 Shuffle is also unrelated.
 
> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
>  wrote:
> Hi,
> 
> As I understood in the previous versions of Spark the data could not be 
> processed and stored in Pandas data frames in a distributed mode as these 
> data frames store data in RAM which is the driver in this case. 
> However, I was under the impression that this limitation no longer exists 
> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 
> 8GB of RAM for others, and PySpark running in cluster mode,  how do you 
> expect the process to confine itself to the master node? What will happen 
> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s 
> cluster) and run the job again?
> 
> Worth noting that the current Spark on k8s  does not support external 
> shuffle. For now we have two parameters for Dynamic Resource Allocation. 
> These are 
> 
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>  
> The idea is to use dynamic resource allocation where the driver tracks 
> the shuffle files and evicts only executors not storing active shuffle 
> files. So in a nutshell these shuffle files are stored in the executors 
> themselves in the absence of the external shuffle. The model works on the 
> basis of the "one-container-per-Pod" model  meaning that for each node of 
> the cluster there will be one node running the driver and each remaining 
> node running one executor each. 
> 
> 
> HTH

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
Do you have the output for the executors from the Spark GUI, in particular the one
that eventually ends up with OOM?

Also, what does

# list the pods in the Spark namespace
kubectl get pods -n $NAMESPACE
# capture the driver pod's name (assumes the pod name contains "driver")
DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE | grep driver | awk '{print $1}'`
# driver logs, then the logs of the failing executor
# ($EXECUTOR_WITH_OOM is a placeholder for that executor pod's name)
kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
kubectl logs $EXECUTOR_WITH_OOM -n $NAMESPACE


say?


My guess is that the Jupyter notebook, like the Zeppelin notebook, does a two-stage
spark-submit under the bonnet. The job starts on the host where the
Jupyter notebook runs, but the actual job runs on the cluster itself in
cluster mode. If your assertion is right (the executors don't play much of a
role), just run the whole thing in local mode!


HTH
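
A rough sketch of that local-mode sanity check; the table name is a placeholder,
and everything runs on the driver machine with no executors involved:

    from pyspark.sql import SparkSession

    spark_local = (SparkSession.builder
                   .master("local[*]")            # all driver cores, no cluster
                   .appName("toPandas-local-check")
                   .getOrCreate())

    pdf = spark_local.table("some_table").limit(100).toPandas()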


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev 
wrote:

> I want to further clarify the use case I have: an ML engineer collects
> data so as to use it for training an ML model. The driver is created within
> Jupiter notebook and has 64G of ram for fetching the training set and
> feeding it to the model. Naturally, in this case executors shouldn’t be as
> big as the driver.
>
> Currently, the best solution I found is to write the dataframe to S3, and
> then read it via pd.read_parquet.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
>
> 
> Thanks for clarification on the koalas case.
>
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
> the data gets shuffled to a single executor that fails with OOM,
>
> I still believe that this may be related to the way k8s handles shuffling.
> In a balanced k8s cluster this could be avoided which does not seem to be
> the case here as the so called driver node has 8 times more RAM than the
> other nodes.
>
> HTH
>
>
>
>
>
> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>
>> I think you're talking about koalas, which is in Spark 3.2, but that is
>> unrelated to toPandas(), nor to the question of how it differs from
>> collect().
>> Shuffle is also unrelated.
>>
>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> As I understood in the previous versions of Spark the data could not be
>>> processed and stored in Pandas data frames in a distributed mode as these
>>> data frames store data in RAM which is the driver in this case.
>>> However, I was under the impression that this limitation no longer
>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>> and 8GB of RAM for others, and PySpark running in cluster mode,  how do you
>>> expect the process to confine itself to the master node? What will happen
>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>> cluster) and run the job again?
>>>
>>> Worth noting that the current Spark on k8s  does not support external
>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>> These are
>>>
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>
>>>
>>> The idea is to use dynamic resource allocation where the driver tracks
>>> the shuffle files and evicts only executors not storing active shuffle
>>> files. So in a nutshell these shuffle files are stored in the executors
>>> themselves in the absence of the external shuffle. The model works on the
>>> basis of the "one-container-per-Pod" model
>>>  meaning that for
>>> each node of the cluster there will be one node running the driver and each
>>> remaining node running one executor each.
>>>
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Gourav Sengupta
Hi,

I am copying Dr. Zaharia in this email as I am quoting from his book (once
again I may be wrong):
Chapter 5: Basic Structured Operations >> Creating Rows

You can create rows by manually instantiating a Row object with the values
that belong in each column. It’s important to note that only DataFrames
have schemas. Rows themselves do not have schemas. This means that if you
create a Row manually, you must specify the values in the same order as the
schema of the DataFrame to which they might be appended (we will see this
when we discuss creating DataFrames):

Chapter 6: Working with different types of data
Starting this Python process is expensive, but the real cost is in
serializing the data to Python. This is costly for two reasons: it is an
expensive computation, but also, after the data enters Python, Spark cannot
manage the memory of the worker. This means that you could potentially
cause a worker to fail if it becomes resource constrained (because both the
JVM and Python are competing for memory on the same machine).

Chapter 18: Monitoring and Debugging (as Sean was mentioning, this is about the
driver OOM error)
Issues with JVMs running out of memory can happen if you are using another
language binding, such as Python, due to data conversion between the two
requiring too much memory in the JVM. Try to see whether your issue is
specific to your chosen language and bring back less data to the driver
node, or write it to a file instead of bringing it back as in-memory
objects.

Regards,
Gourav Sengupta
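
Not from the book, just a minimal PySpark illustration of the Chapter 5 point quoted
above (assumes an existing SparkSession named `spark`):

    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, StringType, LongType

    schema = StructType([
        StructField("name",  StringType(), True),
        StructField("count", LongType(),   False),
    ])

    # A Row carries no schema of its own, so its values must follow the order
    # of the schema of the DataFrame it is destined for.
    row = Row("spark", 42)
    df = spark.createDataFrame([row], schema)
    df.show()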


On Wed, Nov 3, 2021 at 10:09 PM Sergey Ivanychev 
wrote:

> I want to further clarify the use case I have: an ML engineer collects
> data so as to use it for training an ML model. The driver is created within
> Jupiter notebook and has 64G of ram for fetching the training set and
> feeding it to the model. Naturally, in this case executors shouldn’t be as
> big as the driver.
>
> Currently, the best solution I found is to write the dataframe to S3, and
> then read it via pd.read_parquet.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
>
> 
> Thanks for clarification on the koalas case.
>
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
> the data gets shuffled to a single executor that fails with OOM,
>
> I still believe that this may be related to the way k8s handles shuffling.
> In a balanced k8s cluster this could be avoided which does not seem to be
> the case here as the so called driver node has 8 times more RAM than the
> other nodes.
>
> HTH
>
>
>
>
>
> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>
>> I think you're talking about koalas, which is in Spark 3.2, but that is
>> unrelated to toPandas(), nor to the question of how it differs from
>> collect().
>> Shuffle is also unrelated.
>>
>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> As I understood in the previous versions of Spark the data could not be
>>> processed and stored in Pandas data frames in a distributed mode as these
>>> data frames store data in RAM which is the driver in this case.
>>> However, I was under the impression that this limitation no longer
>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>> and 8GB of RAM for others, and PySpark running in cluster mode,  how do you
>>> expect the process to confine itself to the master node? What will happen
>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>> cluster) and run the job again?
>>>
>>> Worth noting that the current Spark on k8s  does not support external
>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>> These are
>>>
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>
>>>
>>> The idea is to use dynamic resource allocation where the driver tracks
>>> the shuffle files and evicts only executors not storing active shuffle
>>> files. So in a nutshell these shuffle files are stored in the executors
>>> themselves in the absence of the external shuffle. The model works on the
>>> basis of the "one-container-per-Pod" model
>>>  meaning that for
>>> each node of the cluster there will be one node running the driver and each
>>> remaining node running one executor each.
>>>
>>>
>>>
>>> HTH

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I want to further clarify the use case I have: an ML engineer collects data so 
as to use it for training an ML model. The driver is created within a Jupyter 
notebook and has 64G of RAM for fetching the training set and feeding it to the 
model. Naturally, in this case the executors shouldn’t be as big as the driver.

Currently, the best solution I have found is to write the dataframe to S3, and then 
read it back via pd.read_parquet.

Best regards,


Sergey Ivanychev
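
A minimal sketch of that workaround; the bucket and path are placeholders, and reading
the files back with pandas assumes pyarrow (or fastparquet) and s3fs are installed on
the driver:

    import pandas as pd

    path = "s3a://my-bucket/training_set"     # hypothetical location

    # Written in parallel by the executors, never collected on the driver
    spark.table("some_table").write.mode("overwrite").parquet(path)

    # Read back on the driver only (note the s3:// scheme used by s3fs/pandas)
    pdf = pd.read_parquet("s3://my-bucket/training_set")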

> On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
> 
> 
> Thanks for clarification on the koalas case.
> 
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all the 
> data gets shuffled to a single executor that fails with OOM, 
> 
> I still believe that this may be related to the way k8s handles shuffling. In 
> a balanced k8s cluster this could be avoided which does not seem to be the 
> case here as the so called driver node has 8 times more RAM than the other 
> nodes. 
> 
> HTH
> 
> 
> 
>> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>> I think you're talking about koalas, which is in Spark 3.2, but that is 
>> unrelated to toPandas(), nor to the question of how it differs from 
>> collect().
>> Shuffle is also unrelated.
>> 
>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh  
>>> wrote:
>>> Hi,
>>> 
>>> As I understood in the previous versions of Spark the data could not be 
>>> processed and stored in Pandas data frames in a distributed mode as these 
>>> data frames store data in RAM which is the driver in this case. 
>>> However, I was under the impression that this limitation no longer exists 
>>> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB 
>>> of RAM for others, and PySpark running in cluster mode,  how do you expect 
>>> the process to confine itself to the master node? What will happen if you 
>>> increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster) 
>>> and run the job again?
>>> 
>>> Worth noting that the current Spark on k8s  does not support external 
>>> shuffle. For now we have two parameters for Dynamic Resource Allocation. 
>>> These are 
>>> 
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>  
>>> The idea is to use dynamic resource allocation where the driver tracks the 
>>> shuffle files and evicts only executors not storing active shuffle files. 
>>> So in a nutshell these shuffle files are stored in the executors themselves 
>>> in the absence of the external shuffle. The model works on the basis of the 
>>> "one-container-per-Pod" model  meaning that for each node of the cluster 
>>> there will be one node running the driver and each remaining node running 
>>> one executor each. 
>>> 
>>> 
>>> HTH
>>> 


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Mich Talebzadeh
Thanks for the clarification on the koalas case.

The thread owner states, and I quote: ".. IIUC, in the `toPandas` case all
the data gets shuffled to a single executor that fails with OOM".

I still believe that this may be related to the way k8s handles shuffling.
In a balanced k8s cluster this could be avoided, which does not seem to be
the case here, as the so-called driver node has 8 times more RAM than the
other nodes.

HTH





On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:

> I think you're talking about koalas, which is in Spark 3.2, but that is
> unrelated to toPandas(), nor to the question of how it differs from
> collect().
> Shuffle is also unrelated.
>
> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> As I understood in the previous versions of Spark the data could not be
>> processed and stored in Pandas data frames in a distributed mode as these
>> data frames store data in RAM which is the driver in this case.
>> However, I was under the impression that this limitation no longer exists
>> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB
>> of RAM for others, and PySpark running in cluster mode,  how do you expect
>> the process to confine itself to the master node? What will happen if you
>> increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster)
>> and run the job again?
>>
>> Worth noting that the current Spark on k8s  does not support external
>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>> These are
>>
>>  --conf spark.dynamicAllocation.enabled=true \
>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>
>>
>> The idea is to use dynamic resource allocation where the driver tracks
>> the shuffle files and evicts only executors not storing active shuffle
>> files. So in a nutshell these shuffle files are stored in the executors
>> themselves in the absence of the external shuffle. The model works on the
>> basis of the "one-container-per-Pod" model
>>  meaning that for
>> each node of the cluster there will be one node running the driver and each
>> remaining node running one executor each.
>>
>>
>>
>> HTH
>>
>>
>>
>>


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sean Owen
I think you're talking about koalas, which is in Spark 3.2, but that is
unrelated to toPandas() and to the question of how it differs from
collect().
Shuffle is also unrelated.
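
A rough sketch of the distinction, assuming Spark 3.2+ for pyspark.pandas and an
existing SparkSession named `spark`; the path and table name are placeholders:

    # pandas-on-Spark (the former koalas): a pandas-like API whose data stays
    # distributed across the executors
    import pyspark.pandas as ps
    psdf = ps.read_parquet("s3a://my-bucket/training_set")
    print(psdf.head(10))

    # DataFrame.toPandas(): always materialises the whole result as a plain
    # pandas DataFrame in driver memory
    pdf = spark.table("some_table").limit(100).toPandas()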

On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
wrote:

> Hi,
>
> As I understood in the previous versions of Spark the data could not be
> processed and stored in Pandas data frames in a distributed mode as these
> data frames store data in RAM which is the driver in this case.
> However, I was under the impression that this limitation no longer exists
> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB
> of RAM for others, and PySpark running in cluster mode,  how do you expect
> the process to confine itself to the master node? What will happen if you
> increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster)
> and run the job again?
>
> Worth noting that the current Spark on k8s  does not support external
> shuffle. For now we have two parameters for Dynamic Resource Allocation.
> These are
>
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>
>
> The idea is to use dynamic resource allocation where the driver tracks
> the shuffle files and evicts only executors not storing active shuffle
> files. So in a nutshell these shuffle files are stored in the executors
> themselves in the absence of the external shuffle. The model works on the
> basis of the "one-container-per-Pod" model
>  meaning that for
> each node of the cluster there will be one node running the driver and each
> remaining node running one executor each.
>
>
>
> HTH
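
The two Dynamic Resource Allocation settings quoted above, expressed as SparkSession
options rather than spark-submit flags; a sketch only, with a placeholder K8s master
URL and container image:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master("k8s://https://kubernetes.default.svc:443")
             .config("spark.kubernetes.container.image", "my-registry/spark-py:3.1.2")
             .config("spark.dynamicAllocation.enabled", "true")
             .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
             .getOrCreate())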
>
>
>
>


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I’m pretty sure:

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 20) (10.20.167.28 executor 2): java.lang.OutOfMemoryError
    at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)

If you look at the «toPandas» plan you can see the exchange stage that doesn’t occur 
in the «collect» example. I suppose it tries to pull all the data to a single 
executor for some reason.

> On 3 Nov 2021, at 21:34, Sean Owen wrote:
> 
> No, you can collect() a DataFrame. You get Rows. collect() must create an 
> in-memory representation - how else could it work? Those aren't differences.
> Are you sure it's the executor that's OOM, not the driver? 
> 
> On Wed, Nov 3, 2021 at 1:32 PM Gourav Sengupta wrote:
> Hi,
> 
> I might be wrong, but toPandas() works with DataFrames, whereas collect works 
> at the RDD level. Also, toPandas() converts to Python objects in memory; I do not 
> think that collect does that.
> 
> Regards,
> Gourav
> 
> On Wed, Nov 3, 2021 at 2:24 PM Sergey Ivanychev wrote:
> Hi, 
> 
> Spark 3.1.2 K8s.
> 
> I encountered an OOM error while trying to create a Pandas DataFrame from a Spark 
> DataFrame. My Spark driver has 60G of RAM, but the executors are tiny 
> compared to that (8G).
> 
> If I do `spark.table(…).limit(100).collect()` I get the following plan
> 
> <2021-11-03_17-07-55.png>
> 
> 
> If I do `spark.table(…).limit(100).toPandas()` I get a more complicated 
> plan with an extra shuffle
> 
> <2021-11-03_17-08-31.png>
> 
> IIUC, in the `toPandas` case all the data gets shuffled to a single executor 
> that fails with OOM, which doesn’t happen in the `collect` case. Does it really 
> work like that? How do I collect a large dataset that fits into the memory of 
> the driver?
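
One alternative, not discussed in the thread, for the case where the result fits in
driver RAM but not on any single executor: stream partitions to the driver one at a
time instead of collecting everything in one shot. A sketch, assuming an existing
SparkSession named `spark` and a placeholder table name:

    import pandas as pd

    rows = spark.table("some_table").limit(100).toLocalIterator()
    pdf = pd.DataFrame.from_records([r.asDict() for r in rows])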