EDIT:
I don't think the question asker wanted only the top 25 percent of rows
returned.
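
For what the asker described (dropping the 20% least active IPs and keeping
the rest), a sketch along these lines might work. It reuses df and agg_df
from the code below; the other names are illustrative and it is untested:

    # use the 20th percentile of the per-IP totals as the cutoff, so the
    # bottom 20% of low-activity IPs are dropped rather than only the top
    # slice being kept
    percentile_20 = agg_df.agg(
        F.expr("percentile_approx(total_gbps, 0.2)").alias("p20")
    ).collect()[0]["p20"]

    kept_ips = agg_df.filter(F.col("total_gbps") >= percentile_20)
    result = df.join(kept_ips.select("incoming_ips"), on="incoming_ips", how="inner")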

On Sat, 16 Sept 2023 at 21:54, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

> percentile_approx returns the approximate percentile(s)
> <https://github.com/apache/spark/pull/14868>. Its memory consumption is
> bounded: the larger the accuracy parameter we choose, the smaller the
> error we get. The default accuracy is 10000, to match Hive's default
> setting. Choose a smaller value for a smaller memory footprint.
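>
> For example, the optional third argument sets the accuracy explicitly. A
> minimal sketch, using the agg_df from the code further down; 1000 is an
> arbitrary value chosen to trade accuracy for memory:
>
>     # lower accuracy -> smaller memory footprint, larger potential error
>     agg_df.agg(F.expr("percentile_approx(total_gbps, 0.8, 1000)")).show()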
>
> When I run my code on a single PC with N = 10 million, it takes 22.52
> seconds. Notebook attached.
>
> I don't think the question asker wanted only the top 20 percent of rows
> returned.
>
>
> On Sat, 16 Sept 2023 at 17:49, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi Bjorn,
>>
>> I thought one is better off using percentile_approx, as it seems to be
>> the recommended approach for computing percentiles and it simplifies the
>> code. I have modified your code to use percentile_approx rather than
>> computing the rank manually. It would be interesting to hear ideas on
>> this.
>>
>> Here is the code:
>>
>> # Standard library imports
>> import json
>> import multiprocessing
>> import os
>> import re
>> import sys
>> import random
>>
>> # Third-party imports
>> import numpy as np
>> import pandas as pd
>> import pyarrow
>>
>> # Pyspark imports
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import SparkSession, functions as F, Window
>> from pyspark.sql.functions import (
>>     col, concat, concat_ws, expr, lit, trim, udf
>> )
>> from pyspark.sql.types import (
>>     IntegerType, StringType, StructField, StructType,
>>     DoubleType, TimestampType
>> )
>> from pyspark import pandas as ps
>>
>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>>     conf.setMaster("local[{}]".format(number_cores))
>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>         "spark.sql.repl.eagerEval.enabled", "True"
>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>     ).set(
>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>     )
>>
>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>>
>> spark = get_spark_session("My super app", SparkConf())
>> sc = SparkContext.getOrCreate()
>> sc.setLogLevel("ERROR")
>>
>> def generate_ip():
>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>
>> def generate_timestamp():
>>     return pd.Timestamp(
>>         year=random.randint(2021, 2023),
>>         month=random.randint(1, 12),
>>         day=random.randint(1, 28),
>>         hour=random.randint(0, 23),
>>         minute=random.randint(0, 59),
>>         second=random.randint(0, 59)
>>     )
>>
>> def random_gbps():
>>     return random.uniform(0, 10)
>>
>> # Number of rows
>> n = 20
>>
>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
>> "date_time": generate_timestamp()} for _ in range(n)]
>> df = spark.createDataFrame(pd.DataFrame(data))
>> df.show()
>>
>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>
>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
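>> # note: the percent_rank column computed above is carried over from the
>> # original version; the percentile_approx filter below does not use it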
>>
>> # Calculate the 80th percentile value
>> percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps, 0.8)").alias("percentile_80")).collect()[0]["percentile_80"]
>>
>> # Filter the DataFrame based on the condition
>> filtered_df = df.filter(df["gbps"] >= percentile_80)
>>
>> # Show the filtered DataFrame
>> print(f"Filtered DataFrame")
>> filtered_df.show()
>>
>> print(f"Total rows in data frame = {df.count()}")
>> print(f"Result satisfying 80% percentile = {filtered_df.count()}")
>>
>> And these are the results:
>>
>> +---------------+------------------+-------------------+
>> |   incoming_ips|              gbps|          date_time|
>> +---------------+------------------+-------------------+
>> |129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
>> | 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
>> |  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
>> |219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
>> |234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
>> |  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
>> | 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
>> |  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
>> |  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
>> | 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
>> | 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
>> |  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
>> |   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
>> | 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
>> | 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
>> | 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
>> |  18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
>> |    31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
>> |    135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
>> |104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
>> +---------------+------------------+-------------------+
>>
>> Filtered DataFrame
>> +--------------+-----------------+-------------------+
>> |  incoming_ips|             gbps|          date_time|
>> +--------------+-----------------+-------------------+
>> | 163.26.238.22|  9.8999646499433|2023-01-12 17:54:44|
>> | 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
>> |98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
>> |145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
>> |   135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
>> +--------------+-----------------+-------------------+
>>
>> Total rows in data frame = 20
>> Rows at or above the 80th percentile = 5
>>
>> Cheers
>> Mich Talebzadeh,
>> Distinguished Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 16 Sept 2023 at 11:46, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Happy Saturday coding 😁
>>>
>>>
>>> On Sat, 16 Sept 2023 at 11:30, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>>> wrote:
>>>
>>>> Ah, yes, that's right.
>>>> I did spend some time on this one and was having some issues with the
>>>> code. I have restarted the notebook kernel and rerun it, and I get the
>>>> same result.
>>>>
>>>> On Sat, 16 Sept 2023 at 11:41, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Splendid code. A minor error caught my eye while glancing at it:
>>>>>
>>>>> print(df.count())
>>>>> print(result_df.count())
>>>>>
>>>>>
>>>>> You have not defined result_df. I gather you meant "result"?
>>>>>
>>>>>
>>>>> print(result.count())
>>>>>
>>>>>
>>>>> That should fix it 🤔
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <
>>>>> bjornjorgen...@gmail.com> wrote:
>>>>>
>>>>>> Something like this?
>>>>>>
>>>>>>
>>>>>> # Standard library imports
>>>>>> import json
>>>>>> import multiprocessing
>>>>>> import os
>>>>>> import re
>>>>>> import sys
>>>>>> import random
>>>>>>
>>>>>> # Third-party imports
>>>>>> import numpy as np
>>>>>> import pandas as pd
>>>>>> import pyarrow
>>>>>>
>>>>>> # Pyspark imports
>>>>>> from pyspark import SparkConf, SparkContext
>>>>>> from pyspark.sql import SparkSession, functions as F, Window
>>>>>> from pyspark.sql.functions import (
>>>>>>     col, concat, concat_ws, expr, lit, trim, udf
>>>>>> )
>>>>>> from pyspark.sql.types import (
>>>>>>     IntegerType, StringType, StructField, StructType,
>>>>>>     DoubleType, TimestampType
>>>>>> )
>>>>>> from pyspark import pandas as ps
>>>>>>
>>>>>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>>>>>
>>>>>> number_cores = int(multiprocessing.cpu_count())
>>>>>>
>>>>>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>>>>>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>>>>>
>>>>>>
>>>>>> def get_spark_session(app_name: str, conf: SparkConf):
>>>>>>     conf.setMaster("local[{}]".format(number_cores))
>>>>>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>>>>>         "spark.sql.repl.eagerEval.enabled", "True"
>>>>>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>>>>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>>>>>     ).set(
>>>>>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>>>>>     )
>>>>>>
>>>>>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>>>>
>>>>>>
>>>>>> spark = get_spark_session("My super app", SparkConf())
>>>>>> spark.sparkContext.setLogLevel("ERROR")
>>>>>>
>>>>>>
>>>>>>
>>>>>> def generate_ip():
>>>>>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>>>>>
>>>>>> def generate_timestamp():
>>>>>>     return pd.Timestamp(
>>>>>>         year=random.randint(2021, 2023),
>>>>>>         month=random.randint(1, 12),
>>>>>>         day=random.randint(1, 28),
>>>>>>         hour=random.randint(0, 23),
>>>>>>         minute=random.randint(0, 59),
>>>>>>         second=random.randint(0, 59)
>>>>>>     )
>>>>>>
>>>>>> def random_gbps():
>>>>>>     return random.uniform(0, 10)
>>>>>>
>>>>>> # Number of rows
>>>>>> n = 20
>>>>>>
>>>>>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": generate_timestamp()} for _ in range(n)]
>>>>>> df = spark.createDataFrame(pd.DataFrame(data))
>>>>>> df.show()
>>>>>>
>>>>>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>>>>>
>>>>>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>>>>>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>>>>>
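>>>>>> # percent_rank ranges from 0 to 1 over total_gbps in descending order,
>>>>>> # so keeping rank <= 0.80 retains the top 80% of IPs by total traffic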
>>>>>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>>>>>> result = df.join(top_80_ips, on="incoming_ips", how="inner").select("incoming_ips", "gbps", "date_time")
>>>>>> result.show()
>>>>>>
>>>>>> print(df.count())
>>>>>> print(result_df.count())
>>>>>>
>>>>>>
>>>>>> +---------------+-------------------+-------------------+
>>>>>> |   incoming_ips|               gbps|          date_time|
>>>>>> +---------------+-------------------+-------------------+
>>>>>> |   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
>>>>>> |  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
>>>>>> | 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
>>>>>> |    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
>>>>>> |  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
>>>>>> |130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
>>>>>> | 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
>>>>>> |  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
>>>>>> |218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
>>>>>> |   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
>>>>>> | 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
>>>>>> | 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
>>>>>> |   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
>>>>>> | 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
>>>>>> |180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
>>>>>> |  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
>>>>>> |  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
>>>>>> |    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
>>>>>> |  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
>>>>>> |   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
>>>>>> +---------------+-------------------+-------------------+
>>>>>>
>>>>>> +---------------+------------------+-------------------+
>>>>>> |   incoming_ips|              gbps|          date_time|
>>>>>> +---------------+------------------+-------------------+
>>>>>> |   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>>>>>> |  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
>>>>>> |    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
>>>>>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>>>>>> | 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
>>>>>> |  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>>>>>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>>>>>> |   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
>>>>>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>>>>>> |180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
>>>>>> | 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
>>>>>> |  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
>>>>>> |  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>>>>>> |    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>>>>>> |  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>>>>>> |   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>>>>>> +---------------+------------------+-------------------+
>>>>>>
>>>>>> 20
>>>>>> 16
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, 15 Sept 2023 at 20:14, ashok34...@yahoo.com.INVALID <ashok34...@yahoo.com.invalid> wrote:
>>>>>>
>>>>>>> Hi team,
>>>>>>>
>>>>>>> I am using PySpark 3.4.
>>>>>>>
>>>>>>> I have a table of a million rows with a few columns, among them the
>>>>>>> incoming IPs, what is known as gbps (gigabytes per second), and the
>>>>>>> date and time of each incoming IP.
>>>>>>>
>>>>>>> I want to filter out the 20% least active IPs and work on the
>>>>>>> remainder of the data. How can I do this in PySpark?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
