Hi,

I am not sure what you are trying to achieve here: are cume_dist and
percent_rank not different? Your first snippet uses cume_dist while your
second uses percent_rank, so the two loops do not compute the same thing.
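For a partition holding the values [10, 20, 30, 40], cume_dist returns
0.25, 0.50, 0.75 and 1.00 (the fraction of rows at or below the current
row), while percent_rank returns 0, 1/3, 2/3 and 1 (computed as
(rank - 1) / (rows - 1)), so they generally disagree.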

If I am following your question correctly, you are looking to filter out
the NULLs before applying the function to the dataframe, and I think it
will be fine if you just create another dataframe first with the non-null
values and then apply the function to that dataframe, as in the sketch
below.
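
A minimal sketch for one of your ten columns (assuming df and the same
partition columns as in your mail; the other columns follow the same
pattern):

    from pyspark.sql import Window
    import pyspark.sql.functions as F

    win = (
        Window.partitionBy("p_customer_name", "p_site_name", "year_month")
        .orderBy(F.col("event_duration"))
    )

    # drop the NULL rows first, so they never enter the window at all
    non_null = df.filter(F.col("event_duration").isNotNull())

    ranked = non_null.withColumn(
        "percentile_rank_evt_duration",
        F.round(F.cume_dist().over(win) * 100).cast("integer"),
    )

You can union the NULL rows back in afterwards if you need them, as your
first snippet already does.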

It will be of much help if you can explain what you are trying to achieve
here. Running a loop over a dataframe, as you have done, is surely not
recommended at all; please look at the explain plan of the dataframe in
each iteration to understand the effect of your loop on the plan - that
should give some details.
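
For example (a sketch, with columns standing for the list of
(column_name, new_col) pairs from your mail):

    for column_name, new_col in columns:
        # ... your existing filter / cume_dist / union logic ...
        df.explain()  # the plan grows with every filter + union pass

And if cume_dist over only the non-null values is what you are after, one
untested way to avoid the loop entirely is to build all ten columns in a
single select and rescale cume_dist by the per-partition NULL count
(ascending order puts the NULLs first, so they can be subtracted back out):

    from pyspark.sql import Window
    import pyspark.sql.functions as F

    part = Window.partitionBy("p_customer_name", "p_site_name", "year_month")

    def pct_ignoring_nulls(c):
        # cume_dist over the full partition, NULL rows included
        cd = F.cume_dist().over(part.orderBy(F.col(c)))
        total = F.count(F.lit(1)).over(part)          # all rows in partition
        nulls = total - F.count(F.col(c)).over(part)  # count(col) skips NULLs
        # rescale so the NULL rows drop out of numerator and denominator;
        # NULL inputs get a NULL rank, as in your union version
        return F.when(
            F.col(c).isNotNull(),
            F.round((cd * total - nulls) / (total - nulls) * 100)
             .cast("integer"),
        )

    df = df.select(
        "*",
        *[pct_ignoring_nulls(c).alias(new_col) for c, new_col in columns],
    )

This keeps the whole job in one plan instead of rebuilding it ten times,
which should also take the pressure off the driver.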


Regards,
Gourav Sengupta

On Mon, Jan 10, 2022 at 10:49 PM Ramesh Natarajan <rames...@gmail.com>
wrote:

> I want to compute cume_dist on a bunch of columns in a spark dataframe,
> but want to remove NULL values before doing so.
>
> I have this loop in pyspark. While it works, I see the driver running at
> 100% while the executors are mostly idle. I have read that running a loop
> like this is an anti-pattern and should be avoided. Any pointers on how
> to optimize this section of pyspark code?
>
> I am running this on the AWS Glue 3.0 environment.
>
> from pyspark.sql import Window
> import pyspark.sql.functions as F
>
> for column_name, new_col in [
>     ("event_duration", "percentile_rank_evt_duration"),
>     ("event_duration_pred", "percentile_pred_evt_duration"),
>     ("alarm_cnt", "percentile_rank_alarm_cnt"),
>     ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>     ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>     ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>     ("encounter_time", "percentile_rank_encounter_time"),
>     ("encounter_time_pred", "percentile_pred_encounter_time"),
>     ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>     ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
> ]:
>     win = (
>         Window.partitionBy("p_customer_name", "p_site_name", "year_month")
>         .orderBy(F.col(column_name))
>     )
>     # split off the NULL rows, rank the non-null rows, then stitch back
>     df1 = df.filter(F.col(column_name).isNull())
>     df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
>         new_col, F.round(F.cume_dist().over(win) * F.lit(100)).cast("integer")
>     )
>     df = df2.unionByName(df1, allowMissingColumns=True)
>
> For some reason this code seems to work faster, but it doesn't remove
> NULLs prior to computing the cume_dist. Not sure if this is also a proper
> way to do this :(
>
> for column_name, new_col in [
>     ("event_duration", "percentile_rank_evt_duration"),
>     ("event_duration_pred", "percentile_pred_evt_duration"),
>     ("alarm_cnt", "percentile_rank_alarm_cnt"),
>     ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>     ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>     ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>     ("encounter_time", "percentile_rank_encounter_time"),
>     ("encounter_time_pred", "percentile_pred_encounter_time"),
>     ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>     ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
> ]:
>     win = (
>         Window.partitionBy("p_customer_name", "p_site_name", "year_month")
>         .orderBy(F.col(column_name))
>     )
>     # NULL inputs get a NULL rank, but they still sit inside the window,
>     # so they are not removed before percent_rank is computed
>     df = df.withColumn(
>         new_col,
>         F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
>             F.round(F.percent_rank().over(win) * F.lit(100)).cast("integer")
>         ),
>     )
>
> I'd appreciate it if anyone has any pointers on how to go about this.
>
> thanks
> Ramesh
>
