Thank you Bjorn and Mich. 
Appreciated
Best
    On Saturday, 16 September 2023 at 16:50:04 BST, Mich Talebzadeh 
<mich.talebza...@gmail.com> wrote:  
 
 Hi Bjorn,
I thought that one is better off using percentile_approx as it seems to be the 
recommended approach for computing percentiles and can simplify the code. I 
have modified your code to use percentile_approx rather than manually computing 
it. It would be interesting to hear ideas on this.
Here is the code:
# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.repl.eagerEval.enabled", "True"
    ).set("spark.sql.adaptive.enabled", "True").set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "10000"
    )

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My super app", SparkConf())
sc = SparkContext.getOrCreate()
sc.setLogLevel("ERROR")


def generate_ip():
    return ".".join(str(random.randint(0, 255)) for _ in range(4))

def generate_timestamp():
    return pd.Timestamp(
        year=random.randint(2021, 2023),
        month=random.randint(1, 12),
        day=random.randint(1, 28),
        hour=random.randint(0, 23),
        minute=random.randint(0, 59),
        second=random.randint(0, 59)
    )

def random_gbps():
    return random.uniform(0, 10)

# Number of rows
n = 20

data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": generate_timestamp()} for _ in range(n)]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()

agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

# Calculate the 80th percentile value
percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps, 0.8)").alias("percentile_80")).collect()[0]["percentile_80"]

# Filter the DataFrame based on the condition
filtered_df = df.filter(df["gbps"] >= percentile_80)

# Show the filtered DataFrame
print(f"Filtered DataFrame")
filtered_df.show()

print(f"Total rows in data frame = {df.count()}")
print(f"Result satisfying 80% percentile = {filtered_df.count()}")

And these are the results:

+---------------+------------------+-------------------+
|   incoming_ips|              gbps|          date_time|
+---------------+------------------+-------------------+
|129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
| 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
|  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
|219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
|234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
|  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
| 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
|  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
|  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
| 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
| 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
|  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
|   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
| 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
| 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
| 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
|  18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
|    31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
|    135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
|104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
+---------------+------------------+-------------------+

Filtered DataFrame
+--------------+-----------------+-------------------+
|  incoming_ips|             gbps|          date_time|
+--------------+-----------------+-------------------+
| 163.26.238.22|  9.8999646499433|2023-01-12 17:54:44|
| 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
|98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
|145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
|   135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
+--------------+-----------------+-------------------+

Total rows in data frame = 20
Result satisfying 80% percentile = 5

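A note on the counts above: filtering on gbps >= the 80th percentile keeps only the 5 most active of the 20 IPs, whereas the original question asks to drop the 20% least active ones and keep the rest. The plain-Python sketch below checks that arithmetic on the gbps values printed above. It assumes a nearest-rank percentile as a stand-in for percentile_approx; the helper nearest_rank_percentile is hypothetical and not a Spark API. On this small sample it reproduces the threshold implied by the filtered output, but Spark's approximation may behave differently on large data.

```python
import math

# gbps values copied from the 20-row sample output above. Every IP occurs
# once in that sample, so the per-IP totals equal the per-row values.
gbps = [
    2.6517421918102335, 1.8210013026429361, 8.060958370556456,
    0.3449002002472945, 2.6187481766507013, 0.3510752223686242,
    6.4071750880652765, 2.1484984272041685, 9.8999646499433,
    1.8875849709728088, 1.17126350326476, 9.51081430594299,
    1.122437784309721, 8.165187506266607, 9.284242190156004,
    9.787384042283957, 6.939770760444909, 4.820947188427366,
    9.587849542001745, 9.045897927807715,
]


def nearest_rank_percentile(values, percent):
    """Hypothetical helper: the ceil(percent/100 * n)-th smallest value."""
    ordered = sorted(values)
    rank = max(1, math.ceil(percent * len(ordered) / 100))
    return ordered[rank - 1]


p80 = nearest_rank_percentile(gbps, 80)
p20 = nearest_rank_percentile(gbps, 20)

# What the filter in the email keeps: values at or above the 80th percentile.
top_20_percent = [v for v in gbps if v >= p80]

# What the question asks for: everything except the lowest 20%.
without_bottom_20 = [v for v in gbps if v > p20]

print(len(top_20_percent), len(without_bottom_20))
```

In PySpark terms this would presumably mean computing percentile_approx(total_gbps, 0.2) on agg_df, keeping the IPs whose total is above that value, and joining them back to df. That is a suggestion only; it was not run in this thread.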
Cheers
Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom







 https://en.everybodywiki.com/Mich_Talebzadeh

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from relying 
on this email's technical content is explicitly disclaimed. The author will in 
no case be liable for any monetary damages arising from such loss, damage or 
destruction. 

 


On Sat, 16 Sept 2023 at 11:46, Mich Talebzadeh <mich.talebza...@gmail.com> 
wrote:

Happy Saturday coding 😁


 


On Sat, 16 Sept 2023 at 11:30, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

Ah, yes, that's right. I had to spend some time on this one and was having 
some issues with the code. I have now restarted the notebook kernel and rerun 
it, and I get the same result.
On Sat, 16 Sep 2023 at 11:41, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Splendid code. I noticed one minor error while glancing at it:
print(df.count())
print(result_df.count())
You have not defined result_df. I gather you meant "result"?

print(result.count())
That should fix it 🤔
HTH

 


On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

Something like this? 
# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.repl.eagerEval.enabled", "True"
    ).set("spark.sql.adaptive.enabled", "True").set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "10000"
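    )
    # Note: "sc.setLogLevel" is not a Spark configuration key, so it is not set
    # here; the log level is set on the SparkContext once the session exists.

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()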


spark = get_spark_session("My super app", SparkConf())
spark.sparkContext.setLogLevel("ERROR")



def generate_ip():
    return ".".join(str(random.randint(0, 255)) for _ in range(4))

def generate_timestamp():
    return pd.Timestamp(
        year=random.randint(2021, 2023),
        month=random.randint(1, 12),
        day=random.randint(1, 28),
        hour=random.randint(0, 23),
        minute=random.randint(0, 59),
        second=random.randint(0, 59)
    )

def random_gbps():
    return random.uniform(0, 10)

# Number of rows
n = 20

data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": 
generate_timestamp()} for _ in range(n)]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()

agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
result = df.join(top_80_ips, on="incoming_ips", 
how="inner").select("incoming_ips", "gbps", "date_time")
result.show()

print(df.count())
print(result_df.count())

+---------------+-------------------+-------------------+
|   incoming_ips|               gbps|          date_time|
+---------------+-------------------+-------------------+
|   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
|  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
| 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
|    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
|  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
|130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
| 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
|  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
|218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
|   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
| 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
| 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
|   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
| 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
|180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
|  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
|  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
|    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
|  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
|   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
+---------------+-------------------+-------------------+

+---------------+------------------+-------------------+
|   incoming_ips|              gbps|          date_time|
+---------------+------------------+-------------------+
|   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
|  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
|    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
|130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
| 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
|  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
|218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
|   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
| 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
|180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
| 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
|  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
|  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
|    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
|  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
|   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
+---------------+------------------+-------------------+

20
16
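
The drop from 20 to 16 rows follows from how percent_rank is defined: (rank - 1) / (rows - 1) within the window. The plain-Python sketch below reproduces that arithmetic for 20 distinct totals ranked in descending order. It assumes there are no ties, and the helper name percent_rank_value is hypothetical, used only for illustration.

```python
def percent_rank_value(rank: int, n_rows: int) -> float:
    """percent_rank for a 1-based rank among n_rows rows (no ties assumed)."""
    if n_rows <= 1:
        return 0.0
    return (rank - 1) / (n_rows - 1)


n_rows = 20

# Ranks are assigned in descending order of total_gbps, so rank 1 is the
# most active IP. Keep the ranks whose percent_rank is at most 0.80.
kept_ranks = [r for r in range(1, n_rows + 1)
              if percent_rank_value(r, n_rows) <= 0.80]

print(len(kept_ranks), kept_ranks[-1])
```

Rank 16 has percent_rank 15/19 (about 0.79) and is kept, while rank 17 has 16/19 (about 0.84) and is dropped, so the 4 least active IPs, 20% of the sample, are removed. One caveat: a window with orderBy but no partitionBy generally makes Spark move all rows into a single partition, which may matter for a table of a million rows.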

On Fri, 15 Sep 2023 at 20:14, ashok34...@yahoo.com.INVALID 
<ashok34...@yahoo.com.invalid> wrote:

Hi team,
I am using PySpark 3.4.
I have a table of a million rows with a few columns, among them incoming IPs, 
what is known as gbps (Gigabytes per second), and the date and time of each 
incoming IP.
I want to filter out the 20% least active IPs and work on the remainder of the 
data. How can I do this in PySpark?
Thanks






-- 
Bjørn Jørgensen 
Vestre Aspehaug 4, 6010 Ålesund 
Norge

+47 480 94 297


