Re: Kubernetes cluster: change log4j configuration using uploaded `--files`

2024-06-06 Thread Mich Talebzadeh
The issue you are encountering is due to the order of operations when Spark
initializes the JVM for the driver and executor pods. The JVM options
(-Dlog4j2.configurationFile) are evaluated when the JVM starts, but files
distributed with --files are only copied into the pod's working directory
after the JVM has already started. Hence, the log4j configuration file is not
yet in place at the moment the JVM looks for it.

In summary, you need to ensure the file is in place before the Spark driver
or executor JVM starts, for example by baking it into the image, mounting it
via a pod template (e.g. from a ConfigMap), or placing it on a volume that is
mounted when the pod is created.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD, Imperial College London
London, United Kingdom

View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner von Braun).


On Thu, 6 Jun 2024 at 17:04, Jennifer Wirth  wrote:

> Hi,
>
> I am trying to change the log4j configuration for jobs submitted to a k8s
> cluster (using submit)
>
> The my-log4j.xml is uploaded using --files ,./my-log4j.xml and the file
> in the working directory of the driver/exec pods.
>
> I added D-flags using the extra java options (and tried many different
> URIs, absolute, with and without file:.
>
> --conf spark.driver.extraJavaOptions="-Dlog4j2.debug=false 
> -Dlog4j2.configurationFile=file:./my-log4j.xml" \
> --conf spark.executor.extraJavaOptions="-Dlog4j2.debug=false 
> -Dlog4j2.configurationFile=file:./my-log4j.xml" \
>
> When debugging i notice that log4j is not able to load my configuration
> file. I see the following additional log entries:
>
> ERROR StatusLogger Reconfiguration failed: No configuration found for 
> '4a87761d' at 'null' in 'null'
> ERROR StatusLogger Reconfiguration failed: No configuration found for 
> 'Default' at 'null' in 'null'
> 24/06/06 09:20:44 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Files 
> file:///tmp/mk2/spark-upload-36b1f43d-1878-4b06-be5e-e25b703f28d5/orbit-movements.csv
>  from 
> /tmp/mk2/spark-upload-36b1f43d-1878-4b06-be5e-e25b703f28d5/orbit-movements.csv
>  to /opt/spark/work-dir/orbit-movements.csv
> Files 
> file:///tmp/mk2/spark-upload-cc211704-f481-4ebe-b6f0-5dbe66a7c639/my-log4j.xml
>  from /tmp/mk2/spark-upload-cc211704-f481-4ebe-b6f0-5dbe66a7c639/my-log4j.xml 
> to /opt/spark/work-dir/my-log4j.xml
> Files 
> file:///tmp/mk2/spark-upload-7970b482-7669-49aa-9f88-65191a83a18a/out.jar 
> from /tmp/mk2/spark-upload-7970b482-7669-49aa-9f88-65191a83a18a/out.jar to 
> /opt/spark/work-dir/out.jar
>
> The lines starting with Files in the logs of the Driver process, makes me
> wonder if the copying of files from my shared mount to the working
> directory happens in that process and is not something that happens before
> the java process launches. Is that assumption correct, as it would explain
> why my log4j config files are not found at JVM launch.
>
> If so, what is the recommended way to change the logging config *per job*
> when running spark in k8s (i am not using a custom container image, so
> can’t place it in there)
>
> tx.,
>


7368396 - Apache Spark 3.5.1 (Support)

2024-06-06 Thread SANTOS SOUZA, ALEX
Hey guys!



I am part of the team responsible for software approval at EMBRAER S.A.
We are currently in the process of approving the Apache Spark 3.5.1 software 
and are verifying the licensing of the application.
Therefore, I would kindly ask you to answer the questions below.

-What type of software? (Commercial, Freeware, Component, etc...)
 A:

-What is the licensing model for commercial use? (Subscription, Perpetual, GPL, 
etc...)
A:

-What type of license? (By user, Competitor, Device, Server or others)?
A:

-Number of installations allowed per license/subscription?
A:

-Can it be used in the defense and aerospace industry? (Company that
manufactures products for national defense)
A:

-Does the license allow use in any location regardless of the origin of the 
purchase (tax restriction)?
A:

-Where can I find the End User License Agreement (EULA) for the version in 
question?
A:



Desde já, muito obrigado e qualquer dúvida estou à disposição. / Thank you very 
much in advance and I am at your disposal if you have any questions.


Att,


Alex Santos Souza

Software Asset Management - Embraer

WhatsApp: +55 12 99731-7579

E-mail: alex.santosso...@dxc.com

DXC Technology

São José dos Campos, SP - Brazil



Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Also, can I take my lower bound starting from 1, or is it an index?

On Thu, Jun 6, 2024 at 8:42 PM Perez  wrote:

> Thanks again Mich. It gives the clear picture but I have again couple of
> doubts:
>
> 1) I know that there will be multiple threads that will be executed with
> 10 segment sizes each until the upper bound is reached but I didn't get
> this part of the code exactly segments = [(i, min(i + segment_size,
> upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> 2) Also performing union on these small dataframes won't impact
> performance right? since spark has to shuffle and combine less data from
> these dataframes?
>
>
> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
> wrote:
>
>> well you can dynamically determine the upper bound by first querying the
>> database to find the maximum value of the partition column and use it as
>> the upper bound for your partitioning logic.
>>
>> def get_max_value(spark, mongo_config, column_name):
>> max_value_df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>> return max_value
>>
>> # Define your MongoDB config without the bounds first
>> mongo_config_base = {
>> "uri": "mongodb://username:password@host:port/database.collection",
>> "partitionColumn": "_id"
>> }
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>> range(lower_bound, upper_bound, segment_size)]
>>
>> Then you need to aggregate DF from multiple threads When loading data in
>> parallel, each thread will load a segment of data into its own DataFrame.
>> To aggregate all these DataFrames into a single DataFrame, you can use t*he
>> union method in PySpark.*
>>
>> from concurrent.futures import ThreadPoolExecutor, as_completed
>> from pyspark.sql import SparkSession
>>
>> def extract_data_from_mongodb(mongo_config):
>> df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> return df
>>
>> # Function to get the maximum value of the partition column
>> def get_max_value(spark, mongo_config, column_name):
>> max_value_df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>> return max_value
>>
>> # MongoDB configuration without bounds
>> mongo_config_base = {
>> "uri": "mongodb://username:password@host:port/database.collection",
>> "partitionColumn": "_id"
>> }
>>
>> # Initialize Spark session
>> spark = SparkSession.builder \
>> .appName("MongoDBDataLoad") \
>> .config("spark.mongodb.input.uri", 
>> "mongodb://username:password@host:port/database.collection")
>> \
>> .getOrCreate()
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>> range(lower_bound, upper_bound, segment_size)]
>>
>> # Function to load a segment
>> def load_segment(segment):
>> segment_lower_bound, segment_upper_bound = segment
>> mongo_config = mongo_config_base.copy()
>> mongo_config["lowerBound"] = str(segment_lower_bound)
>> mongo_config["upperBound"] = str(segment_upper_bound)
>> return extract_data_from_mongodb(mongo_config)
>>
>> # Collect all DataFrames from threads
>> all_dfs = []
>>
>> with ThreadPoolExecutor() as executor:
>> futures = [executor.submit(load_segment, segment) for segment in
>> segments]
>> for future in as_completed(futures):
>> try:
>> df_segment = future.result()
>> all_dfs.append(df_segment)
>> except Exception as e:
>> print(f"Error: {e}")
>>
>> # Union all DataFrames into a single DataFrame
>> if all_dfs:
>> final_df = all_dfs[0]
>> for df in all_dfs[1:]:
>> final_df = final_df.union(df)
>>
>> # Proceed with your final DataFrame
>> final_df.show()
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> PhD  Imperial
>> College London 
>> London, United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> 

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks again, Mich. That gives a clearer picture, but I again have a couple
of doubts:

1) I know that there will be multiple threads, each handling a segment of
size 10, until the upper bound is reached, but I didn't get this part of the
code exactly: segments = [(i, min(i + segment_size, upper_bound)) for i in
range(lower_bound, upper_bound, segment_size)] (see the worked example below).

2) Also, performing a union on these small dataframes won't impact
performance, right? Since Spark has to shuffle and combine less data from
these dataframes?
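
Here is a minimal worked example of what that comprehension produces; it just
slices the [lower_bound, upper_bound) range into fixed-size chunks (the values
below are chosen arbitrarily for illustration):

lower_bound = 0
upper_bound = 35
segment_size = 10

# Each tuple is a (segment_lower_bound, segment_upper_bound) pair; the last
# segment is clipped to the upper bound.
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

print(segments)  # [(0, 10), (10, 20), (20, 30), (30, 35)]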


On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
wrote:

> well you can dynamically determine the upper bound by first querying the
> database to find the maximum value of the partition column and use it as
> the upper bound for your partitioning logic.
>
> def get_max_value(spark, mongo_config, column_name):
> max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
> return max_value
>
> # Define your MongoDB config without the bounds first
> mongo_config_base = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id"
> }
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> Then you need to aggregate DF from multiple threads When loading data in
> parallel, each thread will load a segment of data into its own DataFrame.
> To aggregate all these DataFrames into a single DataFrame, you can use t*he
> union method in PySpark.*
>
> from concurrent.futures import ThreadPoolExecutor, as_completed
> from pyspark.sql import SparkSession
>
> def extract_data_from_mongodb(mongo_config):
> df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> return df
>
> # Function to get the maximum value of the partition column
> def get_max_value(spark, mongo_config, column_name):
> max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
> return max_value
>
> # MongoDB configuration without bounds
> mongo_config_base = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id"
> }
>
> # Initialize Spark session
> spark = SparkSession.builder \
> .appName("MongoDBDataLoad") \
> .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
> .getOrCreate()
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Function to load a segment
> def load_segment(segment):
> segment_lower_bound, segment_upper_bound = segment
> mongo_config = mongo_config_base.copy()
> mongo_config["lowerBound"] = str(segment_lower_bound)
> mongo_config["upperBound"] = str(segment_upper_bound)
> return extract_data_from_mongodb(mongo_config)
>
> # Collect all DataFrames from threads
> all_dfs = []
>
> with ThreadPoolExecutor() as executor:
> futures = [executor.submit(load_segment, segment) for segment in
> segments]
> for future in as_completed(futures):
> try:
> df_segment = future.result()
> all_dfs.append(df_segment)
> except Exception as e:
> print(f"Error: {e}")
>
> # Union all DataFrames into a single DataFrame
> if all_dfs:
> final_df = all_dfs[0]
> for df in all_dfs[1:]:
> final_df = final_df.union(df)
>
> # Proceed with your final DataFrame
> final_df.show()
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 6 Jun 2024 at 10:52, Perez  wrote:
>
>> Thanks, Mich for your response. However, I have multiple doubts as below:
>>
>> 1) I am trying to load the data for the incremental batch so I am not
>> sure what 

Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-06 Thread Someshwar Kale
As a fix, you may consider adding a transformer to rename columns (perhaps
replacing the dots in all column names with underscores) and using the
renamed columns in your pipeline, as below:

val renameColumn = new RenameColumn()
  .setInputCol("location.longitude")
  .setOutputCol("location_longitude")
val si = new StringIndexer()
  .setInputCol("location_longitude")
  .setOutputCol("longitutdee")
val pipeline = new Pipeline().setStages(Array(renameColumn, si))
pipeline.fit(flattenedDf).transform(flattenedDf).show()
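
For PySpark users, a rough equivalent of the same idea (a sketch, not from the
original thread; it assumes the flattened frame, called flattened_df here,
really has a top-level column literally named "location.longitude"):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Rename the dotted column so downstream ML stages do not try to resolve it
# as a nested field; withColumnRenamed matches the literal top-level name.
renamed_df = flattened_df.withColumnRenamed("location.longitude", "location_longitude")

si = StringIndexer(inputCol="location_longitude", outputCol="location_longitude_idx")
Pipeline(stages=[si]).fit(renamed_df).transform(renamed_df).show()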


Refer to my comment for elaboration.
Thanks!!

*Regards,*
*Someshwar Kale*





On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal 
wrote:

> Hello team
> I was exploring the feature transformations exposed via MLlib on a nested
> dataset, and encountered an error while applying any transformer to a
> column with dot notation naming. I thought of raising a ticket on Spark,
> https://issues.apache.org/jira/browse/SPARK-48463, where I have mentioned
> the entire scenario.
>
> I wanted to get suggestions on what would be the best way to solve the
> problem while using the dot notation. One workaround is to use `_` while
> flattening the dataframe, but that would mean additional overhead to
> convert back to `.` (dot notation), since that's the convention for our
> other flattened data.
>
> I would be happy to make a contribution to the code if someone can shed
> some light on how this could be solved.
>
>
>
> --
> Thanks and Regards,
> Chhavi Bansal
>


Kubernetes cluster: change log4j configuration using uploaded `--files`

2024-06-06 Thread Jennifer Wirth
Hi,

I am trying to change the log4j configuration for jobs submitted to a k8s
cluster (using spark-submit).

The my-log4j.xml is uploaded using --files ./my-log4j.xml, and the file ends
up in the working directory of the driver/executor pods.

I added -D flags using the extra Java options (and tried many different
URIs: absolute, with and without file:).

--conf spark.driver.extraJavaOptions="-Dlog4j2.debug=false
-Dlog4j2.configurationFile=file:./my-log4j.xml" \
--conf spark.executor.extraJavaOptions="-Dlog4j2.debug=false
-Dlog4j2.configurationFile=file:./my-log4j.xml" \

When debugging, I notice that log4j is not able to load my configuration
file. I see the following additional log entries:

ERROR StatusLogger Reconfiguration failed: No configuration found for
'4a87761d' at 'null' in 'null'
ERROR StatusLogger Reconfiguration failed: No configuration found for
'Default' at 'null' in 'null'
24/06/06 09:20:44 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
Files 
file:///tmp/mk2/spark-upload-36b1f43d-1878-4b06-be5e-e25b703f28d5/orbit-movements.csv
from 
/tmp/mk2/spark-upload-36b1f43d-1878-4b06-be5e-e25b703f28d5/orbit-movements.csv
to /opt/spark/work-dir/orbit-movements.csv
Files 
file:///tmp/mk2/spark-upload-cc211704-f481-4ebe-b6f0-5dbe66a7c639/my-log4j.xml
from /tmp/mk2/spark-upload-cc211704-f481-4ebe-b6f0-5dbe66a7c639/my-log4j.xml
to /opt/spark/work-dir/my-log4j.xml
Files file:///tmp/mk2/spark-upload-7970b482-7669-49aa-9f88-65191a83a18a/out.jar
from /tmp/mk2/spark-upload-7970b482-7669-49aa-9f88-65191a83a18a/out.jar
to /opt/spark/work-dir/out.jar

The lines starting with "Files" in the logs of the driver process make me
wonder whether the copying of files from my shared mount to the working
directory happens in that process, rather than before the Java process
launches. Is that assumption correct? It would explain why my log4j config
file is not found at JVM launch.

If so, what is the recommended way to change the logging config *per job*
when running Spark on k8s? (I am not using a custom container image, so I
can't place it in there.)

tx.,


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Mich Talebzadeh
Well, you can dynamically determine the upper bound by first querying the
database for the maximum value of the partition column and using it as the
upper bound for your partitioning logic.

def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# Define your MongoDB config without the bounds first
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

Then you need to aggregate the DataFrames from the multiple threads. When
loading data in parallel, each thread will load a segment of data into its
own DataFrame. To aggregate all these DataFrames into a single DataFrame,
you can use the *union* method in PySpark.

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# Function to get the maximum value of the partition column
def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# MongoDB configuration without bounds
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri",
            "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

# Function to load a segment
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_base.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

# Collect all DataFrames from threads
all_dfs = []

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            all_dfs.append(df_segment)
        except Exception as e:
            print(f"Error: {e}")

# Union all DataFrames into a single DataFrame
if all_dfs:
    final_df = all_dfs[0]
    for df in all_dfs[1:]:
        final_df = final_df.union(df)

    # Proceed with your final DataFrame
    final_df.show()
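
As a small aside, the pairwise union loop above can also be written more
compactly with functools.reduce (a sketch that assumes the same all_dfs list):

from functools import reduce
from pyspark.sql import DataFrame

# Pairwise-union the collected segment DataFrames into one DataFrame
final_df = reduce(DataFrame.union, all_dfs) if all_dfs else None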

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD, Imperial College London
London, United Kingdom

View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner von Braun).


On Thu, 6 Jun 2024 at 10:52, Perez  wrote:

> Thanks, Mich for your response. However, I have multiple doubts as below:
>
> 1) I am trying to load the data for the incremental batch so I am not sure
> what would be my upper bound. So what can we do?
> 2) So as each thread loads the desired segment size's data into a
> dataframe if I want to aggregate all the data from all the threads in a
> single dataframe what should I do? Keep on appending in a dataframe as it
> comes?
>
>
> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
> wrote:
>
>> Yes, partitioning and parallel loading can significantly improve the
>> performance of data extraction from JDBC sources or databases like MongoDB.
>> This approach can leverage Spark's distributed computing capabilities,
>> allowing you to load data in parallel, thus speeding up the overall data
>> loading process.
>>
>> When loading data from JDBC sources, specifying partitioning options
>> allows Spark to parallelize the data read operation. Here's how you 

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks, Mich, for your response. However, I have a couple of doubts, as below:

1) I am trying to load the data for an incremental batch, so I am not sure
what my upper bound would be. So what can we do?
2) As each thread loads the desired segment of data into a dataframe, if I
want to aggregate all the data from all the threads into a single dataframe,
what should I do? Keep appending to a dataframe as it comes?


On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
wrote:

> Yes, partitioning and parallel loading can significantly improve the
> performance of data extraction from JDBC sources or databases like MongoDB.
> This approach can leverage Spark's distributed computing capabilities,
> allowing you to load data in parallel, thus speeding up the overall data
> loading process.
>
> When loading data from JDBC sources, specifying partitioning options
> allows Spark to parallelize the data read operation. Here's how you can do
> it for a JDBC source:
>
> Something like below given the information provided
>
> from pyspark.sql import SparkSession
> from concurrent.futures import ThreadPoolExecutor, as_completed
>
> def extract_data_from_mongodb(mongo_config):
> df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> return df
>
> # MongoDB configuration
> mongo_config_template = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id",
> "lowerBound": None,
> "upperBound": None
> }
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> # Create segments
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Initialize Spark session
> spark = SparkSession.builder \
> .appName("MongoDBDataLoad") \
> .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
> .getOrCreate()
>
> # Extract data in parallel using ThreadPoolExecutor
> def load_segment(segment):
> segment_lower_bound, segment_upper_bound = segment
> mongo_config = mongo_config_template.copy()
> mongo_config["lowerBound"] = str(segment_lower_bound)
> mongo_config["upperBound"] = str(segment_upper_bound)
> return extract_data_from_mongodb(mongo_config)
>
> with ThreadPoolExecutor() as executor:
> futures = [executor.submit(load_segment, segment) for segment in
> segments]
> for future in as_completed(futures):
> try:
> df_segment = future.result()
> # Process df_segment as needed
> except Exception as e:
> print(f"Error: {e}")
>
>
> ThreadPoolExecutor enables parallel execution of tasks using multiple
> threads. Each thread can be responsible for loading a segment of the data.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London  (voted 2nd
> best university in the world after MIT https://lnkd.in/eCPt6KTj)
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 6 Jun 2024 at 00:46, Perez  wrote:
>
>> Hello experts,
>>
>> I was just wondering if I could leverage the below thing to expedite the
>> loading of the data process in Spark.
>>
>>
>> def extract_data_from_mongodb(mongo_config): df =
>> glueContext.create_dynamic_frame.from_options( connection_type="mongodb",
>> connection_options=mongo_config ) return df
>>
>> mongo_config = { "connection.uri": "mongodb://url", "database": "",
>> "collection": "", "username": "", "password": "", "partitionColumn":"_id",
>> "lowerBound": str(lower_bound), "upperBound": str(upper_bound) }
>> lower_bound = 0 upper_bound = 200 segment_size = 10 segments = [(i, min(i
>> + segment_size, upper_bound)) for i in range(lower_bound, upper_bound,
>> segment_size)] with ThreadPoolExecutor() as executor: futures =
>> [executor.submit(execution, segment) for segment in segments] for future in
>> as_completed(futures): try: future.result() except Exception as e:
>> print(f"Error: {e}")
>>
>> I am trying to leverage the parallel threads to pull data in parallel. So
>> is it effective?
>>
>


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Mich Talebzadeh
Yes, partitioning and parallel loading can significantly improve the
performance of data extraction from JDBC sources or databases like MongoDB.
This approach can leverage Spark's distributed computing capabilities,
allowing you to load data in parallel, thus speeding up the overall data
loading process.

When loading data from JDBC sources, specifying partitioning options allows
Spark to parallelize the data read operation. Here's how you can do it for
a JDBC source:

Something like the below, given the information provided:

from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# MongoDB configuration
mongo_config_template = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id",
    "lowerBound": None,
    "upperBound": None
}

lower_bound = 0
upper_bound = 200
segment_size = 10

# Create segments
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri",
            "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Extract data in parallel using ThreadPoolExecutor
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_template.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            # Process df_segment as needed
        except Exception as e:
            print(f"Error: {e}")


ThreadPoolExecutor enables parallel execution of tasks using multiple
threads. Each thread can be responsible for loading a segment of the data.
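
For a plain JDBC source, this partitioned, parallel read is built into
Spark's JDBC data source, so no manual threading is needed. A minimal sketch
(the URL, table and column names are placeholders, and the appropriate JDBC
driver jar is assumed to be on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JDBCPartitionedLoad").getOrCreate()

# Spark issues one query per partition, splitting the partitionColumn value
# range [lowerBound, upperBound) into numPartitions strides; the bounds only
# control the stride, they do not filter rows.
jdbc_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/database")  # placeholder URL
    .option("dbtable", "schema.table_name")                 # placeholder table
    .option("user", "username")
    .option("password", "password")
    .option("partitionColumn", "id")                        # numeric/date column
    .option("lowerBound", "0")
    .option("upperBound", "200")
    .option("numPartitions", "20")
    .load()
)

jdbc_df.show()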

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD, Imperial College London (voted 2nd best university in the world after MIT https://lnkd.in/eCPt6KTj)
London, United Kingdom

View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner von Braun).


On Thu, 6 Jun 2024 at 00:46, Perez  wrote:

> Hello experts,
>
> I was just wondering if I could leverage the below thing to expedite the
> loading of the data process in Spark.
>
>
> def extract_data_from_mongodb(mongo_config):
>     df = glueContext.create_dynamic_frame.from_options(
>         connection_type="mongodb", connection_options=mongo_config)
>     return df
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> mongo_config = {
>     "connection.uri": "mongodb://url", "database": "", "collection": "",
>     "username": "", "password": "", "partitionColumn": "_id",
>     "lowerBound": str(lower_bound), "upperBound": str(upper_bound)}
>
> segments = [(i, min(i + segment_size, upper_bound))
>             for i in range(lower_bound, upper_bound, segment_size)]
>
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(execution, segment) for segment in segments]
>     for future in as_completed(futures):
>         try:
>             future.result()
>         except Exception as e:
>             print(f"Error: {e}")
>
> I am trying to leverage the parallel threads to pull data in parallel. So
> is it effective?
>