percentile_approx returns the approximate percentile(s) <https://github.com/apache/spark/pull/14868>. Memory consumption is bounded: the larger the accuracy parameter, the smaller the error; choose a smaller value for a smaller memory footprint. The default accuracy is 10000, to match Hive's default setting.
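As a quick illustration, here is a minimal, self-contained sketch of calling percentile_approx with an explicit accuracy argument (the session, DataFrame, and column names are invented for the example):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Toy column of 100 values, invented for the example.
df = spark.createDataFrame([(float(v),) for v in range(1, 101)], ["gbps"])

# The third argument is the accuracy: larger values mean a smaller error
# at the cost of more memory; 10000 is the default.
df.select(
    F.percentile_approx("gbps", 0.8, 10000).alias("p80_default"),
    F.percentile_approx("gbps", 0.8, 100).alias("p80_coarse"),
).show()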
When I run my code on a single PC with N = 10 million rows, it takes 22.52 seconds. Notebook added. I don't think the question asker wants only the top 20 percent returned.

On Sat, 16 Sept 2023 at 17:49, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Bjørn,
>
> I thought that one is better off using percentile_approx, as it seems to be
> the recommended approach for computing percentiles and can simplify the
> code. I have modified your code to use percentile_approx rather than
> computing it manually. It would be interesting to hear ideas on this.
>
> Here is the code:
>
> # Standard library imports
> import json
> import multiprocessing
> import os
> import re
> import sys
> import random
>
> # Third-party imports
> import numpy as np
> import pandas as pd
> import pyarrow
>
> # Pyspark imports
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession, functions as F, Window
> from pyspark.sql.functions import (
>     col, concat, concat_ws, expr, lit, trim, udf
> )
> from pyspark.sql.types import (
>     IntegerType, StringType, StructField, StructType,
>     DoubleType, TimestampType
> )
> from pyspark import pandas as ps
>
> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster("local[{}]".format(number_cores))
>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>         "spark.sql.repl.eagerEval.enabled", "True"
>     ).set("spark.sql.adaptive.enabled", "True").set(
>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>     ).set(
>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>     )
>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
>
> spark = get_spark_session("My super app", SparkConf())
> sc = SparkContext.getOrCreate()
> sc.setLogLevel("ERROR")
>
>
> def generate_ip():
>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>
>
> def generate_timestamp():
>     return pd.Timestamp(
>         year=random.randint(2021, 2023),
>         month=random.randint(1, 12),
>         day=random.randint(1, 28),
>         hour=random.randint(0, 23),
>         minute=random.randint(0, 59),
>         second=random.randint(0, 59),
>     )
>
>
> def random_gbps():
>     return random.uniform(0, 10)
>
>
> # Number of rows
> n = 20
>
> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
>          "date_time": generate_timestamp()} for _ in range(n)]
> df = spark.createDataFrame(pd.DataFrame(data))
> df.show()
>
> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>
> windowRank = Window.orderBy(F.col("total_gbps").desc())
> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>
> # Calculate the 80th percentile value
> percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps, 0.8)")
>                            .alias("percentile_80")).collect()[0]["percentile_80"]
>
> # Filter the DataFrame based on the condition
> filtered_df = df.filter(df["gbps"] >= percentile_80)
>
> # Show the filtered DataFrame
> print("Filtered DataFrame")
> filtered_df.show()
>
> print(f"Total rows in data frame = {df.count()}")
> print(f"Result satisfying 80% percentile = {filtered_df.count()}")
>
> And these are the results:
>
> +---------------+------------------+-------------------+
> |   incoming_ips|              gbps|          date_time|
> +---------------+------------------+-------------------+
> |129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
> | 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
> |  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
> |219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
> |234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
> |  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
> | 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
> |  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
> |  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
> | 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
> | 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
> |  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
> |   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
> | 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
> | 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
> | 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
> |  18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
> |    31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
> |    135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
> |104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
> +---------------+------------------+-------------------+
>
> Filtered DataFrame
> +--------------+-----------------+-------------------+
> |  incoming_ips|             gbps|          date_time|
> +--------------+-----------------+-------------------+
> | 163.26.238.22|  9.8999646499433|2023-01-12 17:54:44|
> | 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
> |98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
> |145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
> |   135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
> +--------------+-----------------+-------------------+
>
> Total rows in data frame = 20
> Result satisfying 80% percentile = 5
>
> Cheers
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
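Two subtleties worth flagging in the listing above: the final filter compares each raw row's gbps against the threshold rather than each IP's aggregated total, and an 80th-percentile cut-off with >= keeps roughly the top 20% of values, whereas the original question asks to drop only the bottom 20%. A variant sketch along these lines (reusing df and agg_df as defined in the listing above) may be closer to the question:

# Variant sketch: cut at the 20th percentile of the per-IP totals so only
# the 20% least active IPs are dropped, and apply the threshold to the
# aggregated totals rather than to individual rows.
percentile_20 = agg_df.agg(
    F.percentile_approx("total_gbps", 0.2).alias("p20")
).collect()[0]["p20"]

top_ips = agg_df.filter(F.col("total_gbps") >= percentile_20)

# Recover the raw rows belonging to the surviving IPs via a join.
filtered_by_ip = df.join(top_ips.select("incoming_ips"),
                         on="incoming_ips", how="inner")
filtered_by_ip.show()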
> On Sat, 16 Sept 2023 at 11:46, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Happy Saturday coding 😁
>>
>> Mich Talebzadeh,
>> Distinguished Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>> On Sat, 16 Sept 2023 at 11:30, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>>
>>> ah.. yes, that's right.
>>> I did have to spend some time on this one, and I was having some issues
>>> with the code. I have now restarted the notebook kernel and rerun it,
>>> and I get the same result.
>>>
>>> On Sat, 16 Sept 2023 at 11:41, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Splendid code. A minor error glancing at your code:
>>>>
>>>> print(df.count())
>>>> print(result_df.count())
>>>>
>>>> You have not defined result_df. I gather you meant "result"?
>>>>
>>>> print(result.count())
>>>>
>>>> That should fix it 🤔
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Distinguished Technologist, Solutions Architect & Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>>>>
>>>>> Something like this?
>>>>>
>>>>> # Standard library imports
>>>>> import json
>>>>> import multiprocessing
>>>>> import os
>>>>> import re
>>>>> import sys
>>>>> import random
>>>>>
>>>>> # Third-party imports
>>>>> import numpy as np
>>>>> import pandas as pd
>>>>> import pyarrow
>>>>>
>>>>> # Pyspark imports
>>>>> from pyspark import SparkConf, SparkContext
>>>>> from pyspark.sql import SparkSession, functions as F, Window
>>>>> from pyspark.sql.functions import (
>>>>>     col, concat, concat_ws, expr, lit, trim, udf
>>>>> )
>>>>> from pyspark.sql.types import (
>>>>>     IntegerType, StringType, StructField, StructType,
>>>>>     DoubleType, TimestampType
>>>>> )
>>>>> from pyspark import pandas as ps
>>>>>
>>>>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>>>>
>>>>> number_cores = int(multiprocessing.cpu_count())
>>>>>
>>>>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>>>>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>>>>
>>>>>
>>>>> def get_spark_session(app_name: str, conf: SparkConf):
>>>>>     conf.setMaster("local[{}]".format(number_cores))
>>>>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>>>>         "spark.sql.repl.eagerEval.enabled", "True"
>>>>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>>>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>>>>     ).set(
>>>>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>>>>     ).set(
>>>>>         "sc.setLogLevel", "ERROR"
>>>>>     )
>>>>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>>>
>>>>>
>>>>> spark = get_spark_session("My super app", SparkConf())
>>>>> spark.sparkContext.setLogLevel("ERROR")
>>>>>
>>>>>
>>>>> def generate_ip():
>>>>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>>>>
>>>>>
>>>>> def generate_timestamp():
>>>>>     return pd.Timestamp(
>>>>>         year=random.randint(2021, 2023),
>>>>>         month=random.randint(1, 12),
>>>>>         day=random.randint(1, 28),
>>>>>         hour=random.randint(0, 23),
>>>>>         minute=random.randint(0, 59),
>>>>>         second=random.randint(0, 59),
>>>>>     )
>>>>>
>>>>>
>>>>> def random_gbps():
>>>>>     return random.uniform(0, 10)
>>>>>
>>>>>
>>>>> # Number of rows
>>>>> n = 20
>>>>>
>>>>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
>>>>>          "date_time": generate_timestamp()} for _ in range(n)]
>>>>> df = spark.createDataFrame(pd.DataFrame(data))
>>>>> df.show()
>>>>>
>>>>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>>>>
>>>>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>>>>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>>>>
>>>>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>>>>> result = df.join(top_80_ips, on="incoming_ips",
>>>>>                  how="inner").select("incoming_ips", "gbps", "date_time")
>>>>> result.show()
>>>>>
>>>>> print(df.count())
>>>>> print(result_df.count())
>>>>>
>>>>> +---------------+-------------------+-------------------+
>>>>> |   incoming_ips|               gbps|          date_time|
>>>>> +---------------+-------------------+-------------------+
>>>>> |   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
>>>>> |  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
>>>>> | 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
>>>>> |    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
>>>>> |  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
>>>>> |130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
>>>>> | 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
>>>>> |  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
>>>>> |218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
>>>>> |   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
>>>>> | 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
>>>>> | 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
>>>>> |   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
>>>>> | 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
>>>>> |180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
>>>>> |  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
>>>>> |  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
>>>>> |    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
>>>>> |  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
>>>>> |   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
>>>>> +---------------+-------------------+-------------------+
>>>>>
>>>>> +---------------+------------------+-------------------+
>>>>> |   incoming_ips|              gbps|          date_time|
>>>>> +---------------+------------------+-------------------+
>>>>> |   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>>>>> |  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
>>>>> |    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
>>>>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>>>>> | 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
>>>>> |  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>>>>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>>>>> |   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
>>>>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>>>>> |180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
>>>>> | 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
>>>>> |  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
>>>>> |  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>>>>> |    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>>>>> |  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>>>>> |   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>>>>> +---------------+------------------+-------------------+
>>>>>
>>>>> 20
>>>>> 16
>>>>>
>>>>> On Fri, 15 Sept 2023 at 20:14, ashok34...@yahoo.com.INVALID <ashok34...@yahoo.com.invalid> wrote:
>>>>>
>>>>>> Hi team,
>>>>>>
>>>>>> I am using PySpark 3.4.
>>>>>>
>>>>>> I have a table of a million rows that has a few columns; among them are
>>>>>> incoming ips, what is known as gbps (Gigabytes per second), and the
>>>>>> date and time of the incoming ip.
>>>>>>
>>>>>> I want to filter out the 20% least active ips and work on the remainder
>>>>>> of the data. How can I do this in PySpark?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>> --
>>>>> Bjørn Jørgensen
>>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>>> Norge
>>>>>
>>>>> +47 480 94 297
>>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
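Pulling the thread together, here is a condensed, self-contained sketch of the percent_rank approach discussed above, with the result_df typo corrected to result (the toy data and session settings are invented for the example):

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[*]").appName("top80").getOrCreate()

# Toy stand-in for the real table of incoming IPs and gbps readings.
df = spark.createDataFrame(
    [("10.0.0.1", 9.5), ("10.0.0.2", 0.3), ("10.0.0.3", 4.2),
     ("10.0.0.4", 7.1), ("10.0.0.5", 1.8)],
    ["incoming_ips", "gbps"],
)

# Total traffic per IP, ranked from heaviest to lightest.
agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
agg_df = agg_df.withColumn(
    "rank", F.percent_rank().over(Window.orderBy(F.col("total_gbps").desc()))
)

# Keep the top 80% of IPs, i.e. drop the 20% least active ones.
top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
result = df.join(top_80_ips, on="incoming_ips", how="inner") \
           .select("incoming_ips", "gbps")

print(df.count())
print(result.count())  # "result", not "result_df", per the fix above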
Attachment: Untitled7.ipynb (binary data)