Thanks, Mich, for your response. However, I have a couple of follow-up
questions:

1) I am loading data in incremental batches, so I don't know my upper bound
in advance. What can we do in that case?
2) Each thread loads its segment's worth of data into its own dataframe. If
I want to aggregate the data from all the threads into a single dataframe,
what should I do? Keep appending to one dataframe as each segment arrives?
(A sketch of what I mean is below.)
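
For reference, here is a minimal sketch of what I mean by "appending",
assuming every segment dataframe has the same schema and reusing the
load_segment and segments names from your example:

from functools import reduce
from concurrent.futures import ThreadPoolExecutor, as_completed

# Collect each segment's dataframe as its thread finishes
segment_dfs = []
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, s) for s in segments]
    for future in as_completed(futures):
        segment_dfs.append(future.result())

# unionByName matches columns by name, so completion order does not matter;
# the union is lazy, so nothing is copied until an action runs
combined_df = reduce(lambda a, b: a.unionByName(b), segment_dfs)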


On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Yes, partitioning and parallel loading can significantly improve the
> performance of data extraction from JDBC sources or databases like MongoDB.
> This approach can leverage Spark's distributed computing capabilities,
> allowing you to load data in parallel, thus speeding up the overall data
> loading process.
>
> When loading data from JDBC sources, specifying partitioning options
> allows Spark to parallelize the data read operation. Here's how you can do
> it for a JDBC source:
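>
> As a generic illustration (the url, table, and column names below are
> placeholders, not from your setup), a partitioned JDBC read looks like
> this:
>
> df = (spark.read.format("jdbc")
>       .option("url", "jdbc:postgresql://host:5432/mydb")
>       .option("dbtable", "public.orders")
>       .option("partitionColumn", "order_id")
>       .option("lowerBound", "0")
>       .option("upperBound", "200")
>       .option("numPartitions", "10")
>       .option("user", "username")
>       .option("password", "password")
>       .load())
>
> Spark splits the lowerBound-upperBound range into numPartitions strides
> and issues one query per partition in parallel; rows outside the range
> still land in the first and last partitions.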
>
> For the MongoDB case, something like the below, given the information
> provided:
>
> from pyspark.sql import SparkSession
> from concurrent.futures import ThreadPoolExecutor, as_completed
>
> # Initialize the Spark session first, so it exists before any thread
> # calls the loader below
> spark = (SparkSession.builder
>          .appName("MongoDBDataLoad")
>          .config("spark.mongodb.input.uri",
>                  "mongodb://username:password@host:port/database.collection")
>          .getOrCreate())
>
> def extract_data_from_mongodb(mongo_config):
>     df = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
>           .options(**mongo_config)
>           .load())
>     return df
>
> # MongoDB configuration; the bounds are placeholders that each thread
> # fills in for its own segment
> mongo_config_template = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id",
>     "lowerBound": None,
>     "upperBound": None
> }
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> # Create (lower, upper) pairs, one per segment
> segments = [(i, min(i + segment_size, upper_bound))
>             for i in range(lower_bound, upper_bound, segment_size)]
>
> # Extract data in parallel using ThreadPoolExecutor
> def load_segment(segment):
>     segment_lower_bound, segment_upper_bound = segment
>     mongo_config = mongo_config_template.copy()
>     mongo_config["lowerBound"] = str(segment_lower_bound)
>     mongo_config["upperBound"] = str(segment_upper_bound)
>     return extract_data_from_mongodb(mongo_config)
>
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(load_segment, segment) for segment in segments]
>     for future in as_completed(futures):
>         try:
>             df_segment = future.result()
>             # Process df_segment as needed
>         except Exception as e:
>             print(f"Error: {e}")
>
>
> ThreadPoolExecutor enables parallel execution of tasks using multiple
> threads. Each thread can be responsible for loading a segment of the data.
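>
> A small refinement, as a sketch (the max_workers value is an arbitrary
> illustration, not tuned for your source): capping the pool size keeps
> many concurrent reads from overwhelming the database.
>
> # Cap concurrent segment loads; tune max_workers to what the source
> # database can sustain
> with ThreadPoolExecutor(max_workers=4) as executor:
>     futures = [executor.submit(load_segment, segment) for segment in segments]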
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London> (voted 2nd
> best university in the world after MIT https://lnkd.in/eCPt6KTj)
> London, United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice: "one test result is worth one-thousand expert
> opinions" (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
>
> On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:
>
>> Hello experts,
>>
>> I was wondering whether I could use the approach below to speed up the
>> data loading process in Spark.
>>
>>
>> def extract_data_from_mongodb(mongo_config):
>>     df = glueContext.create_dynamic_frame.from_options(
>>         connection_type="mongodb", connection_options=mongo_config)
>>     return df
>>
>> lower_bound = 0
>> upper_bound = 200
>> segment_size = 10
>> mongo_config = {"connection.uri": "mongodb://url", "database": "",
>>                 "collection": "", "username": "", "password": "",
>>                 "partitionColumn": "_id",
>>                 "lowerBound": str(lower_bound),
>>                 "upperBound": str(upper_bound)}
>> segments = [(i, min(i + segment_size, upper_bound))
>>             for i in range(lower_bound, upper_bound, segment_size)]
>>
>> with ThreadPoolExecutor() as executor:
>>     futures = [executor.submit(execution, segment) for segment in segments]
>>     for future in as_completed(futures):
>>         try:
>>             future.result()
>>         except Exception as e:
>>             print(f"Error: {e}")
>>
>> The idea is to use a pool of threads to pull the data in parallel. Is
>> this approach effective?
>>
>
