EDIT: I don't think the question asker wants only the top 25 percent of rows returned.
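To make that concrete, here is a minimal sketch (not taken from the thread below) of the filter the original question seems to ask for: drop roughly the 20% least active IPs by cutting at the 20th percentile of per-IP traffic, rather than keeping only what lies above the 80th percentile. The tiny inline DataFrame and the names cutoff_20 / top_80_ips are illustrative assumptions, not code from the thread.

# Minimal sketch (illustrative, not from the thread below): drop the ~20%
# least active IPs and keep the rest by cutting at the 20th percentile of
# per-IP traffic instead of the 80th.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("keep-top-80-percent").getOrCreate()

# Stand-in for the real table: one row per observation, as in the thread.
df = spark.createDataFrame(
    [("10.0.0.1", 1.2), ("10.0.0.1", 0.3), ("10.0.0.2", 7.5),
     ("10.0.0.3", 0.1), ("10.0.0.4", 4.4)],
    ["incoming_ips", "gbps"],
)

# Total traffic per IP, then the 20th percentile of those totals.
agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
cutoff_20 = agg_df.agg(
    F.expr("percentile_approx(total_gbps, 0.2)").alias("p20")
).collect()[0]["p20"]

# Keep only IPs at or above the cutoff, i.e. roughly the top 80%.
top_80_ips = agg_df.filter(F.col("total_gbps") >= cutoff_20)
result = df.join(top_80_ips.select("incoming_ips"), on="incoming_ips", how="inner")
result.show()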
On Sat, 16 Sept 2023 at 21:54, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

> percentile_approx returns the approximate percentile(s)
> <https://github.com/apache/spark/pull/14868>. The memory consumption is
> bounded. The larger the accuracy parameter we choose, the smaller the
> error we get. The default accuracy value is 10000, to match the Hive
> default setting. Choose a smaller value for a smaller memory footprint.
>
> When I run my code on a single PC where N = 10 million, it takes 22.52
> seconds. Notebook added.
>
> I don't think that the question asker wants only the top 20 percent
> returned.
>
>
> On Sat, 16 Sept 2023 at 17:49, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi Bjorn,
>>
>> I think one is better off using percentile_approx, as it seems to be the
>> recommended approach for computing percentiles and can simplify the code.
>> I have modified your code to use percentile_approx rather than computing
>> the percentile manually. It would be interesting to hear ideas on this.
>>
>> Here is the code:
>>
>> # Standard library imports
>> import json
>> import multiprocessing
>> import os
>> import re
>> import sys
>> import random
>>
>> # Third-party imports
>> import numpy as np
>> import pandas as pd
>> import pyarrow
>>
>> # PySpark imports
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import SparkSession, functions as F, Window
>> from pyspark.sql.functions import (
>>     col, concat, concat_ws, expr, lit, trim, udf
>> )
>> from pyspark.sql.types import (
>>     IntegerType, StringType, StructField, StructType,
>>     DoubleType, TimestampType
>> )
>> from pyspark import pandas as ps
>>
>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>>     conf.setMaster("local[{}]".format(number_cores))
>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>         "spark.sql.repl.eagerEval.enabled", "True"
>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>     ).set(
>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>     )
>>
>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>>
>> spark = get_spark_session("My super app", SparkConf())
>> sc = SparkContext.getOrCreate()
>> sc.setLogLevel("ERROR")
>>
>> def generate_ip():
>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>
>> def generate_timestamp():
>>     return pd.Timestamp(
>>         year=random.randint(2021, 2023),
>>         month=random.randint(1, 12),
>>         day=random.randint(1, 28),
>>         hour=random.randint(0, 23),
>>         minute=random.randint(0, 59),
>>         second=random.randint(0, 59)
>>     )
>>
>> def random_gbps():
>>     return random.uniform(0, 10)
>>
>> # Number of rows
>> n = 20
>>
>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": generate_timestamp()} for _ in range(n)]
>> df = spark.createDataFrame(pd.DataFrame(data))
>> df.show()
>>
>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>
>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>
>> # Calculate the 80th percentile value
>> percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps, 0.8)").alias("percentile_80")).collect()[0]["percentile_80"]
>>
>> # Filter the DataFrame based on the condition
>> filtered_df = df.filter(df["gbps"] >= percentile_80)
>>
>> # Show the filtered DataFrame
>> print(f"Filtered DataFrame")
>> filtered_df.show()
>>
>> print(f"Total rows in data frame = {df.count()}")
>> print(f"Result satisfying 80% percentile = {filtered_df.count()}")
>>
>> And these are the results:
>>
>> +---------------+------------------+-------------------+
>> | incoming_ips| gbps| date_time|
>> +---------------+------------------+-------------------+
>> |129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
>> | 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
>> | 78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
>> |219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
>> |234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
>> | 6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
>> | 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
>> | 75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
>> | 163.26.238.22| 9.8999646499433|2023-01-12 17:54:44|
>> | 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
>> | 125.77.236.177| 1.17126350326476|2021-08-19 18:48:42|
>> | 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
>> | 117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
>> | 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
>> | 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
>> | 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
>> | 18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
>> | 31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
>> | 135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
>> |104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
>> +---------------+------------------+-------------------+
>>
>> Filtered DataFrame
>> +--------------+-----------------+-------------------+
>> | incoming_ips| gbps| date_time|
>> +--------------+-----------------+-------------------+
>> | 163.26.238.22| 9.8999646499433|2023-01-12 17:54:44|
>> | 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
>> |98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
>> |145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
>> | 135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
>> +--------------+-----------------+-------------------+
>>
>> Total rows in data frame = 20
>> Result satisfying 80% percentile = 5
>>
>> Cheers
>> Mich Talebzadeh,
>> Distinguished Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>> On Sat, 16 Sept 2023 at 11:46, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Happy Saturday coding 😁
>>>
>>> Mich Talebzadeh,
>>> Distinguished Technologist, Solutions Architect & Engineer
>>> London
>>> United Kingdom
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>> On Sat, 16 Sept 2023 at 11:30, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>>>
>>>> Ah, yes, that's right.
>>>> I spent some time on this one and was having some issues with the code.
>>>> I have now restarted the notebook kernel and rerun it, and I get the
>>>> same result.
>>>>
>>>> On Sat, 16 Sept 2023 at 11:41, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Splendid code. A minor error spotted while glancing at your code:
>>>>>
>>>>> print(df.count())
>>>>> print(result_df.count())
>>>>>
>>>>> You have not defined result_df. I gather you meant "result"?
>>>>>
>>>>> print(result.count())
>>>>>
>>>>> That should fix it 🤔
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Distinguished Technologist, Solutions Architect & Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>> view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>>>>>
>>>>>> Something like this?
>>>>>>
>>>>>> # Standard library imports
>>>>>> import json
>>>>>> import multiprocessing
>>>>>> import os
>>>>>> import re
>>>>>> import sys
>>>>>> import random
>>>>>>
>>>>>> # Third-party imports
>>>>>> import numpy as np
>>>>>> import pandas as pd
>>>>>> import pyarrow
>>>>>>
>>>>>> # PySpark imports
>>>>>> from pyspark import SparkConf, SparkContext
>>>>>> from pyspark.sql import SparkSession, functions as F, Window
>>>>>> from pyspark.sql.functions import (
>>>>>>     col, concat, concat_ws, expr, lit, trim, udf
>>>>>> )
>>>>>> from pyspark.sql.types import (
>>>>>>     IntegerType, StringType, StructField, StructType,
>>>>>>     DoubleType, TimestampType
>>>>>> )
>>>>>> from pyspark import pandas as ps
>>>>>>
>>>>>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>>>>>
>>>>>> number_cores = int(multiprocessing.cpu_count())
>>>>>>
>>>>>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>>>>>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>>>>>
>>>>>>
>>>>>> def get_spark_session(app_name: str, conf: SparkConf):
>>>>>>     conf.setMaster("local[{}]".format(number_cores))
>>>>>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>>>>>         "spark.sql.repl.eagerEval.enabled", "True"
>>>>>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>>>>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>>>>>     ).set(
>>>>>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>>>>>     ).set(
>>>>>>         "sc.setLogLevel", "ERROR"
>>>>>>     )
>>>>>>
>>>>>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>>>>
>>>>>>
>>>>>> spark = get_spark_session("My super app", SparkConf())
>>>>>> spark.sparkContext.setLogLevel("ERROR")
>>>>>>
>>>>>>
>>>>>> def generate_ip():
>>>>>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>>>>>
>>>>>> def generate_timestamp():
>>>>>>     return pd.Timestamp(
>>>>>>         year=random.randint(2021, 2023),
>>>>>>         month=random.randint(1, 12),
>>>>>>         day=random.randint(1, 28),
>>>>>>         hour=random.randint(0, 23),
>>>>>>         minute=random.randint(0, 59),
>>>>>>         second=random.randint(0, 59)
>>>>>>     )
>>>>>>
>>>>>> def random_gbps():
>>>>>>     return random.uniform(0, 10)
>>>>>>
>>>>>> # Number of rows
>>>>>> n = 20
>>>>>>
>>>>>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": generate_timestamp()} for _ in range(n)]
>>>>>> df = spark.createDataFrame(pd.DataFrame(data))
>>>>>> df.show()
>>>>>>
>>>>>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>>>>>
>>>>>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>>>>>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>>>>>
>>>>>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>>>>>> result = df.join(top_80_ips, on="incoming_ips", how="inner").select("incoming_ips", "gbps", "date_time")
>>>>>> result.show()
>>>>>>
>>>>>> print(df.count())
>>>>>> print(result_df.count())
>>>>>>
>>>>>>
>>>>>> +---------------+-------------------+-------------------+
>>>>>> | incoming_ips| gbps| date_time|
>>>>>> +---------------+-------------------+-------------------+
>>>>>> | 66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>>>>>> | 155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
>>>>>> | 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
>>>>>> | 78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
>>>>>> | 241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
>>>>>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>>>>>> | 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
>>>>>> | 84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>>>>>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>>>>>> | 62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
>>>>>> | 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
>>>>>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>>>>>> | 210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
>>>>>> | 13.236.170.177| 2.41361938344535|2021-08-11 02:19:06|
>>>>>> |180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
>>>>>> | 26.140.88.127| 7.51335778127692|2023-06-02 14:13:30|
>>>>>> | 7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>>>>>> | 11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>>>>>> | 232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>>>>>> | 68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>>>>>> +---------------+-------------------+-------------------+
>>>>>>
>>>>>> +---------------+------------------+-------------------+
>>>>>> | incoming_ips| gbps| date_time|
>>>>>> +---------------+------------------+-------------------+
>>>>>> | 66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>>>>>> | 241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
>>>>>> | 78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
>>>>>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>>>>>> | 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
>>>>>> | 84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>>>>>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>>>>>> | 62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
>>>>>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>>>>>> |180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
>>>>>> | 13.236.170.177| 2.41361938344535|2021-08-11 02:19:06|
>>>>>> | 26.140.88.127| 7.51335778127692|2023-06-02 14:13:30|
>>>>>> | 7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>>>>>> | 11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>>>>>> | 232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>>>>>> | 68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>>>>>> +---------------+------------------+-------------------+
>>>>>>
>>>>>> 20
>>>>>> 16
>>>>>>
>>>>>>
>>>>>> On Fri, 15 Sept 2023 at 20:14, ashok34...@yahoo.com.INVALID
>>>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>>>
>>>>>>> Hi team,
>>>>>>>
>>>>>>> I am using PySpark 3.4.
>>>>>>>
>>>>>>> I have a table of a million rows that has a few columns, among them
>>>>>>> incoming IPs, what is known as gbps (Gigabytes per second), and the
>>>>>>> date and time of the incoming IP.
>>>>>>>
>>>>>>> I want to filter out the 20% least active IPs and work on the
>>>>>>> remainder of the data. How can I do this in PySpark?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Bjørn Jørgensen
>>>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>>>> Norge
>>>>>>
>>>>>> +47 480 94 297
>>>>>>
>>>>
>>>> --
>>>> Bjørn Jørgensen
>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>> Norge
>>>>
>>>> +47 480 94 297
>>>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
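A footnote on the accuracy remark at the top of this thread: percentile_approx takes an optional accuracy argument (default 10000), and DataFrame.approxQuantile exposes the same trade-off through relativeError. A small sketch with illustrative values, assuming a per-IP aggregate shaped like the agg_df used in the quoted code; none of the numbers below come from the thread.

# Sketch of the accuracy/memory trade-off (illustrative values only).
# A larger accuracy gives a smaller error at the cost of more memory.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("percentile-accuracy-sketch").getOrCreate()

# Stand-in for the per-IP aggregate used in the thread.
agg_df = spark.createDataFrame(
    [("10.0.0.1", 1.5), ("10.0.0.2", 7.5), ("10.0.0.3", 0.1), ("10.0.0.4", 4.2)],
    ["incoming_ips", "total_gbps"],
)

# Coarse estimate: accuracy=100 uses less memory, tolerates more error.
p80_rough = agg_df.agg(F.expr("percentile_approx(total_gbps, 0.8, 100)")).first()[0]

# Tighter estimate: accuracy=10000 (the default) gives a smaller error bound.
p80_tight = agg_df.agg(F.expr("percentile_approx(total_gbps, 0.8, 10000)")).first()[0]

# DataFrame.approxQuantile offers the same knob as relativeError;
# 0.0 requests the exact quantile at higher cost.
p80_exact = agg_df.approxQuantile("total_gbps", [0.8], 0.0)[0]

print(p80_rough, p80_tight, p80_exact)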