Sorry, I forgot one import. Add this line to the top of the code:

import sys

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 26 Aug 2023 at 19:07, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi guys,
>
> You can try the code below in PySpark, relying on the urllib library to
> download the contents of each URL and then create a new column in the
> DataFrame to store the downloaded content.
>
> Tested with Spark 3.4.0
>
> Note that this code does not itself enforce the rate limit Varun explained;
> a throttled variant is sketched after the code.
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import udf
> from pyspark.sql.types import StringType, StructType, StructField
>
> import urllib.request
>
> def download_url_content(url):
>     try:
>         response = urllib.request.urlopen(url)
>         return response.read().decode('utf-8')  # Assuming UTF-8 encoding
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
> def main():
>     # Initialize a Spark session
>     spark = SparkSession.builder.appName("URLDownload").getOrCreate()
>
>     # use sample DataFrame with URLs
>     data = [("https://www.bbc.co.uk",), ("https://www.bbc.com",)]
>     # Define the schema for the DataFrame
>     schema = StructType([StructField("url", StringType(), True)])
>     df = spark.createDataFrame(data, schema=schema)
>     # Register UDF to download content
>     download_udf = udf(download_url_content, StringType())
>     # Add a new column with downloaded content
>     df_with_content = df.withColumn("content", download_udf(df["url"]))
>     # Show the DataFrame with downloaded content
>     df_with_content.show(truncate=False)
>
> if __name__ == "__main__":
>     main()
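>
> If you also want to respect the rate limit Varun described, one crude option
> is to pause inside the UDF itself. A minimal sketch of that idea (the
> one-second pause is only an illustrative value; it reuses urllib, udf and
> StringType from the script above):
>
> import time
>
> REQUEST_PAUSE_SECONDS = 1.0  # illustrative value, adjust to your API's limit
>
> def download_url_content_throttled(url):
>     try:
>         time.sleep(REQUEST_PAUSE_SECONDS)  # crude per-row throttle within each task
>         response = urllib.request.urlopen(url)
>         return response.read().decode('utf-8')  # assuming UTF-8 encoding
>     except Exception:
>         return None  # return None so one bad URL does not stop the whole job
>
> download_udf = udf(download_url_content_throttled, StringType())
>
> Bear in mind that tasks run in parallel, so this gives roughly one call per
> second per concurrently running task, not per job.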
>
> HTH
>
>
> On Sat, 26 Aug 2023 at 17:51, Harry Jamison
> <harryjamiso...@yahoo.com.invalid> wrote:
>
>>
>> Thank you Varun, this makes sense.
>>
>> I understand using a separate process for content ingestion. I was
>> thinking it would be a separate Spark job, but it sounds like you are
>> suggesting that ideally I should do it outside of Hadoop entirely?
>>
>> Thanks
>>
>>
>> Harry
>>
>>
>>
>>
>> On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah <
>> varunshah100...@gmail.com> wrote:
>>
>>
>> Hi Harry,
>>
>> Ideally, you should not be fetching a URL inside your transformation job;
>> do the API calls separately (outside the cluster if possible). Ingesting
>> data should be treated separately from transformation / cleaning / join
>> operations. You can create another dataframe of URLs, dedup it if
>> required, and store it in a file. A plain Python function can then ingest
>> the content for each URL, and after X API calls build a dataframe of the
>> results and union it with the previous one, finally writing the content
>> and then joining with the original df on url, if required.
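>>
>> A rough sketch of that flow (it assumes a SparkSession called spark and
>> the original dataframe df with a url column; fetch_with_limit, the
>> one-second pause and the /tmp/url_content path are only placeholders):
>>
>> import time
>> import urllib.request
>>
>> # 1. Collect the deduplicated URLs outside of any Spark transformation
>> urls = [r["url"] for r in df.select("url").distinct().collect()]
>>
>> # 2. Plain-Python ingestion with a simple rate limit between calls
>> def fetch_with_limit(url_list, pause=1.0):
>>     rows = []
>>     for u in url_list:
>>         try:
>>             rows.append((u, urllib.request.urlopen(u).read().decode("utf-8")))
>>         except Exception:
>>             rows.append((u, None))
>>         time.sleep(pause)  # wait between calls to stay under the API limit
>>     return rows
>>
>> content_df = spark.createDataFrame(fetch_with_limit(urls), ["url", "content"])
>>
>> # 3. Persist the content, then join back to the original dataframe if needed
>> content_df.write.mode("overwrite").parquet("/tmp/url_content")
>> result = df.join(content_df, on="url", how="left")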
>>
>> If this is absolutely necessary, here are a few ways to achieve this:
>>
>> Approach-1:
>> You can use Spark's foreachPartition
>> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreachPartition.html>,
>> which applies a plain Python function to each partition.
>> Inside it, you can create a connection and cap the API calls per
>> partition.
>>
>> This can work if you add logic that checks the current number of
>> partitions and then distributes the max_api_calls across them.
>> e.g.: if no_of_partitions = 4 and total_max_api_calls = 4, then you can
>> pass this function a parameter max_partition_api_calls = 1.
>>
>> This approach has limitations, as it requires the maximum allowed API
>> calls to be at least the number of partitions.
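>>
>> A rough sketch of that idea, shown with mapPartitions rather than
>> foreachPartition so the fetched content comes back as a dataframe
>> (df and the budget of one call per partition are only example values):
>>
>> import urllib.request
>>
>> def make_fetcher(max_partition_api_calls):
>>     def fetch_partition(rows):
>>         calls = 0
>>         for row in rows:
>>             content = None
>>             if calls < max_partition_api_calls:
>>                 try:
>>                     content = urllib.request.urlopen(row["url"]).read().decode("utf-8")
>>                 except Exception:
>>                     content = None
>>                 calls += 1  # one API call spent from this partition's budget
>>             yield (row["url"], content)
>>     return fetch_partition
>>
>> # e.g. 4 partitions and total_max_api_calls = 4  ->  1 call per partition
>> fetched = df.rdd.mapPartitions(make_fetcher(1)).toDF(["url", "content"])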
>>
>> Approach-2
>> An alternative approach is to create the connection outside of the udf
>> with a rate limiter
>> (link
>> <https://stackoverflow.com/questions/40748687/python-api-rate-limiting-how-to-limit-api-calls-globally>)
>> and use that connection variable inside the udf in each partition,
>> invoking time.sleep as needed. This can still run into contention when
>> many partitions try to invoke the API at the same time.
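>>
>> A small illustration of that pattern (SimpleRateLimiter and the
>> one-second interval are only placeholders, not the library from the
>> linked answer; df is assumed to have a url column):
>>
>> import time
>> import urllib.request
>> from pyspark.sql.functions import udf
>> from pyspark.sql.types import StringType
>>
>> class SimpleRateLimiter:
>>     """Allow at most one call every `interval` seconds within one Python worker."""
>>     def __init__(self, interval):
>>         self.interval = interval
>>         self.last_call = 0.0
>>
>>     def wait(self):
>>         delay = self.interval - (time.time() - self.last_call)
>>         if delay > 0:
>>             time.sleep(delay)
>>         self.last_call = time.time()
>>
>> limiter = SimpleRateLimiter(interval=1.0)  # created outside the udf
>>
>> def limited_download(url):
>>     limiter.wait()  # block until this worker may call the API again
>>     try:
>>         return urllib.request.urlopen(url).read().decode("utf-8")
>>     except Exception:
>>         return None
>>
>> limited_udf = udf(limited_download, StringType())
>> df_with_content = df.withColumn("content", limited_udf(df["url"]))
>>
>> Each Spark Python worker gets its own copy of the limiter, so the overall
>> rate is roughly one call per interval per worker, not per cluster, which
>> is exactly the contention mentioned above.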
>>
>> I found this Medium article
>> <https://medium.com/geekculture/how-to-execute-a-rest-api-call-on-apache-spark-the-right-way-in-python-4367f2740e78>,
>> which discusses the issue you are facing but does not offer a solution.
>> The comments on it are also worth reading.
>>
>> Regards,
>> Varun
>>
>>
>> On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison
>> <harryjamiso...@yahoo.com.invalid> wrote:
>>
>> I am using Python 3.7 and Spark 2.4.7.
>>
>> I am not sure what the best way to do this is.
>>
>> I have a dataframe with a url in one of the columns, and I want to
>> download the contents of that url and put it in a new column.
>>
>> Can someone point me in the right direction on how to do this?
>> I looked at the UDFs and they seem confusing to me.
>>
>> Also, is there a good way to rate limit the number of calls I make per
>> second?
>>
>>
