pyspark loop optimization

2022-01-10 Thread Ramesh Natarajan
I want to compute cume_dist on a bunch of columns in a spark dataframe, but
want to remove NULL values before doing so.

I have this loop in pyspark. While it works, I see the driver running at
100% while the executors are mostly idle. I have read that running a loop
like this is an anti-pattern and should be avoided. Any pointers on how to
optimize this section of pyspark code?

I am running this on the AWS Glue 3.0 environment.

for column_name, new_col in [
    ("event_duration", "percentile_rank_evt_duration"),
    ("event_duration_pred", "percentile_pred_evt_duration"),
    ("alarm_cnt", "percentile_rank_alarm_cnt"),
    ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
    ("event_duration_adj", "percentile_rank_evt_duration_adj"),
    ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
    ("encounter_time", "percentile_rank_encounter_time"),
    ("encounter_time_pred", "percentile_pred_encounter_time"),
    ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
    ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
]:
    win = (
        Window()
        .partitionBy(["p_customer_name", "p_site_name", "year_month"])
        .orderBy(col(column_name))
    )
    df1 = df.filter(F.col(column_name).isNull())
    df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
        new_col,
        F.round(F.cume_dist().over(win) * lit(100)).cast("integer"),
    )
    df = df2.unionByName(df1, allowMissingColumns=True)

For some reason this version seems to run faster, but it doesn't remove the
NULLs prior to computing the window function (and it uses percent_rank
rather than cume_dist). I'm not sure whether this is a proper way to do it
either :(

for column_name, new_col in [
    ("event_duration", "percentile_rank_evt_duration"),
    ("event_duration_pred", "percentile_pred_evt_duration"),
    ("alarm_cnt", "percentile_rank_alarm_cnt"),
    ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
    ("event_duration_adj", "percentile_rank_evt_duration_adj"),
    ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
    ("encounter_time", "percentile_rank_encounter_time"),
    ("encounter_time_pred", "percentile_pred_encounter_time"),
    ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
    ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
]:
    win = (
        Window()
        .partitionBy(["p_customer_name", "p_site_name", "year_month"])
        .orderBy(col(column_name))
    )
    df = df.withColumn(
        new_col,
        F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
            F.round(F.percent_rank().over(win) * lit(100)).cast("integer")
        ),
    )
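
One variant I have been sketching (untested) builds all of the rank
expressions in Python first and applies them in a single select, using
cume_dist as in the first version. It has the same NULL behavior as the
second loop (NULL rows still sit inside the window), it just avoids chaining
ten withColumn calls:

# pairs is the same list of (column_name, new_col) tuples used above
rank_cols = []
for column_name, new_col in pairs:
    win = (
        Window.partitionBy("p_customer_name", "p_site_name", "year_month")
        .orderBy(F.col(column_name))
    )
    rank_cols.append(
        F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
            F.round(F.cume_dist().over(win) * F.lit(100)).cast("integer")
        ).alias(new_col)
    )
df = df.select("*", *rank_cols)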

I'd appreciate any pointers on how to go about this.

thanks
Ramesh


Re: How to add a row number column without reordering my data frame

2022-01-10 Thread Gourav Sengupta
Hi,

I am a bit confused here; it is not entirely clear to me why you are
creating the row numbers, and how creating the row numbers helps you with
the joins.

Can you please explain with some sample data?


Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson wrote:

> Hi
>
>
>
> I am trying to work through an OOM error. I have 10411 files. I want to
> select a single column from each file and then join them into a single
> table.
>
>
>
> The files have a unique row id; however, it is a very long string. The data
> file with just the id and the column of interest is about 470 MB. The column
> of interest alone is 21 MB; it is a column of over 5 million real numbers.
>
>
>
> So I thought I would save a lot of memory if I could join on row numbers instead.
>
>
>
> # create a dummy variable to orderBy
> # https://www.py4u.net/discuss/1840945
> w = Window().orderBy(lit('A'))
>
> sampleDF = (
>     sampleDF.select(["NumReads"])
>     .withColumnRenamed("NumReads", sampleName)
>     .withColumn("tid", row_number().over(w))
> )
>
>
>
>
>
> This code seems pretty complicated to someone coming from pandas and R
> dataframes. My unit test works; however, it generates the following warning.
>
>
>
>
>
> WARN WindowExec: No Partition Defined for Window operation! Moving all
> data to a single partition, this can cause serious performance degradation.
>
>
>
>
>
> Is there a better way to create a row number without reordering my data?
> The order is important.
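>
> One alternative I have been wondering about (an untested sketch, so please
> correct me) is zipWithIndex on the underlying RDD; I believe it assigns
> indices in the current row order without moving all the data to a single
> partition:
>
> # Untested sketch: zipWithIndex numbers rows in their current partition and
> # row order; it runs one extra job to count partition sizes, but it does not
> # need a global sort, so the single-partition warning should not appear.
> indexed = (
>     sampleDF.select("NumReads")
>     .rdd.zipWithIndex()
>     .map(lambda pair: (pair[1], pair[0]["NumReads"]))
>     .toDF(["tid", sampleName])
> )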
>
>
>
> Kind regards
>
>
>
> Andy
>


Re: hive table with large column data size

2022-01-10 Thread Gourav Sengupta
Hi,

As always, before answering the question, can I please ask what you are
trying to achieve by storing the data in a table? How are you planning to
query the binary data?

If you look at relational theory, a table is a relation/entity and the
fields are its attributes. You might consider an image to be an attribute of
a tuple (or record) belonging to a particular relation, but there might be
more efficient ways of storing the binary data; it all depends on what you
are trying to do.

For the data types, please look here:
https://spark.apache.org/docs/latest/sql-ref-datatypes.html. Parquet is
definitely a columnar format, and if I am not entirely wrong, Spark reads
Parquet column by column (with column pruning) by default.
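
For example (an untested sketch with made-up column names and path), if the
image is stored as a binary column in a Parquet-backed table, selecting only
the smaller columns should let Spark prune the binary column at read time:

df = spark.read.parquet("/warehouse/images_table")  # hypothetical location
small = df.select("image_id", "captured_at")        # binary column not selected
small.explain()  # ReadSchema in the plan should omit the binary column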


Regards,
Gourav Sengupta

On Sun, Jan 9, 2022 at 2:34 PM weoccc wrote:

> Hi ,
>
> I want to store binary data (such as images) in a hive table, but the
> binary data column might be much larger than the other columns per row. I'm
> worried about the query performance. One way I can think of is to separate
> the binary data storage from the other columns by creating 2 hive tables,
> running 2 separate spark queries, and joining them later.
>
> Later, I found that parquet supports splitting columns into different files,
> as shown here:
> https://parquet.apache.org/documentation/latest/
>
> I'm wondering if spark sql already supports that? If so, how do I use it?
>
> Weide
>


Difference in behavior for Spark 3.0 vs Spark 3.1 "create database "

2022-01-10 Thread Pralabh Kumar
Hi Spark Team

When creating a database via Spark 3.0 on Hive

1) spark.sql("create database test location '/user/hive'").  It creates the
database location on hdfs . As expected

2) When running the same command on 3.1, the database is created on the
local file system by default. I have to prefix the location with the hdfs
scheme to create the database on HDFS.
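
Something like the following works around it for now (the exact URI will
depend on the cluster's default filesystem):

# scheme made explicit so 3.0 and 3.1 resolve the location the same way
spark.sql("create database test location 'hdfs:///user/hive'")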

Why is there a difference in behavior? Can you please point me to the JIRA
that caused this change?

Note: spark.sql.warehouse.dir and hive.metastore.warehouse.dir both have
their default values (not explicitly set).

Regards
Pralabh Kumar