Hi guys,

You can try the code below in PySpark, relying on the *urllib* library to
download the contents of each URL and store them in a new column of the
DataFrame.

Spark 3.4.0

For the rate limit Varun explained below, a simple sketch follows the code.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField

import urllib.request

def download_url_content(url):
    try:
        response = urllib.request.urlopen(url)
        return response.read().decode('utf-8')  # Assuming UTF-8 encoding
    except Exception as e:
        # Return None rather than exiting: calling sys.exit inside a UDF would
        # fail the executor task, whereas a null in the content column is harmless
        print(f"{e}, skipping {url}")
        return None

def main():
    # Initialize a Spark session
    spark = SparkSession.builder.appName("URLDownload").getOrCreate()

    # use sample DataFrame with URLs
    data = [("https://www.bbc.co.uk",), ("https://www.bbc.com",)]
    # Define the schema for the DataFrame
    schema = StructType([StructField("url", StringType(), True)])
    df = spark.createDataFrame(data, schema=schema)
    # Register UDF to download content
    download_udf = udf(download_url_content, StringType())
    # Add a new column with downloaded content
    df_with_content = df.withColumn("content", download_udf(df["url"]))
    # Show the DataFrame with downloaded content
    df_with_content.show(truncate=False)
if __name__ == "__main__":
    main()
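
If you also need the rate limit Varun mentions, a minimal sketch is below. It
is my own illustration, not part of the code above: it simply sleeps for a
fixed interval before each request inside the UDF, so the effective global
rate is roughly the number of concurrently running tasks divided by the delay.

import time

REQUEST_DELAY_SECONDS = 1.0  # illustrative value; tune to the API's limit

def download_url_content_throttled(url):
    # crude per-task throttle: pause before every request
    time.sleep(REQUEST_DELAY_SECONDS)
    return download_url_content(url)

download_throttled_udf = udf(download_url_content_throttled, StringType())
df_with_content = df.withColumn("content", download_throttled_udf(df["url"]))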

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 26 Aug 2023 at 17:51, Harry Jamison
<harryjamiso...@yahoo.com.invalid> wrote:

>
> Thank you Varun, this makes sense.
>
> I understand a separate process for content ingestion. I was thinking it
> would be a separate spark job, but it sounds like you are suggesting that
> ideally I should do it outside of Hadoop entirely?
>
> Thanks
>
>
> Harry
>
>
>
>
> On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah <
> varunshah100...@gmail.com> wrote:
>
>
> Hi Harry,
>
> Ideally, you should not be fetching URLs inside your transformation job;
> make the API calls separately (outside the cluster if possible). Ingesting
> data should be treated separately from transformation / cleaning / join
> operations. You can create another dataframe of URLs, dedup it if required
> and write it to a file; a normal Python function then ingests the content
> for each URL, and after X API calls you create a dataframe from the results
> and union it with the previous one, finally writing the content out and,
> if required, joining it back to the original df on the url column.
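>
> A rough sketch of that flow (the fetch helper, the fixed sleep and the
> single-batch handling are simplified illustrations):
>
> import time
> import urllib.request
>
> def fetch(url):
>     try:
>         return urllib.request.urlopen(url).read().decode("utf-8")
>     except Exception:
>         return None
>
> # dedup the URLs, fetch them with plain Python, then join the content back
> urls = [row["url"] for row in df.select("url").distinct().collect()]
> results = []
> for u in urls:
>     results.append((u, fetch(u)))
>     time.sleep(1.0)  # crude pause between API calls
>
> content_df = spark.createDataFrame(results, ["url", "content"])
> df_with_content = df.join(content_df, on="url", how="left")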
>
> If this is absolutely necessary, here are a few ways to achieve this:
>
> Approach-1:
> You can use Spark's foreachPartition
> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreachPartition.html>,
> which takes a function that is applied to each partition.
> Inside it, you can create a connection and cap the API calls made per partition.
>
> This can work if you add logic that checks the current number of partitions
> and then distributes the max_api_calls across them.
> e.g. if no_of_partitions = 4 and total_max_api_calls = 4, you can pass the
> function a parameter max_partition_api_calls = 1.
>
> This approach has the limitation that the maximum allowed API calls must be
> at least the number of partitions.
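>
> A rough sketch of the per-partition budget (names and the budget value are
> illustrative; mapPartitions is used so the downloaded content is returned
> as rows, and fetch is the plain urllib helper from the sketch above):
>
> from pyspark.sql.types import StringType, StructType, StructField
>
> MAX_CALLS_PER_PARTITION = 1  # e.g. total_max_api_calls / no_of_partitions
>
> def fetch_partition(rows):
>     calls = 0
>     for row in rows:
>         content = None
>         if calls < MAX_CALLS_PER_PARTITION:
>             content = fetch(row["url"])
>             calls += 1
>         yield (row["url"], content)
>
> content_schema = StructType([
>     StructField("url", StringType(), True),
>     StructField("content", StringType(), True),
> ])
> result_df = spark.createDataFrame(
>     df.rdd.mapPartitions(fetch_partition), schema=content_schema)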
>
> Approach-2
> An alternative is to create the connection outside of the udf, wrapped in a
> rate limiter (link
> <https://stackoverflow.com/questions/40748687/python-api-rate-limiting-how-to-limit-api-calls-globally>),
> and use that connection variable inside the udf in each partition, invoking
> time.sleep between calls. This will still run into contention when many
> partitions try to invoke the API at the same time.
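>
> A rough sketch of that idea (the limiter class and interval are
> illustrative, it assumes the usual pyspark imports of udf and StringType,
> and it reuses the fetch helper from the earlier sketch; note the limiter
> only throttles calls within one executor process, not globally):
>
> import threading
> import time
>
> class SimpleRateLimiter:
>     """Enforce a minimum interval between calls within one Python process."""
>     def __init__(self, min_interval_seconds):
>         self._lock = threading.Lock()
>         self._min_interval = min_interval_seconds
>         self._last_call = 0.0
>
>     def wait(self):
>         with self._lock:
>             sleep_for = self._min_interval - (time.time() - self._last_call)
>             if sleep_for > 0:
>                 time.sleep(sleep_for)
>             self._last_call = time.time()
>
> # created at module level, i.e. outside the udf body
> limiter = SimpleRateLimiter(min_interval_seconds=1.0)
>
> def download_with_limit(url):
>     limiter.wait()      # throttles only within this executor process
>     return fetch(url)
>
> download_limited_udf = udf(download_with_limit, StringType())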
>
> I found this Medium article
> <https://medium.com/geekculture/how-to-execute-a-rest-api-call-on-apache-spark-the-right-way-in-python-4367f2740e78>
> which discusses the issue you are facing, but does not offer a solution.
> Do check the comments as well.
>
> Regards,
> Varun
>
>
> On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison
> <harryjamiso...@yahoo.com.invalid> wrote:
>
> I am using Python 3.7 and Spark 2.4.7
>
> I am not sure what the best way to do this is.
>
> I have a dataframe with a url in one of the columns, and I want to
> download the contents of that url and put it in a new column.
>
> Can someone point me in the right direction on how to do this?
> I looked at the UDFs and they seem confusing to me.
>
> Also, is there a good way to rate limit the number of calls I make per
> second?
>
>
