Happy Saturday coding 😁

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 16 Sept 2023 at 11:30, Bjørn Jørgensen <bjornjorgen...@gmail.com>
wrote:

> ah.. yes that's right.
> I did have to use some time on this one and I was having some issues with
> the code.
> I restart the notebook kernel now and rerun it and I get the same result.
>
> lør. 16. sep. 2023 kl. 11:41 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>> Splendid code. A minor error glancing at your code.
>>
>> print(df.count())
>> print(result_df.count())
>>
>>
>> You have not defined result_df. I gather you meant "result"?
>>
>>
>> print(result.count())
>>
>>
>> That should fix it 🤔
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Distinguished Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>> wrote:
>>
>>> Something like this?
>>>
>>>
>>> # Standard library imports
>>> import json
>>> import multiprocessing
>>> import os
>>> import re
>>> import sys
>>> import random
>>>
>>> # Third-party imports
>>> import numpy as np
>>> import pandas as pd
>>> import pyarrow
>>>
>>> # Pyspark imports
>>> from pyspark import SparkConf, SparkContext
>>> from pyspark.sql import SparkSession, functions as F, Window
>>> from pyspark.sql.functions import (
>>>     col, concat, concat_ws, expr, lit, trim, udf
>>> )
>>> from pyspark.sql.types import (
>>>     IntegerType, StringType, StructField, StructType,
>>>     DoubleType, TimestampType
>>> )
>>> from pyspark import pandas as ps
>>>
>>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>>
>>> number_cores = int(multiprocessing.cpu_count())
>>>
>>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # 
>>> e.g. 4015976448
>>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>>
>>>
>>> def get_spark_session(app_name: str, conf: SparkConf):
>>>     conf.setMaster("local[{}]".format(number_cores))
>>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>>         "spark.sql.repl.eagerEval.enabled", "True"
>>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>>     ).set(
>>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>>     ).set(
>>>         "sc.setLogLevel", "ERROR"
>>>     )
>>>
>>>     return 
>>> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>
>>>
>>> spark = get_spark_session("My super app", SparkConf())
>>> spark.sparkContext.setLogLevel("ERROR")
>>>
>>>
>>>
>>> def generate_ip():
>>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>>
>>> def generate_timestamp():
>>>     return pd.Timestamp(
>>>         year=random.randint(2021, 2023),
>>>         month=random.randint(1, 12),
>>>         day=random.randint(1, 28),
>>>         hour=random.randint(0, 23),
>>>         minute=random.randint(0, 59),
>>>         second=random.randint(0, 59)
>>>     )
>>>
>>> def random_gbps():
>>>     return random.uniform(0, 10)
>>>
>>> # Number of rows
>>> n = 20
>>>
>>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": 
>>> generate_timestamp()} for _ in range(n)]
>>> df = spark.createDataFrame(pd.DataFrame(data))
>>> df.show()
>>>
>>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>>
>>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>>
>>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>>> result = df.join(top_80_ips, on="incoming_ips", 
>>> how="inner").select("incoming_ips", "gbps", "date_time")
>>> result.show()
>>>
>>> print(df.count())
>>> print(result_df.count())
>>>
>>>
>>> +---------------+-------------------+-------------------+
>>> |   incoming_ips|               gbps|          date_time|
>>> +---------------+-------------------+-------------------+
>>> |   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
>>> |  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
>>> | 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
>>> |    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
>>> |  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
>>> |130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
>>> | 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
>>> |  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
>>> |218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
>>> |   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
>>> | 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
>>> | 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
>>> |   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
>>> | 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
>>> |180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
>>> |  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
>>> |  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
>>> |    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
>>> |  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
>>> |   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
>>> +---------------+-------------------+-------------------+
>>>
>>> +---------------+------------------+-------------------+
>>> |   incoming_ips|              gbps|          date_time|
>>> +---------------+------------------+-------------------+
>>> |   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>>> |  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
>>> |    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
>>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>>> | 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
>>> |  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>>> |   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
>>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>>> |180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
>>> | 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
>>> |  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
>>> |  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>>> |    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>>> |  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>>> |   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>>> +---------------+------------------+-------------------+
>>>
>>> 20
>>> 16
>>>
>>>
>>>
>>> fre. 15. sep. 2023 kl. 20:14 skrev ashok34...@yahoo.com.INVALID
>>> <ashok34...@yahoo.com.invalid>:
>>>
>>>> Hi team,
>>>>
>>>> I am using PySpark 3.4
>>>>
>>>> I have a table of million rows that has few columns. among them
>>>> incoming ips  and what is known as gbps (Gigabytes per second) and
>>>> date and time of  incoming ip.
>>>>
>>>> I want to filter out 20% of low active ips and work on the remainder of
>>>> data. How can I do thiis in PySpark?
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>

Reply via email to