What you described is not clear here.

Do you want to rank your data based on (date, hour, language, item_type, 
time_zone) and sort by score, or do you want to rank it based on (date, hour) 
and sort by language, item_type, time_zone and score?


If you mean the first one, then your Spark code looks right, but the example 
data you gave didn't include "time_zone", which may be the reason the rank 
starts from 1 again.


In a Spark window specification, partitionBy defines the columns you want to 
group by, and orderBy decides the ordering within each partition. Both can 
take multiple columns.
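
For example, a minimal sketch of a multi-column window spec (assuming a 
DataFrame named df whose columns match your sample data, i.e. no time_zone 
column):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Partition by every column that defines a group; order by score descending.
w = (Window
     .partitionBy("date", "hour", "language", "item_type")
     .orderBy(col("score").cast("float").desc()))

# row_number() assigns a unique, gapless rank within each partition.
ranked_df = df.withColumn("rank", row_number().over(w))

The window function shuffles the data by the partition columns on its own, so 
the repartition(1) calls in your script should not be necessary.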


Yong

________________________________
From: Dana Ram Meghwal <dana...@saavn.com>
Sent: Friday, February 24, 2017 2:08 AM
To: user@spark.apache.org
Subject: Fwd: Duplicate Rank for within same partitions


---------- Forwarded message ----------
From: Dana Ram Meghwal <dana...@saavn.com>
Date: Thu, Feb 23, 2017 at 10:40 PM
Subject: Duplicate Rank for within same partitions
To: user-h...@spark.apache.org


Hey guys,

I am new to Spark. I am trying to write a Spark script which involves finding 
the rank of records within the same data partitions (I will make this clear in 
a moment).


I have a table with the following columns, and the example data looks like 
this (there are around 20 million records for each combination of date, hour, 
language and item_type):

Id    language    date        hour    item_type    score
1     hindi       20170220    00      song         10
2     hindi       20170220    00      song         12
3     hindi       20170220    00      song         15
...   (till 20 million)

4     english     20170220    00      song         9
5     english     20170220    00      song         18
6     english     20170220    00      song         12
...   (till 20 million)


Now I want to rank them over (language, date, hour, item_type), so finally it 
will look like this:

Id    language    date        hour    item_type    score    rank
1     hindi       20170220    00      song         10       1
2     hindi       20170220    00      song         12       2
3     hindi       20170220    00      song         15       3

4     english     20170220    00      song         9        1
6     english     20170220    00      song         12       2
5     english     20170220    00      song         18       3



To solve this I use the rank function in Spark. The code looks like the 
following:

1. Converting the RDD to a DataFrame:

rdd_with_final_score_df = spark.read.json(rdd_with_final_score).repartition(1)

2. Setting the window specification:

w = Window.partitionBy("dt", "hour", "language", "item_type", "time_zone") \
        .orderBy(rdd_with_final_score_df.score.cast("float").desc())

3. Calculating ranks after repartitioning to 1 partition:

rdd_with_final_score_df_rank_df = rdd_with_final_score_df.repartition(1) \
        .withColumn('rank', row_number().over(w))

The number of rows in "rdd_with_final_score" is so high that this RDD is 
distributed across machines in the cluster.


I am getting results, but within each partition I am getting duplicate ranks.

for e.g.

Id    language    date        hour    item_type    score    rank
1     hindi       20170220    00      song         10       1
2     hindi       20170220    00      song         12       2
3     hindi       20170220    00      song         15       1


Here record 1 and record 3 have the same rank, but it is expected that they 
should have different ranks, since the rank should be unique for different 
score values.

Is it the case that the rank is calculated separately for each partition of 
the RDD and then merged, so that multiple rows end up with the same rank?


It would be very helpful if you could help me understand what is going on here 
and how we can solve it. I thought repartition would fix it, but it did not.


I tried to use rowsBetween or rangeBetween, but it gave an error:

pyspark.sql.utils.AnalysisException: u'Window Frame ROWS BETWEEN 1 PRECEDING 
AND 1 FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW;'
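
For reference, row_number() requires the frame ROWS BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW, which Spark supplies automatically when no frame is 
specified. A minimal sketch of stating that frame explicitly (assuming Spark 
2.1+, where Window.unboundedPreceding and Window.currentRow are available, and 
the same window columns as in step 2):

from pyspark.sql import Window
from pyspark.sql.functions import col

# row_number() only accepts the UNBOUNDED PRECEDING .. CURRENT ROW frame,
# so either omit rowsBetween entirely or spell out exactly that frame.
w = (Window
     .partitionBy("dt", "hour", "language", "item_type", "time_zone")
     .orderBy(col("score").cast("float").desc())
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))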








--
Dana Ram Meghwal
Software Engineer
dana...@saavn.com




--
Dana Ram Meghwal
Software Engineer
dana...@saavn.com
