Ah, yes, that's right. I did spend some time on this one and was having some issues with the code. I have restarted the notebook kernel now and rerun it, and I get the same result.
On Sat, 16 Sep 2023 at 11:41, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Splendid code. A minor error, glancing at your code:
>
> print(df.count())
> print(result_df.count())
>
> You have not defined result_df. I gather you meant "result"?
>
> print(result.count())
>
> That should fix it 🤔
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
> view my LinkedIn profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>
>> Something like this?
>>
>> # Standard library imports
>> import json
>> import multiprocessing
>> import os
>> import re
>> import sys
>> import random
>>
>> # Third-party imports
>> import numpy as np
>> import pandas as pd
>> import pyarrow
>>
>> # PySpark imports
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import SparkSession, functions as F, Window
>> from pyspark.sql.functions import (
>>     col, concat, concat_ws, expr, lit, trim, udf
>> )
>> from pyspark.sql.types import (
>>     IntegerType, StringType, StructField, StructType,
>>     DoubleType, TimestampType
>> )
>> from pyspark import pandas as ps
>>
>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>>     conf.setMaster("local[{}]".format(number_cores))
>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>         "spark.sql.repl.eagerEval.enabled", "True"
>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>     ).set(
>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>     ).set(
>>         "sc.setLogLevel", "ERROR"
>>     )
>>
>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>>
>> spark = get_spark_session("My super app", SparkConf())
>> spark.sparkContext.setLogLevel("ERROR")
>>
>>
>> def generate_ip():
>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>
>>
>> def generate_timestamp():
>>     return pd.Timestamp(
>>         year=random.randint(2021, 2023),
>>         month=random.randint(1, 12),
>>         day=random.randint(1, 28),
>>         hour=random.randint(0, 23),
>>         minute=random.randint(0, 59),
>>         second=random.randint(0, 59)
>>     )
>>
>>
>> def random_gbps():
>>     return random.uniform(0, 10)
>>
>>
>> # Number of rows
>> n = 20
>>
>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": generate_timestamp()} for _ in range(n)]
>> df = spark.createDataFrame(pd.DataFrame(data))
>> df.show()
>>
>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>
>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>
>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>> result = df.join(top_80_ips, on="incoming_ips", how="inner").select("incoming_ips", "gbps", "date_time")
>> result.show()
>>
>> print(df.count())
>> print(result_df.count())
>>
>>
>> +---------------+-------------------+-------------------+
>> |   incoming_ips|               gbps|          date_time|
>> +---------------+-------------------+-------------------+
>> |   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
>> |  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
>> | 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
>> |    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
>> |  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
>> |130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
>> | 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
>> |  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
>> |218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
>> |   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
>> | 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
>> | 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
>> |   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
>> | 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
>> |180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
>> |  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
>> |  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
>> |    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
>> |  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
>> |   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
>> +---------------+-------------------+-------------------+
>>
>> +---------------+------------------+-------------------+
>> |   incoming_ips|              gbps|          date_time|
>> +---------------+------------------+-------------------+
>> |   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>> |  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
>> |    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>> | 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
>> |  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>> |   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>> |180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
>> | 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
>> |  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
>> |  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>> |    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>> |  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>> |   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>> +---------------+------------------+-------------------+
>>
>> 20
>> 16
>>
>>
>> On Fri, 15 Sep 2023 at 20:14, ashok34...@yahoo.com.INVALID <ashok34...@yahoo.com.invalid> wrote:
>>
>>> Hi team,
>>>
>>> I am using PySpark 3.4.
>>>
>>> I have a table of a million rows that has a few columns, among them incoming IPs, what is known as gbps (Gigabytes per second), and the date and time of the incoming IP.
>>>
>>> I want to filter out the 20% least active IPs and work on the remainder of the data. How can I do this in PySpark?
>>>
>>> Thanks
>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
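For anyone reading the archive without a Spark installation handy, the core of the thread's approach (sum gbps per IP, percent-rank the totals in descending order, keep IPs with rank <= 0.80, then join back to the detail rows) can be sketched in plain Python. This is my illustration, not code from the thread; the tiny inline dataset and variable names are made up, and unlike Spark's percent_rank this sketch gives tied totals distinct ranks:

```python
from collections import defaultdict

# Detail rows mimicking the (incoming_ips, gbps) columns.
records = [
    ("10.0.0.1", 5.0), ("10.0.0.2", 0.1), ("10.0.0.3", 7.5),
    ("10.0.0.1", 2.5), ("10.0.0.4", 0.2), ("10.0.0.5", 9.0),
]

# groupBy("incoming_ips").agg(sum("gbps")): total gbps per IP.
totals = defaultdict(float)
for ip, gbps in records:
    totals[ip] += gbps

# percent_rank over total_gbps descending: position / (n - 1).
ordered = sorted(totals, key=totals.get, reverse=True)
n = len(ordered)
rank = {ip: (i / (n - 1) if n > 1 else 0.0) for i, ip in enumerate(ordered)}

# Keep the most active IPs (rank <= 0.80), like top_80_ips.
keep = {ip for ip in ordered if rank[ip] <= 0.80}

# Inner join back to the detail rows, like the final result.
result = [(ip, gbps) for ip, gbps in records if ip in keep]
print(sorted(keep))
print(len(records), len(result))
```

For the original million-row table the DataFrame version in the thread is the right tool; computing a 20th-percentile cutoff on the per-IP totals with `DataFrame.approxQuantile` and filtering on that would be an alternative that avoids a global window sort.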