Ah, yes, that's right.
I did have to spend some time on this one, and I was having some issues with
the code.
I have now restarted the notebook kernel and rerun it, and I get the same result.
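
By the way, the same cut could also be made with an explicit quantile threshold
instead of the percent_rank window. A minimal sketch, assuming the same spark
session, the same df (incoming_ips, gbps, date_time) and the functions-as-F
import as in the code below:

# Total traffic per incoming IP, as in the original code
totals = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

# approxQuantile returns a list of estimates; [0] is the ~20th percentile of total_gbps
threshold = totals.approxQuantile("total_gbps", [0.20], 0.01)[0]

# Keep only IPs at or above the threshold, i.e. drop the least active ~20% of IPs
active_ips = totals.filter(F.col("total_gbps") >= threshold)

# A left-semi join keeps the original rows whose IP is in the active set,
# without pulling the aggregated columns into the result
result = df.join(active_ips, on="incoming_ips", how="left_semi")
result.show()
print(df.count(), result.count())

The two approaches can differ slightly at the boundary (percent_rank cuts by rank
over distinct IPs, approxQuantile by an estimated threshold value), but the intent
is the same.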

On Sat, 16 Sep 2023 at 11:41, Mich Talebzadeh <
mich.talebza...@gmail.com> wrote:

> Splendid code. Glancing at it, I spotted a minor error.
>
> print(df.count())
> print(result_df.count())
>
>
> You have not defined result_df. I gather you meant "result"?
>
>
> print(result.count())
>
>
> That should fix it 🤔
>
> HTH
>
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <bjornjorgen...@gmail.com>
> wrote:
>
>> Something like this?
>>
>>
>> # Standard library imports
>> import json
>> import multiprocessing
>> import os
>> import re
>> import sys
>> import random
>>
>> # Third-party imports
>> import numpy as np
>> import pandas as pd
>> import pyarrow
>>
>> # Pyspark imports
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import SparkSession, functions as F, Window
>> from pyspark.sql.functions import (
>>     col, concat, concat_ws, expr, lit, trim, udf
>> )
>> from pyspark.sql.types import (
>>     IntegerType, StringType, StructField, StructType,
>>     DoubleType, TimestampType
>> )
>> from pyspark import pandas as ps
>>
>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>
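>> # Detect the available CPU cores and physical memory, used to size the local driver below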
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>
>>
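>> # Build a local SparkSession that uses all detected cores and sizes the driver memory accordingly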
>> def get_spark_session(app_name: str, conf: SparkConf):
>>     conf.setMaster("local[{}]".format(number_cores))
>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>         "spark.sql.repl.eagerEval.enabled", "True"
>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>     ).set(
>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>     ).set(
>>         "sc.setLogLevel", "ERROR"
>>     )
>>
>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>>
>> spark = get_spark_session("My super app", SparkConf())
>> spark.sparkContext.setLogLevel("ERROR")
>>
>>
>>
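>> # Helpers to fabricate a small random sample: IPv4 addresses, timestamps and gbps values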
>> def generate_ip():
>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>
>> def generate_timestamp():
>>     return pd.Timestamp(
>>         year=random.randint(2021, 2023),
>>         month=random.randint(1, 12),
>>         day=random.randint(1, 28),
>>         hour=random.randint(0, 23),
>>         minute=random.randint(0, 59),
>>         second=random.randint(0, 59)
>>     )
>>
>> def random_gbps():
>>     return random.uniform(0, 10)
>>
>> # Number of rows
>> n = 20
>>
>> data = [
>>     {"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": generate_timestamp()}
>>     for _ in range(n)
>> ]
>> df = spark.createDataFrame(pd.DataFrame(data))
>> df.show()
>>
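>> # Total traffic per incoming IP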
>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>
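>> # Rank IPs by total traffic, busiest first (percent_rank is 0 for the busiest IP, 1 for the least busy)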
>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>
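>> # Keep the ~80% busiest IPs and join back to the original rows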
>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>> result = df.join(top_80_ips, on="incoming_ips", how="inner").select(
>>     "incoming_ips", "gbps", "date_time"
>> )
>> result.show()
>>
>> print(df.count())
>> print(result_df.count())
>>
>>
>> +---------------+-------------------+-------------------+
>> |   incoming_ips|               gbps|          date_time|
>> +---------------+-------------------+-------------------+
>> |   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
>> |  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
>> | 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
>> |    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
>> |  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
>> |130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
>> | 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
>> |  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
>> |218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
>> |   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
>> | 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
>> | 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
>> |   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
>> | 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
>> |180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
>> |  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
>> |  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
>> |    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
>> |  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
>> |   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
>> +---------------+-------------------+-------------------+
>>
>> +---------------+------------------+-------------------+
>> |   incoming_ips|              gbps|          date_time|
>> +---------------+------------------+-------------------+
>> |   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>> |  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
>> |    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>> | 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
>> |  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>> |   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>> |180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
>> | 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
>> |  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
>> |  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>> |    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>> |  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>> |   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>> +---------------+------------------+-------------------+
>>
>> 20
>> 16
>>
>>
>>
>> On Fri, 15 Sep 2023 at 20:14, ashok34...@yahoo.com.INVALID
>> <ashok34...@yahoo.com.invalid> wrote:
>>
>>> Hi team,
>>>
>>> I am using PySpark 3.4
>>>
>>> I have a table of a million rows with a few columns, among them the
>>> incoming IP, what is known as gbps (Gigabytes per second), and the date
>>> and time of the incoming IP.
>>>
>>> I want to filter out the 20% least active IPs and work on the remainder
>>> of the data. How can I do this in PySpark?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
