To add some more, if the number of rows to collect is large,
DataFrame.collect can be slower because it launches multiple Spark jobs
sequentially.

Given that DataFrame.toPandas does not take how many rows to collect, it's
controversial to apply the same optimization of DataFrame.collect to here.

We could have a configuration to enable and disable but the implementation
of this in DataFrame.toPandas would be complicated due to existing
optimization such as Arrow. Haven't taken a deeper look though but my guts
say it's not worthwhile.

On Sat, Nov 13, 2021 at 12:05 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Thanks for pinging me Sean.
>
> Yes, there's an optimization on DataFrame.collect which tries to collect
> few first partitioms and see if the number of rows are found (and repeat).
>
> DataFrame.toPandas does not have such optimization.
>
> I suspect that the shuffle isn't actual shuffle but just collects local
> limits on executors to one executor to calculate global limit.
>
> On Fri, Nov 12, 2021 at 11:16 PM Sean Owen <sro...@gmail.com> wrote:
>
>> Hyukjin can you weigh in?
>> Is this exchange due to something in your operations or clearly unique to
>> the toPandas operation?
>> I didn't think it worked that way, but maybe there is some good reason it
>> does.
>>
>> On Fri, Nov 12, 2021 at 7:34 AM Sergey Ivanychev <
>> sergeyivanyc...@gmail.com> wrote:
>>
>>> Hi Sean,
>>>
>>> According to the plan I’m observing, this is what happens indeed.
>>> There’s exchange operation that sends data to a single partition/task in
>>> toPandas() + PyArrow enabled case.
>>>
>>> 12 нояб. 2021 г., в 16:31, Sean Owen <sro...@gmail.com> написал(а):
>>>
>>> Yes, none of the responses are addressing your question.
>>> I do not think it's a bug necessarily; do you end up with one partition
>>> in your execution somewhere?
>>>
>>> On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev <
>>> sergeyivanyc...@gmail.com> wrote:
>>>
>>>> Of course if I give 64G of ram to each executor they will work. But
>>>> what’s the point? Collecting results in the driver should cause a high RAM
>>>> usage in the driver and that’s what is happening in collect() case. In the
>>>> case where pyarrow serialization is enabled all the data is being collected
>>>> on a single executor, which is clearly a wrong way to collect the result on
>>>> the driver.
>>>>
>>>> I guess I’ll open an issue about it in Spark Jira. It clearly looks
>>>> like a bug.
>>>>
>>>> 12 нояб. 2021 г., в 11:59, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> написал(а):
>>>>
>>>> OK, your findings do not imply those settings are incorrect. Those
>>>> settings will work if you set-up your k8s cluster in peer-to-peer mode with
>>>> equal amounts of RAM for each node which is common practice.
>>>>
>>>> HTH
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev <
>>>> sergeyivanyc...@gmail.com> wrote:
>>>>
>>>>> Yes, in fact those are the settings that cause this behaviour. If set
>>>>> to false, everything goes fine since the implementation in spark sources 
>>>>> in
>>>>> this case is
>>>>>
>>>>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>>>>>
>>>>> Best regards,
>>>>>
>>>>>
>>>>> Sergey Ivanychev
>>>>>
>>>>> 11 нояб. 2021 г., в 13:58, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>> написал(а):
>>>>>
>>>>> 
>>>>> Have you tried the following settings:
>>>>>
>>>>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>>>>>
>>>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>>>>
>>>>> HTH
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Ok so it boils down on how spark does create toPandas() DF under the
>>>>>> bonnet. How many executors are involved in k8s cluster. In this model 
>>>>>> spark
>>>>>> will create executors = no of nodes - 1
>>>>>>
>>>>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev <
>>>>>> sergeyivanyc...@gmail.com> wrote:
>>>>>>
>>>>>>> > Just to confirm with Collect() alone, this is all on the driver?
>>>>>>>
>>>>>>> I shared the screenshot with the plan in the first email. In the
>>>>>>> collect() case the data gets fetched to the driver without problems.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>>
>>>>>>> Sergey Ivanychev
>>>>>>>
>>>>>>> 4 нояб. 2021 г., в 20:37, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>> написал(а):
>>>>>>>
>>>>>>> Just to confirm with Collect() alone, this is all on the driver?
>>>>>>>
>>>>>>> --
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>

Reply via email to