Two new tickets for Spark on K8s

2023-08-26 Thread Mich Talebzadeh
Hi,

@holden Karau recently created two Jiras that deal with two items of
interest namely:


   1. Improve Spark Driver Launch Time SPARK-44950
   
   2. Improve Spark Dynamic Allocation SPARK-44951
   

These are both very much in demand (at least IMO)

These topics have been discussed a few times. Most recently in spark-dev
thread
I*mproving Dynamic Allocation Logic for Spark 4+*  on 7th August

Pretty much in skeleton form yet. Add your vote if you are interested and
comment.

Thanks,


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Spark 2.4.7

2023-08-26 Thread Mich Talebzadeh
Sorry for forgetting. Add this line to the top of the code

import sys

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 26 Aug 2023 at 19:07, Mich Talebzadeh 
wrote:

> Hi guys,
>
> You can try the code below in PySpark relying on* urllib *library to
> download the contents of the URL and then create a new column in the
> DataFrame to store the downloaded contents.
>
> Spark 4.3.0
>
> The limit explained by Varun
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import udf
> from pyspark.sql.types import StringType, StructType, StructField
>
> import urllib.request
>
> def download_url_content(url):
> try:
> response = urllib.request.urlopen(url)
> return response.read().decode('utf-8')  # Assuming UTF-8 encoding
> except Exception as e:
> print(f"""{e}, quitting""")
> sys.exit(1)
>
> def main():
> # Initialize a Spark session
> spark = SparkSession.builder.appName("URLDownload").getOrCreate()
>
> # use sample DataFrame with URLs
> data = [("https://www.bbc.co.uk;,), ("https://www.bbc.com;,)]
> # Define the schema for the DataFrame
> schema = StructType([StructField("url", StringType(), True)])
> df = spark.createDataFrame(data, schema=schema)
> # Register UDF to download content
> download_udf = udf(download_url_content, StringType())
> # Add a new column with downloaded content
> df_with_content = df.withColumn("content", download_udf(df["url"]))
> # Show the DataFrame with downloaded content
> df_with_content.show(truncate=False)
> if __name__ == "__main__":
> main()
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 26 Aug 2023 at 17:51, Harry Jamison
>  wrote:
>
>>
>> Thank you Varun, this makes sense.
>>
>> I understand a separate process for content ingestion. I was thinking it
>> would be a separate spark job, but it sounds like you are suggesting that
>> ideally I should do it outside of Hadoop entirely?
>>
>> Thanks
>>
>>
>> Harry
>>
>>
>>
>>
>> On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah <
>> varunshah100...@gmail.com> wrote:
>>
>>
>> Hi Harry,
>>
>> Ideally, you should not be fetching a url in your transformation job but
>> do the API calls separately (outside the cluster if possible). Ingesting
>> data should be treated separately from transformation / cleaning / join
>> operations. You can create another dataframe of urls, dedup if required &
>> store it in a file where your normal python function would ingest the data
>> for the url & after X amount of api calls, create dataframe for it & union
>> with previous dataframe, finally writing the content & then doing a join
>> with the original df based on url, if required.
>>
>> If this is absolutely necessary, here are a few ways to achieve this:
>>
>> Approach-1:
>> You can use the spark's foreachPartition
>> 
>> which will require a udf function.
>> In this, you can create a connection to limit the API calls per
>> partition.
>>
>> This can work if you introduce logic that checks for the current number
>> of partitions & then distribute the max_api_calls per partition.
>> eg: if no_of_partitions = 4 and total_max_api_calls = 4, then you can
>> pass in a parameter to this udf with max_partition_api_calls = 1.
>>
>> This approach has limitations as it requires max allowed api calls to be
>> more than that of the number of partitions.
>>
>> Approach-2
>> An alternative approach is to create the connection outside of the udf
>> with rate limiter
>> (link
>> )
>> and use this connection variable inside of the udf function in each
>> partition, invoking time.sleep. This will definitely introduce issues where
>> 

Re: Spark 2.4.7

2023-08-26 Thread Mich Talebzadeh
Hi guys,

You can try the code below in PySpark relying on* urllib *library to
download the contents of the URL and then create a new column in the
DataFrame to store the downloaded contents.

Spark 4.3.0

The limit explained by Varun

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField

import urllib.request

def download_url_content(url):
try:
response = urllib.request.urlopen(url)
return response.read().decode('utf-8')  # Assuming UTF-8 encoding
except Exception as e:
print(f"""{e}, quitting""")
sys.exit(1)

def main():
# Initialize a Spark session
spark = SparkSession.builder.appName("URLDownload").getOrCreate()

# use sample DataFrame with URLs
data = [("https://www.bbc.co.uk;,), ("https://www.bbc.com;,)]
# Define the schema for the DataFrame
schema = StructType([StructField("url", StringType(), True)])
df = spark.createDataFrame(data, schema=schema)
# Register UDF to download content
download_udf = udf(download_url_content, StringType())
# Add a new column with downloaded content
df_with_content = df.withColumn("content", download_udf(df["url"]))
# Show the DataFrame with downloaded content
df_with_content.show(truncate=False)
if __name__ == "__main__":
main()

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 26 Aug 2023 at 17:51, Harry Jamison
 wrote:

>
> Thank you Varun, this makes sense.
>
> I understand a separate process for content ingestion. I was thinking it
> would be a separate spark job, but it sounds like you are suggesting that
> ideally I should do it outside of Hadoop entirely?
>
> Thanks
>
>
> Harry
>
>
>
>
> On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah <
> varunshah100...@gmail.com> wrote:
>
>
> Hi Harry,
>
> Ideally, you should not be fetching a url in your transformation job but
> do the API calls separately (outside the cluster if possible). Ingesting
> data should be treated separately from transformation / cleaning / join
> operations. You can create another dataframe of urls, dedup if required &
> store it in a file where your normal python function would ingest the data
> for the url & after X amount of api calls, create dataframe for it & union
> with previous dataframe, finally writing the content & then doing a join
> with the original df based on url, if required.
>
> If this is absolutely necessary, here are a few ways to achieve this:
>
> Approach-1:
> You can use the spark's foreachPartition
> 
> which will require a udf function.
> In this, you can create a connection to limit the API calls per partition.
>
> This can work if you introduce logic that checks for the current number of
> partitions & then distribute the max_api_calls per partition.
> eg: if no_of_partitions = 4 and total_max_api_calls = 4, then you can pass
> in a parameter to this udf with max_partition_api_calls = 1.
>
> This approach has limitations as it requires max allowed api calls to be
> more than that of the number of partitions.
>
> Approach-2
> An alternative approach is to create the connection outside of the udf
> with rate limiter
> (link
> )
> and use this connection variable inside of the udf function in each
> partition, invoking time.sleep. This will definitely introduce issues where
> many partitions are trying to invoke the api.
>
> I found this medium-article
> 
> which discusses the issue you are facing, but does not discuss a solution
> for the same. Do check the comments also
>
> Regards,
> Varun
>
>
> On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison
>  wrote:
>
> I am using python 3.7 and Spark 2.4.7
>
> I am not sure what the best way to do this is.
>
> I have a dataframe with a url in one of the columns, and I want to
> download the contents of that url and put it in a new column.
>
> Can someone point me in the right direction on how to do this?
> I looked at the UDFs and they seem confusing to me.
>
> Also, is there a good way to rate limit the number of calls I make per
> second?
>
>


Re: Spark 2.4.7

2023-08-26 Thread Harry Jamison
 
Thank you Varun, this makes sense.
I understand a separate process for content ingestion. I was thinking it would 
be a separate spark job, but it sounds like you are suggesting that ideally I 
should do it outside of Hadoop entirely?
Thanks

Harry



On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah 
 wrote:  
 
 Hi Harry, 

Ideally, you should not be fetching a url in your transformation job but do the 
API calls separately (outside the cluster if possible). Ingesting data should 
be treated separately from transformation / cleaning / join operations. You can 
create another dataframe of urls, dedup if required & store it in a file where 
your normal python function would ingest the data for the url & after X amount 
of api calls, create dataframe for it & union with previous dataframe, finally 
writing the content & then doing a join with the original df based on url, if 
required.

If this is absolutely necessary, here are a few ways to achieve this:

Approach-1:
You can use the spark's foreachPartition which will require a udf function.In 
this, you can create a connection to limit the API calls per partition. 

This can work if you introduce logic that checks for the current number of 
partitions & then distribute the max_api_calls per partition.eg: if 
no_of_partitions = 4 and total_max_api_calls = 4, then you can pass in a 
parameter to this udf with max_partition_api_calls = 1. 

This approach has limitations as it requires max allowed api calls to be more 
than that of the number of partitions.
Approach-2
An alternative approach is to create the connection outside of the udf with 
rate limiter(link) and use this connection variable inside of the udf function 
in each partition, invoking time.sleep. This will definitely introduce issues 
where many partitions are trying to invoke the api.
I found this medium-article which discusses the issue you are facing, but does 
not discuss a solution for the same. Do check the comments also 

Regards,Varun


On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison 
 wrote:

I am using python 3.7 and Spark 2.4.7
I am not sure what the best way to do this is.
I have a dataframe with a url in one of the columns, and I want to download the 
contents of that url and put it in a new column.
Can someone point me in the right direction on how to do this?I looked at the 
UDFs and they seem confusing to me.
Also, is there a good way to rate limit the number of calls I make per second?
  

Re: Spark 2.4.7

2023-08-26 Thread Varun Shah
Hi Harry,

Ideally, you should not be fetching a url in your transformation job but do
the API calls separately (outside the cluster if possible). Ingesting data
should be treated separately from transformation / cleaning / join
operations. You can create another dataframe of urls, dedup if required &
store it in a file where your normal python function would ingest the data
for the url & after X amount of api calls, create dataframe for it & union
with previous dataframe, finally writing the content & then doing a join
with the original df based on url, if required.

If this is absolutely necessary, here are a few ways to achieve this:

Approach-1:
You can use the spark's foreachPartition

which will require a udf function.
In this, you can create a connection to limit the API calls per partition.

This can work if you introduce logic that checks for the current number of
partitions & then distribute the max_api_calls per partition.
eg: if no_of_partitions = 4 and total_max_api_calls = 4, then you can pass
in a parameter to this udf with max_partition_api_calls = 1.

This approach has limitations as it requires max allowed api calls to be
more than that of the number of partitions.

Approach-2
An alternative approach is to create the connection outside of the udf with
rate limiter
(link
)
and use this connection variable inside of the udf function in each
partition, invoking time.sleep. This will definitely introduce issues where
many partitions are trying to invoke the api.

I found this medium-article

which discusses the issue you are facing, but does not discuss a solution
for the same. Do check the comments also

Regards,
Varun


On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison
 wrote:

> I am using python 3.7 and Spark 2.4.7
>
> I am not sure what the best way to do this is.
>
> I have a dataframe with a url in one of the columns, and I want to
> download the contents of that url and put it in a new column.
>
> Can someone point me in the right direction on how to do this?
> I looked at the UDFs and they seem confusing to me.
>
> Also, is there a good way to rate limit the number of calls I make per
> second?
>


Unsubscribe

2023-08-26 Thread Ozair Khan
Unsubscribe

Regards,
Ozair Khan


Elasticsearch support for Spark 3.x

2023-08-26 Thread Dipayan Dev
Hi All,

We're using Spark 2.4.x to write dataframe into the Elasticsearch index.
As we're upgrading to Spark 3.3.0, it throwing out error
Caused by: java.lang.ClassNotFoundException: es.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)

Looking at a few responses from Stackoverflow
. it seems this is not yet supported
by Elasticsearch-hadoop.

Does anyone have experience with this? Or faced/resolved this issue in
Spark 3?

Thanks in advance!

Regards
Dipayan