Hi,

Another thing to consider when parallelising connections to the upstream
sources is that you are querying the system simultaneously, which causes
usage spikes. If the source system is already handling a lot of requests
from production workloads, the best time to run parallel extracts from
upstream systems is during their off-peak hours.
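
As a rough sketch of keeping the load on the source bounded, the number of
simultaneous reads can be capped with max_workers on ThreadPoolExecutor.
The fetch_segment function, the cap of 4 and the sleep that stands in for a
query are placeholders I made up for illustration; a real job would read
from the source at that point.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_PARALLEL_READS = 4  # assumed cap; tune to what the source can tolerate

_lock = threading.Lock()
_active = 0
peak_active = 0  # highest number of reads observed running at once


def fetch_segment(segment):
    """Placeholder for a real read against the upstream source."""
    global _active, peak_active
    with _lock:
        _active += 1
        peak_active = max(peak_active, _active)
    try:
        time.sleep(0.01)  # stands in for the query round trip
        lower, upper = segment
        return upper - lower  # pretend row count for the segment
    finally:
        with _lock:
            _active -= 1


segments = [(i, min(i + 10, 200)) for i in range(0, 200, 10)]

# At most MAX_PARALLEL_READS segments are queried at any moment.
with ThreadPoolExecutor(max_workers=MAX_PARALLEL_READS) as executor:
    row_counts = list(executor.map(fetch_segment, segments))
```

With the default ThreadPoolExecutor() the pool size depends on the machine,
so setting it explicitly makes the pressure on the source predictable.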

It also makes sense to replicate data when there are fewer changes in the
upstream system. Sometimes, though, businesses need very low latency
replication; in such cases it helps to use reader instances (if you are
using AWS RDS), to read the database redo log files, or to use their
replication to brokers.

Each partition is processed independently. If you have upsert operations
running on the target, which means multiple upsert/merge operations can
occur, the best strategy is first to replicate the table into a staging
area and then do the upsert/merge operation into the target.
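
A minimal sketch of the stage-then-merge idea, in plain Python rather than
Spark so that it runs anywhere. The dict used as the target, the id key and
the updated_at version column are assumptions for illustration only; in a
real pipeline the staging area would be a table and the merge would be done
by the target system.

```python
def merge_into_target(target, staging, key="id", version="updated_at"):
    """Apply staged rows to target in one pass, keeping the newest row per key."""
    latest = {}
    for row in staging:
        current = latest.get(row[key])
        if current is None or row[version] >= current[version]:
            latest[row[key]] = row
    for k, row in latest.items():
        target[k] = row  # insert when k is new, update otherwise
    return target


# Rows landed in the staging area by independently processed partitions.
partition_a = [{"id": 1, "updated_at": 1, "name": "old"},
               {"id": 2, "updated_at": 1, "name": "b"}]
partition_b = [{"id": 1, "updated_at": 2, "name": "new"},
               {"id": 3, "updated_at": 1, "name": "c"}]
staging = partition_a + partition_b

# Existing target content, keyed by id.
target = {2: {"id": 2, "updated_at": 0, "name": "stale"}}
merge_into_target(target, staging)
```

Because the partitions only write to staging, the target sees one merge
instead of one per partition, and duplicate keys across partitions are
resolved before they reach it.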


Regards,
Gourav Sengupta

On Fri, Jun 7, 2024 at 1:01 AM Perez <flinkbyhe...@gmail.com> wrote:

> Also, can my lower bound start from 1, or is it an index?
>
> On Thu, Jun 6, 2024 at 8:42 PM Perez <flinkbyhe...@gmail.com> wrote:
>
>> Thanks again, Mich. That gives a clear picture, but I again have a couple
>> of doubts:
>>
>> 1) I know that multiple threads will be executed, with segments of size
>> 10 each, until the upper bound is reached, but I didn't quite get this
>> part of the code: segments = [(i, min(i + segment_size,
>> upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>
>> 2) Also, performing a union on these small dataframes won't impact
>> performance, right? Since Spark has to shuffle and combine less data from
>> these dataframes?
>>
>>
>> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Well, you can dynamically determine the upper bound by first querying the
>>> database to find the maximum value of the partition column and use it as
>>> the upper bound for your partitioning logic.
>>>
>>> def get_max_value(spark, mongo_config, column_name):
>>>     max_value_df = (
>>>         spark.read.format("com.mongodb.spark.sql.DefaultSource")
>>>         .options(**mongo_config)
>>>         .load()
>>>     )
>>>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>>     return max_value
>>>
>>> # Define your MongoDB config without the bounds first
>>> mongo_config_base = {
>>>     "uri": "mongodb://username:password@host:port/database.collection",
>>>     "partitionColumn": "_id"
>>> }
>>>
>>> # Fetch the dynamic upper bound
>>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>>
>>> # Define your segment size
>>> segment_size = 10
>>> lower_bound = 0
>>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>>> range(lower_bound, upper_bound, segment_size)]
>>>
>>> Then you need to aggregate the DataFrames from multiple threads. When
>>> loading data in parallel, each thread will load a segment of data into
>>> its own DataFrame. To aggregate all these DataFrames into a single
>>> DataFrame, you can use the union method in PySpark.
>>>
>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>> from pyspark.sql import SparkSession
>>>
>>> def extract_data_from_mongodb(mongo_config):
>>>     df = (
>>>         spark.read.format("com.mongodb.spark.sql.DefaultSource")
>>>         .options(**mongo_config)
>>>         .load()
>>>     )
>>>     return df
>>>
>>> # Function to get the maximum value of the partition column
>>> def get_max_value(spark, mongo_config, column_name):
>>>     max_value_df = (
>>>         spark.read.format("com.mongodb.spark.sql.DefaultSource")
>>>         .options(**mongo_config)
>>>         .load()
>>>     )
>>>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>>     return max_value
>>>
>>> # MongoDB configuration without bounds
>>> mongo_config_base = {
>>>     "uri": "mongodb://username:password@host:port/database.collection",
>>>     "partitionColumn": "_id"
>>> }
>>>
>>> # Initialize Spark session
>>> spark = SparkSession.builder \
>>>     .appName("MongoDBDataLoad") \
>>>     .config("spark.mongodb.input.uri",
>>>             "mongodb://username:password@host:port/database.collection") \
>>>     .getOrCreate()
>>>
>>> # Fetch the dynamic upper bound
>>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>>
>>> # Define your segment size
>>> segment_size = 10
>>> lower_bound = 0
>>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>>> range(lower_bound, upper_bound, segment_size)]
>>>
>>> # Function to load a segment
>>> def load_segment(segment):
>>>     segment_lower_bound, segment_upper_bound = segment
>>>     mongo_config = mongo_config_base.copy()
>>>     mongo_config["lowerBound"] = str(segment_lower_bound)
>>>     mongo_config["upperBound"] = str(segment_upper_bound)
>>>     return extract_data_from_mongodb(mongo_config)
>>>
>>> # Collect all DataFrames from threads
>>> all_dfs = []
>>>
>>> with ThreadPoolExecutor() as executor:
>>>     futures = [executor.submit(load_segment, segment) for segment in
>>> segments]
>>>     for future in as_completed(futures):
>>>         try:
>>>             df_segment = future.result()
>>>             all_dfs.append(df_segment)
>>>         except Exception as e:
>>>             print(f"Error: {e}")
>>>
>>> # Union all DataFrames into a single DataFrame
>>> if all_dfs:
>>>     final_df = all_dfs[0]
>>>     for df in all_dfs[1:]:
>>>         final_df = final_df.union(df)
>>>
>>>     # Proceed with your final DataFrame
>>>     final_df.show()
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>>> College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>>> London, United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Thu, 6 Jun 2024 at 10:52, Perez <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Thanks, Mich for your response. However, I have multiple doubts as
>>>> below:
>>>>
>>>> 1) I am trying to load the data for the incremental batch so I am not
>>>> sure what would be my upper bound. So what can we do?
>>>> 2) So as each thread loads the desired segment size's data into a
>>>> dataframe if I want to aggregate all the data from all the threads in a
>>>> single dataframe what should I do? Keep on appending in a dataframe as it
>>>> comes?
>>>>
>>>>
>>>> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Yes, partitioning and parallel loading can significantly improve the
>>>>> performance of data extraction from JDBC sources or databases like 
>>>>> MongoDB.
>>>>> This approach can leverage Spark's distributed computing capabilities,
>>>>> allowing you to load data in parallel, thus speeding up the overall data
>>>>> loading process.
>>>>>
>>>>> When loading data from JDBC sources, specifying partitioning options
>>>>> allows Spark to parallelize the data read operation. Here's how you can do
>>>>> it for a JDBC source:
>>>>>
>>>>> Something like below given the information provided
>>>>>
>>>>> from pyspark.sql import SparkSession
>>>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>>>>
>>>>> def extract_data_from_mongodb(mongo_config):
>>>>>     df = (
>>>>>         spark.read.format("com.mongodb.spark.sql.DefaultSource")
>>>>>         .options(**mongo_config)
>>>>>         .load()
>>>>>     )
>>>>>     return df
>>>>>
>>>>> # MongoDB configuration
>>>>> mongo_config_template = {
>>>>>     "uri": "mongodb://username:password@host:port/database.collection",
>>>>>     "partitionColumn": "_id",
>>>>>     "lowerBound": None,
>>>>>     "upperBound": None
>>>>> }
>>>>>
>>>>> lower_bound = 0
>>>>> upper_bound = 200
>>>>> segment_size = 10
>>>>>
>>>>> # Create segments
>>>>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>>>>> range(lower_bound, upper_bound, segment_size)]
>>>>>
>>>>> # Initialize Spark session
>>>>> spark = SparkSession.builder \
>>>>>     .appName("MongoDBDataLoad") \
>>>>>     .config("spark.mongodb.input.uri",
>>>>> "mongodb://username:password@host:port/database.collection") \
>>>>>     .getOrCreate()
>>>>>
>>>>> # Extract data in parallel using ThreadPoolExecutor
>>>>> def load_segment(segment):
>>>>>     segment_lower_bound, segment_upper_bound = segment
>>>>>     mongo_config = mongo_config_template.copy()
>>>>>     mongo_config["lowerBound"] = str(segment_lower_bound)
>>>>>     mongo_config["upperBound"] = str(segment_upper_bound)
>>>>>     return extract_data_from_mongodb(mongo_config)
>>>>>
>>>>> with ThreadPoolExecutor() as executor:
>>>>>     futures = [executor.submit(load_segment, segment) for segment in
>>>>> segments]
>>>>>     for future in as_completed(futures):
>>>>>         try:
>>>>>             df_segment = future.result()
>>>>>             # Process df_segment as needed
>>>>>         except Exception as e:
>>>>>             print(f"Error: {e}")
>>>>>
>>>>>
>>>>> ThreadPoolExecutor enables parallel execution of tasks using multiple
>>>>> threads. Each thread can be responsible for loading a segment of the data.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>>
>>>>> On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:
>>>>>
>>>>>> Hello experts,
>>>>>>
>>>>>> I was just wondering if I could leverage the below thing to expedite
>>>>>> the loading of the data process in Spark.
>>>>>>
>>>>>>
>>>>>> def extract_data_from_mongodb(mongo_config):
>>>>>>     df = glueContext.create_dynamic_frame.from_options(
>>>>>>         connection_type="mongodb",
>>>>>>         connection_options=mongo_config
>>>>>>     )
>>>>>>     return df
>>>>>>
>>>>>> mongo_config = {
>>>>>>     "connection.uri": "mongodb://url",
>>>>>>     "database": "", "collection": "",
>>>>>>     "username": "", "password": "",
>>>>>>     "partitionColumn": "_id",
>>>>>>     "lowerBound": str(lower_bound),
>>>>>>     "upperBound": str(upper_bound)
>>>>>> }
>>>>>>
>>>>>> lower_bound = 0
>>>>>> upper_bound = 200
>>>>>> segment_size = 10
>>>>>> segments = [(i, min(i + segment_size, upper_bound))
>>>>>>             for i in range(lower_bound, upper_bound, segment_size)]
>>>>>>
>>>>>> with ThreadPoolExecutor() as executor:
>>>>>>     futures = [executor.submit(execution, segment) for segment in segments]
>>>>>>     for future in as_completed(futures):
>>>>>>         try:
>>>>>>             future.result()
>>>>>>         except Exception as e:
>>>>>>             print(f"Error: {e}")
>>>>>>
>>>>>> I am trying to leverage the parallel threads to pull data in
>>>>>> parallel. So is it effective?
>>>>>>
>>>>>
