Re: Spark 2.4.7

2023-08-26 Thread Mich Talebzadeh
Hi guys,

You can try the code below in PySpark. It relies on the urllib library to
download the contents of the URL and then creates a new column in the
DataFrame to store the downloaded contents.

I tested this with Spark 3.4.0; the rate-limit considerations Varun
explained still apply.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField

import sys
import urllib.request


def download_url_content(url):
    try:
        response = urllib.request.urlopen(url)
        return response.read().decode('utf-8')  # Assuming UTF-8 encoding
    except Exception as e:
        print(f"""{e}, quitting""")
        sys.exit(1)


def main():
    # Initialize a Spark session
    spark = SparkSession.builder.appName("URLDownload").getOrCreate()

    # Use a sample DataFrame with URLs
    data = [("https://www.bbc.co.uk",), ("https://www.bbc.com",)]
    # Define the schema for the DataFrame
    schema = StructType([StructField("url", StringType(), True)])
    df = spark.createDataFrame(data, schema=schema)
    # Register a UDF to download content
    download_udf = udf(download_url_content, StringType())
    # Add a new column with the downloaded content
    df_with_content = df.withColumn("content", download_udf(df["url"]))
    # Show the DataFrame with the downloaded content
    df_with_content.show(truncate=False)


if __name__ == "__main__":
    main()
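
If you also need to cap the request rate Harry asked about, a sleep-based
throttle inside the download function is one option. This is only a sketch
under stated assumptions: a simple per-task limiter is acceptable, and
calls_per_second is a made-up parameter you would tune yourself; it limits
each executor task, not the cluster as a whole.

import time
import urllib.request

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def make_rate_limited_downloader(calls_per_second=2):
    min_interval = 1.0 / calls_per_second

    def download(url):
        start = time.time()
        try:
            content = urllib.request.urlopen(url, timeout=10).read().decode('utf-8')
        except Exception:
            content = None  # return None rather than failing the whole job
        # sleep off the remainder of the interval before the next call
        elapsed = time.time() - start
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)
        return content

    return download


download_udf_limited = udf(make_rate_limited_downloader(calls_per_second=2), StringType())

You would then use download_udf_limited in place of download_udf in the
withColumn call above.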

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   View my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 26 Aug 2023 at 17:51, Harry Jamison
 wrote:

>
> Thank you Varun, this makes sense.
>
> I understand a separate process for content ingestion. I was thinking it
> would be a separate spark job, but it sounds like you are suggesting that
> ideally I should do it outside of Hadoop entirely?
>
> Thanks
>
>
> Harry
>
>
>
>
> On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah <
> varunshah100...@gmail.com> wrote:
>
>
> Hi Harry,
>
> Ideally, you should not be fetching a url in your transformation job but
> do the API calls separately (outside the cluster if possible). Ingesting
> data should be treated separately from transformation / cleaning / join
> operations. You can create another dataframe of urls, dedup if required &
> store it in a file where your normal python function would ingest the data
> for the url & after X amount of api calls, create dataframe for it & union
> with previous dataframe, finally writing the content & then doing a join
> with the original df based on url, if required.
>
> If this is absolutely necessary, here are a few ways to achieve this:
>
> Approach-1:
> You can use the spark's foreachPartition
> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreachPartition.html>
> which will require a udf function.
> In this, you can create a connection to limit the API calls per partition.
>
> This can work if you introduce logic that checks for the current number of
> partitions & then distribute the max_api_calls per partition.
> eg: if no_of_partitions = 4 and total_max_api_calls = 4, then you can pass
> in a parameter to this udf with max_partition_api_calls = 1.
>
> This approach has limitations as it requires max allowed api calls to be
> more than that of the number of partitions.
>
> Approach-2
> An alternative approach is to create the connection outside of the udf
> with rate limiter
> (link
> <https://stackoverflow.com/questions/40748687/python-api-rate-limiting-how-to-limit-api-calls-globally>)
> and use this connection variable inside of the udf function in each
> partition, invoking time.sleep. This will definitely introduce issues where
> many partitions are trying to invoke the api.
>
> I found this medium-article
> <https://medium.com/geekculture/how-to-execute-a-rest-api-call-on-apache-spark-the-right-way-in-python-4367f2740e78>
> which discusses the issue you are facing, but does not discuss a solution
> for the same. Do check the comments also
>
> Regards,
> Varun
>
>
> On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison
>  wrote:
>
> I am using python 3.7 and Spark 2.4.7
>
> I am not sure what the best way to do this is.
>
> I have a dataframe with a url in one of the columns, and I want to
> download the contents of that url and put it in a new column.
>
> Can someone point me in the right direction on how to do this?
> I looked at the UDFs and they seem confusing to me.
>
> Also, is there a good way to rate limit the number of calls I make per
> second?
>
>


Re: Spark 2.4.7

2023-08-26 Thread Harry Jamison
 
Thank you Varun, this makes sense.
I understand a separate process for content ingestion. I was thinking it would 
be a separate spark job, but it sounds like you are suggesting that ideally I 
should do it outside of Hadoop entirely?
Thanks

Harry



On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah 
 wrote:  
 
 Hi Harry, 

Ideally, you should not be fetching a url in your transformation job but do the 
API calls separately (outside the cluster if possible). Ingesting data should 
be treated separately from transformation / cleaning / join operations. You can 
create another dataframe of urls, dedup if required & store it in a file where 
your normal python function would ingest the data for the url & after X amount 
of api calls, create dataframe for it & union with previous dataframe, finally 
writing the content & then doing a join with the original df based on url, if 
required.

If this is absolutely necessary, here are a few ways to achieve this:

Approach-1:
You can use the spark's foreachPartition which will require a udf function. In
this, you can create a connection to limit the API calls per partition.

This can work if you introduce logic that checks for the current number of
partitions & then distribute the max_api_calls per partition. E.g., if
no_of_partitions = 4 and total_max_api_calls = 4, then you can pass in a
parameter to this udf with max_partition_api_calls = 1.

This approach has limitations as it requires max allowed api calls to be more 
than that of the number of partitions.
Approach-2
An alternative approach is to create the connection outside of the udf with 
rate limiter (link) and use this connection variable inside of the udf function
in each partition, invoking time.sleep. This will definitely introduce issues 
where many partitions are trying to invoke the api.
I found this medium-article which discusses the issue you are facing, but does 
not discuss a solution for the same. Do check the comments also 

Regards,
Varun


On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison 
 wrote:

I am using python 3.7 and Spark 2.4.7
I am not sure what the best way to do this is.
I have a dataframe with a url in one of the columns, and I want to download the 
contents of that url and put it in a new column.
Can someone point me in the right direction on how to do this? I looked at the
UDFs and they seem confusing to me.
Also, is there a good way to rate limit the number of calls I make per second?
  

Re: Spark 2.4.7

2023-08-26 Thread Varun Shah
Hi Harry,

Ideally, you should not be fetching a URL in your transformation job; do
the API calls separately (outside the cluster if possible). Ingesting data
should be treated separately from transformation / cleaning / join
operations. You can create another dataframe of URLs, dedup it if required,
and store it in a file. A normal Python function can then ingest the data
for each URL; after X API calls, create a dataframe from the results and
union it with the previous one. Finally, write out the content and, if
required, join it back to the original df on the url column.
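
As a rough sketch of that separation (the paths and the idea of writing
url/content pairs to files are illustrative assumptions, not a fixed recipe):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("url-ingest").getOrCreate()

# Step 1 (Spark): collect and dedup the URLs, write them out for the fetcher.
df = spark.read.parquet("/data/events")  # illustrative input path
df.select("url").distinct().write.mode("overwrite").csv("/data/urls_to_fetch")

# Step 2 (outside Spark): a plain Python script reads /data/urls_to_fetch,
# calls the API with whatever throttling / batching it needs, and writes
# url,content pairs (with a header) to /data/url_content.

# Step 3 (Spark): join the fetched content back onto the original DataFrame.
content = spark.read.option("header", True).csv("/data/url_content")
enriched = df.join(content, on="url", how="left")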

If this is absolutely necessary, here are a few ways to achieve this:

Approach-1:
You can use Spark's foreachPartition
<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreachPartition.html>,
which takes a function that runs once per partition.
Inside it, you can create a connection and limit the API calls per partition.

This can work if you introduce logic that checks the current number of
partitions & then distributes the max_api_calls across them.
E.g., if no_of_partitions = 4 and total_max_api_calls = 4, then you can pass
a parameter to this function with max_partition_api_calls = 1.

This approach has a limitation: it requires the maximum allowed API calls to
be at least as large as the number of partitions.
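
A rough sketch of the per-partition budget idea, shown with the closely
related mapPartitions so the fetched content comes back as a new column
(the budget of 4 calls and the example DataFrame are illustrative
assumptions):

import urllib.request

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("partition-budget").getOrCreate()
df = spark.createDataFrame([("https://www.example.com",)], ["url"])

total_max_api_calls = 4
max_partition_api_calls = max(1, total_max_api_calls // df.rdd.getNumPartitions())


def fetch_partition(rows):
    calls = 0
    for row in rows:
        content = None
        if calls < max_partition_api_calls:  # stay within this partition's budget
            try:
                content = urllib.request.urlopen(row.url, timeout=10).read().decode("utf-8")
            except Exception:
                pass  # leave content as None on failure
            calls += 1
        yield Row(url=row.url, content=content)


fetched = df.rdd.mapPartitions(fetch_partition).toDF()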

Approach-2
An alternative approach is to create the connection outside of the udf with
a rate limiter
(link
<https://stackoverflow.com/questions/40748687/python-api-rate-limiting-how-to-limit-api-calls-globally>)
and use this connection variable inside the function in each partition,
invoking time.sleep between calls. This can still cause problems when many
partitions try to invoke the API concurrently.
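
A minimal sleep-based sketch of this, assuming the same df with a url column
as above (note the sleep throttles each task independently, so N partitions
running concurrently still make up to N calls per interval):

import time
import urllib.request

from pyspark.sql import Row

min_interval = 1.0  # illustrative: at most ~1 call per second per partition


def fetch_partition_throttled(rows):
    for row in rows:
        start = time.time()
        try:
            content = urllib.request.urlopen(row.url, timeout=10).read().decode("utf-8")
        except Exception:
            content = None
        # sleep off whatever remains of the interval before the next request
        elapsed = time.time() - start
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)
        yield Row(url=row.url, content=content)


throttled = df.rdd.mapPartitions(fetch_partition_throttled).toDF()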

I found this Medium article
<https://medium.com/geekculture/how-to-execute-a-rest-api-call-on-apache-spark-the-right-way-in-python-4367f2740e78>
which discusses the issue you are facing but does not offer a solution.
Do check the comments as well.

Regards,
Varun


On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison
 wrote:

> I am using python 3.7 and Spark 2.4.7
>
> I am not sure what the best way to do this is.
>
> I have a dataframe with a url in one of the columns, and I want to
> download the contents of that url and put it in a new column.
>
> Can someone point me in the right direction on how to do this?
> I looked at the UDFs and they seem confusing to me.
>
> Also, is there a good way to rate limit the number of calls I make per
> second?
>


Spark 2.4.7

2023-08-25 Thread Harry Jamison
I am using python 3.7 and Spark 2.4.7
I am not sure what the best way to do this is.
I have a dataframe with a url in one of the columns, and I want to download the 
contents of that url and put it in a new column.
Can someone point me in the right direction on how to do this? I looked at
the UDFs and they seem confusing to me.
Also, is there a good way to rate limit the number of calls I make per second?

Re: Suggestion on Spark 2.4.7 vs Spark 3 for Kubernetes

2021-01-05 Thread Sachit Murarka
Thanks for the link Prashant.

Regards
Sachit

On Tue, 5 Jan 2021, 15:08 Prashant Sharma,  wrote:

>  A lot of developers may have already moved to 3.0.x, FYI 3.1.0 is just
> around the corner hopefully (in a few days) and has a lot of improvements to
> spark on K8s, including it will be transitioning from experimental to GA in
> this release.
>
> See: https://issues.apache.org/jira/browse/SPARK-33005
>
> Thanks,
>
> On Tue, Jan 5, 2021 at 12:41 AM Sachit Murarka 
> wrote:
>
>> Hi Users,
>>
>> Could you please tell which Spark version have you used in Production for
>> Kubernetes.
>> Which is a recommended version for Production provided that both
>> Streaming and core apis have to be used using Pyspark.
>>
>> Thanks !
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Re: Suggestion on Spark 2.4.7 vs Spark 3 for Kubernetes

2021-01-05 Thread Prashant Sharma
A lot of developers may have already moved to 3.0.x. FYI, 3.1.0 is
hopefully just around the corner (in a few days) and has a lot of
improvements to Spark on K8s, including the transition of Spark on
Kubernetes from experimental to GA in that release.

See: https://issues.apache.org/jira/browse/SPARK-33005

Thanks,

On Tue, Jan 5, 2021 at 12:41 AM Sachit Murarka 
wrote:

> Hi Users,
>
> Could you please tell which Spark version have you used in Production for
> Kubernetes.
> Which is a recommended version for Production provided that both Streaming
> and core apis have to be used using Pyspark.
>
> Thanks !
>
> Kind Regards,
> Sachit Murarka
>


Suggestion on Spark 2.4.7 vs Spark 3 for Kubernetes

2021-01-04 Thread Sachit Murarka
Hi Users,

Could you please tell me which Spark version you have used in production
for Kubernetes?
Which version is recommended for production, given that both the Streaming
and core APIs have to be used from PySpark?

Thanks !

Kind Regards,
Sachit Murarka