Hi,

I am not sure what you are trying to achieve here: are cume_dist and percent_rank not different?
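To illustrate the point, here is a minimal pure-Python sketch (no Spark needed) of the two definitions; the helper functions are just illustrative, not Spark APIs:

```python
# cume_dist(x)    = (number of rows with value <= x) / (number of rows)
# percent_rank(x) = (rank(x) - 1) / (number of rows - 1)
# The two give different results on the same data, so they are not
# interchangeable.

def cume_dist(vals):
    n = len(vals)
    return [sum(1 for w in vals if w <= v) / n for v in vals]

def percent_rank(vals):
    n = len(vals)
    return [sum(1 for w in vals if w < v) / (n - 1) for v in vals]

values = [10, 20, 20, 30]
print(cume_dist(values))     # [0.25, 0.75, 0.75, 1.0]
print(percent_rank(values))  # [0.0, 0.333..., 0.333..., 1.0]
```

So switching from cume_dist in the first loop to percent_rank in the second changes the numbers being computed, not just the NULL handling.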
If I am able to follow your question correctly, you are looking to filter out NULLs before applying the function to the dataframe. I think it will be fine if you just create another dataframe first with the non-null values and then apply the function to that dataframe. It would help a lot if you could explain what you are trying to achieve here. Applying loops over a dataframe, like you have done here, is not recommended at all; please look at the explain plan of the dataframe in each iteration to understand the effect of your loops - that should give some details.

Regards,
Gourav Sengupta

On Mon, Jan 10, 2022 at 10:49 PM Ramesh Natarajan <rames...@gmail.com> wrote:

> I want to compute cume_dist on a bunch of columns in a spark dataframe,
> but want to remove NULL values before doing so.
>
> I have this loop in pyspark. While this works, I see the driver runs at
> 100% while the executors are idle for the most part. I am reading that
> running a loop is an anti-pattern and should be avoided. Any pointers on
> how to optimize this section of pyspark code?
>
> I am running this on the AWS Glue 3.0 environment.
>
> for column_name, new_col in [
>     ("event_duration", "percentile_rank_evt_duration"),
>     ("event_duration_pred", "percentile_pred_evt_duration"),
>     ("alarm_cnt", "percentile_rank_alarm_cnt"),
>     ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>     ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>     ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>     ("encounter_time", "percentile_rank_encounter_time"),
>     ("encounter_time_pred", "percentile_pred_encounter_time"),
>     ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>     ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
> ]:
>     win = (
>         Window().partitionBy(["p_customer_name", "p_site_name", "year_month"])
>         .orderBy(col(column_name))
>     )
>     df1 = df.filter(F.col(column_name).isNull())
>     df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
>         new_col, F.round(F.cume_dist().over(win) * lit(100)).cast("integer")
>     )
>     df = df2.unionByName(df1, allowMissingColumns=True)
>
> For some reason this code seems to work faster, but it doesn't remove
> NULLs prior to computing the cume_dist.
> Not sure if this is also a proper way to do this :(
>
> for column_name, new_col in [
>     ("event_duration", "percentile_rank_evt_duration"),
>     ("event_duration_pred", "percentile_pred_evt_duration"),
>     ("alarm_cnt", "percentile_rank_alarm_cnt"),
>     ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>     ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>     ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>     ("encounter_time", "percentile_rank_encounter_time"),
>     ("encounter_time_pred", "percentile_pred_encounter_time"),
>     ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>     ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
> ]:
>     win = (
>         Window().partitionBy(["p_customer_name", "p_site_name", "year_month"])
>         .orderBy(col(column_name))
>     )
>     df = df.withColumn(
>         new_col,
>         F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
>             F.round(F.percent_rank().over(win) * lit(100)).cast("integer")
>         ),
>     )
>
> Appreciate it if anyone has any pointers on how to go about this..
>
> thanks
> Ramesh
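The filter-then-apply idea suggested above can be sketched in plain Python (no Spark), just to show the intended semantics: the non-null values alone form the population for cume_dist, while null rows pass through untouched. `cume_dist_ignoring_nulls` is a hypothetical helper for illustration only, not a Spark function:

```python
# Sketch of "remove NULLs before computing cume_dist":
# the denominator counts only non-null values, and null inputs
# simply keep a null result.

def cume_dist_ignoring_nulls(vals):
    non_null = [v for v in vals if v is not None]
    n = len(non_null)
    return [
        None if v is None
        else sum(1 for w in non_null if w <= v) / n
        for v in vals
    ]

print(cume_dist_ignoring_nulls([10, None, 20, 30]))
# Nulls stay null; 10, 20, 30 are ranked against only the 3 non-null values.
```

In Spark terms this corresponds to computing the window function on the `isNotNull()` subset and unioning the null rows back, as in the first loop quoted above.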