Something like this? Sum gbps per IP, rank the IPs with percent_rank over the totals (busiest first), keep those with rank <= 0.80, and join the survivors back to the original rows.
# Standard library imports
import multiprocessing
import os
import random

# Third-party imports
import pandas as pd

# PySpark imports
from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F, Window

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# Size the local session to the machine
number_cores = int(multiprocessing.cpu_count())
mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74

def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.repl.eagerEval.enabled", "True"
    ).set("spark.sql.adaptive.enabled", "True").set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set("spark.sql.repl.eagerEval.maxNumRows", "10000")
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()

spark = get_spark_session("My super app", SparkConf())
spark.sparkContext.setLogLevel("ERROR")  # quiet the logs

# Generate some test data
def generate_ip():
    return ".".join(str(random.randint(0, 255)) for _ in range(4))

def generate_timestamp():
    return pd.Timestamp(
        year=random.randint(2021, 2023),
        month=random.randint(1, 12),
        day=random.randint(1, 28),
        hour=random.randint(0, 23),
        minute=random.randint(0, 59),
        second=random.randint(0, 59),
    )

def random_gbps():
    return random.uniform(0, 10)

# Number of rows
n = 20
data = [
    {"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": generate_timestamp()}
    for _ in range(n)
]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()

# Total traffic per IP
agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

# Rank IPs by total traffic, busiest first; percent_rank() returns a value in [0, 1]
windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

# Keep the top 80% of IPs and join back to get their original rows
top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
result = df.join(top_80_ips, on="incoming_ips", how="inner").select(
    "incoming_ips", "gbps", "date_time"
)
result.show()

print(df.count())
print(result.count())

+---------------+-------------------+-------------------+
|   incoming_ips|               gbps|          date_time|
+---------------+-------------------+-------------------+
|   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
|  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
| 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
|    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
|  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
|130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
| 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
|  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
|218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
|   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
| 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
| 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
|   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
| 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
|180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
|  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
|  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
|    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
|  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
|   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
+---------------+-------------------+-------------------+

+---------------+------------------+-------------------+
|   incoming_ips|              gbps|          date_time|
+---------------+------------------+-------------------+
|   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
|  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
|    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
|130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
| 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
|  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
|218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
|   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
| 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
|180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
| 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
|  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
|  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
|    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
|  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
|   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
+---------------+------------------+-------------------+

20
16
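For the real table with millions of rows, note that Window.orderBy without a partitionBy moves all the aggregated totals into a single partition (Spark warns about this). A minimal sketch of an alternative, assuming the same df and column names as above: compute the 20th-percentile cutoff over the per-IP totals with approxQuantile and keep the IPs at or above it. The names threshold and active_ips are illustrative, not from the code above.

agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

# approxQuantile(column, probabilities, relativeError); 0.0 requests the exact quantile
threshold = agg_df.approxQuantile("total_gbps", [0.2], 0.0)[0]

# Keep IPs whose total traffic is at or above the 20th percentile,
# i.e. drop the ~20% least active IPs
active_ips = agg_df.filter(F.col("total_gbps") >= threshold)
result = df.join(active_ips, on="incoming_ips", how="inner").select(
    "incoming_ips", "gbps", "date_time"
)

With ties at the cutoff this can keep slightly more than 80% of the IPs, and a relativeError above 0.0 trades accuracy for speed on large data.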
On Fri, 15 Sep 2023 at 20:14, ashok34...@yahoo.com.INVALID <ashok34...@yahoo.com.invalid> wrote:

> Hi team,
>
> I am using PySpark 3.4.
>
> I have a table of a million rows that has a few columns, among them
> incoming IPs, what is known as gbps (gigabytes per second), and the
> date and time of the incoming IP.
>
> I want to filter out the 20% least active IPs and work on the remainder
> of the data. How can I do this in PySpark?
>
> Thanks

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge
+47 480 94 297