In general you can probably do all of this in Spark SQL: read the Hive table into a DataFrame in PySpark, create a TempView on that DataFrame, filter the rows to PM hours on the time_in column, and then use the DENSE_RANK() window function to select the top 5. Note that Spark SQL has no TIME type to CAST to, so the filter below uses HOUR(time_in) >= 12 to keep the PM hours instead.
# Read the Hive table as a DataFrame
df = spark.read.table("hive.sample_data")

# Create a temporary view on the DataFrame
df.createOrReplaceTempView("tmp")

# Spark SQL has no TIME type, so filter PM rows with HOUR()
# rather than CAST(time_in AS TIME)
sqltext = """
SELECT incoming_ip, total_volume
FROM (
    SELECT incoming_ip,
           SUM(volume) AS total_volume,
           DENSE_RANK() OVER (ORDER BY SUM(volume) DESC) AS rank
    FROM tmp
    WHERE HOUR(time_in) >= 12   -- PM hours: 12:00:00 through 23:59:59
    GROUP BY incoming_ip
) ranked_ips
WHERE rank <= 5
"""

spark.sql(sqltext).show(5, False)

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

On Thu, 21 Sept 2023 at 18:03, ashok34...@yahoo.com.INVALID
<ashok34...@yahoo.com.invalid> wrote:

> Hello gurus,
>
> I have a Hive table created as below (there are more columns)
>
> CREATE TABLE hive.sample_data (
>     incoming_ip STRING,
>     time_in TIMESTAMP,
>     volume INT
> );
>
> Data is stored in that table.
>
> In PySpark, I want to select the top 5 incoming IP addresses with the
> highest total volume of data transferred during the PM hours. PM hours
> are decided by the column time_in with values like '00:45:00',
> '11:35:00', '18:25:00'.
>
> Any advice is appreciated.
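For reference, the same logic can also be expressed with the DataFrame API rather than SQL. This is a minimal sketch, assuming the hive.sample_data schema from the question above (incoming_ip, time_in, volume):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read the Hive table and keep only PM rows (hour of time_in >= 12)
df = spark.read.table("hive.sample_data")
pm = df.where(F.hour("time_in") >= 12)

# Total volume per incoming IP
totals = pm.groupBy("incoming_ip").agg(F.sum("volume").alias("total_volume"))

# Rank IPs by total volume and keep the top 5 (ties share a rank,
# matching DENSE_RANK in the SQL version)
w = Window.orderBy(F.col("total_volume").desc())
top5 = (totals
        .withColumn("rnk", F.dense_rank().over(w))
        .where(F.col("rnk") <= 5)
        .drop("rnk"))

top5.show(5, False)

As with the SQL version, the window has no PARTITION BY, so Spark will pull the aggregated rows into a single partition to rank them; that is fine here because there is only one row per IP after the groupBy.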