Something like this?

# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
# e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.repl.eagerEval.enabled", "True"
    ).set("spark.sql.adaptive.enabled", "True").set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "10000"
    ).set(
        "sc.setLogLevel", "ERROR"
    )

    return 
SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My super app", SparkConf())
spark.sparkContext.setLogLevel("ERROR")



def generate_ip():
    return ".".join(str(random.randint(0, 255)) for _ in range(4))

def generate_timestamp():
    return pd.Timestamp(
        year=random.randint(2021, 2023),
        month=random.randint(1, 12),
        day=random.randint(1, 28),
        hour=random.randint(0, 23),
        minute=random.randint(0, 59),
        second=random.randint(0, 59)
    )

def random_gbps():
    return random.uniform(0, 10)

# Number of rows
n = 20

data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
"date_time": generate_timestamp()} for _ in range(n)]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()

agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
result = df.join(top_80_ips, on="incoming_ips",
how="inner").select("incoming_ips", "gbps", "date_time")
result.show()

print(df.count())
print(result_df.count())


+---------------+-------------------+-------------------+
|   incoming_ips|               gbps|          date_time|
+---------------+-------------------+-------------------+
|   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
|  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
| 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
|    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
|  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
|130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
| 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
|  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
|218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
|   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
| 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
| 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
|   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
| 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
|180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
|  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
|  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
|    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
|  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
|   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
+---------------+-------------------+-------------------+

+---------------+------------------+-------------------+
|   incoming_ips|              gbps|          date_time|
+---------------+------------------+-------------------+
|   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
|  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
|    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
|130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
| 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
|  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
|218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
|   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
| 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
|180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
| 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
|  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
|  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
|    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
|  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
|   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
+---------------+------------------+-------------------+

20
16



fre. 15. sep. 2023 kl. 20:14 skrev ashok34...@yahoo.com.INVALID
<ashok34...@yahoo.com.invalid>:

> Hi team,
>
> I am using PySpark 3.4
>
> I have a table of million rows that has few columns. among them incoming
> ips  and what is known as gbps (Gigabytes per second) and date and time
> of  incoming ip.
>
> I want to filter out 20% of low active ips and work on the remainder of
> data. How can I do thiis in PySpark?
>
> Thanks
>
>
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

Reply via email to