Also, can I take my lower bound starting from 1, or does it have to be the index?

On Thu, Jun 6, 2024 at 8:42 PM Perez <flinkbyhe...@gmail.com> wrote:

> Thanks again, Mich. It gives a clear picture, but I again have a couple of
> doubts:
>
> 1) I know that multiple threads will be executed, each handling a segment
> of size 10, until the upper bound is reached, but I didn't exactly get this
> part of the code: segments = [(i, min(i + segment_size, upper_bound)) for i
> in range(lower_bound, upper_bound, segment_size)]
>
> 2) Also, performing a union on these small DataFrames won't impact
> performance, right? Since Spark has to shuffle and combine less data from
> these DataFrames?
>
>
> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Well, you can dynamically determine the upper bound by first querying the
>> database for the maximum value of the partition column and using it as
>> the upper bound for your partitioning logic.
>>
>> def get_max_value(spark, mongo_config, column_name):
>>     max_value_df = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
>>                     .options(**mongo_config)
>>                     .load())
>>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>     return max_value
>>
>> # Define your MongoDB config without the bounds first
>> mongo_config_base = {
>>     "uri": "mongodb://username:password@host:port/database.collection",
>>     "partitionColumn": "_id"
>> }
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size and lower bound
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound))
>>             for i in range(lower_bound, upper_bound, segment_size)]
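>>
>> For illustration (a minimal sketch with made-up bounds, not from your
>> actual data), the comprehension just produces (start, end) pairs stepping
>> by segment_size, with the last pair capped at upper_bound:
>>
>> # Hypothetical example: lower_bound = 0, upper_bound = 25, segment_size = 10
>> example_segments = [(i, min(i + 10, 25)) for i in range(0, 25, 10)]
>> print(example_segments)  # [(0, 10), (10, 20), (20, 25)]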
>>
>> Then you need to aggregate the DataFrames from multiple threads. When
>> loading data in parallel, each thread will load a segment of data into its
>> own DataFrame. To aggregate all these DataFrames into a single DataFrame,
>> you can use the union method in PySpark.
>>
>> from concurrent.futures import ThreadPoolExecutor, as_completed
>> from pyspark.sql import SparkSession
>>
>> def extract_data_from_mongodb(mongo_config):
>>     df = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
>>           .options(**mongo_config)
>>           .load())
>>     return df
>>
>> # Function to get the maximum value of the partition column
>> def get_max_value(spark, mongo_config, column_name):
>>     max_value_df = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
>>                     .options(**mongo_config)
>>                     .load())
>>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>     return max_value
>>
>> # MongoDB configuration without bounds
>> mongo_config_base = {
>>     "uri": "mongodb://username:password@host:port/database.collection",
>>     "partitionColumn": "_id"
>> }
>>
>> # Initialize Spark session
>> spark = SparkSession.builder \
>>     .appName("MongoDBDataLoad") \
>>     .config("spark.mongodb.input.uri",
>>             "mongodb://username:password@host:port/database.collection") \
>>     .getOrCreate()
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size and lower bound
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound))
>>             for i in range(lower_bound, upper_bound, segment_size)]
>>
>> # Function to load a segment
>> def load_segment(segment):
>>     segment_lower_bound, segment_upper_bound = segment
>>     mongo_config = mongo_config_base.copy()
>>     mongo_config["lowerBound"] = str(segment_lower_bound)
>>     mongo_config["upperBound"] = str(segment_upper_bound)
>>     return extract_data_from_mongodb(mongo_config)
>>
>> # Collect all DataFrames from threads
>> all_dfs = []
>>
>> with ThreadPoolExecutor() as executor:
>>     futures = [executor.submit(load_segment, segment) for segment in segments]
>>     for future in as_completed(futures):
>>         try:
>>             df_segment = future.result()
>>             all_dfs.append(df_segment)
>>         except Exception as e:
>>             print(f"Error: {e}")
>>
>> # Union all DataFrames into a single DataFrame
>> if all_dfs:
>>     final_df = all_dfs[0]
>>     for df in all_dfs[1:]:
>>         final_df = final_df.union(df)
>>     # Proceed with your final DataFrame
>>     final_df.show()
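>>
>> If preferred (just a sketch of an equivalent alternative), the same fold
>> can be written more compactly with functools.reduce. Note that union is a
>> lazy transformation, so combining the segment DataFrames does not by
>> itself trigger a shuffle:
>>
>> from functools import reduce
>>
>> if all_dfs:
>>     # Fold the list of segment DataFrames into one DataFrame
>>     final_df = reduce(lambda left, right: left.union(right), all_dfs)
>>     final_df.show()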
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>> College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>> London, United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, "one test result is worth one-thousand expert
>> opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>>
>> On Thu, 6 Jun 2024 at 10:52, Perez <flinkbyhe...@gmail.com> wrote:
>>
>>> Thanks, Mich, for your response. However, I have a couple of doubts, as below:
>>>
>>> 1) I am trying to load the data for an incremental batch, so I am not
>>> sure what my upper bound would be. So what can we do?
>>> 2) As each thread loads its segment of data into a DataFrame, what should
>>> I do if I want to aggregate all the data from all the threads into a
>>> single DataFrame? Keep appending to a DataFrame as the data comes in?
>>>
>>>
>>> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Yes, partitioning and parallel loading can significantly improve the
>>>> performance of data extraction from JDBC sources or databases like MongoDB.
>>>> This approach can leverage Spark's distributed computing capabilities,
>>>> allowing you to load data in parallel, thus speeding up the overall data
>>>> loading process.
>>>>
>>>> When loading data from JDBC sources, specifying partitioning options
>>>> allows Spark to parallelize the data read operation. Here's how you can do
>>>> it for a JDBC source:
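>>>>
>>>> For a plain JDBC source, a minimal sketch would look like the below (the
>>>> URL, table name and credentials are placeholders, not your actual
>>>> settings):
>>>>
>>>> # Partitioned JDBC read: Spark issues one query per partition
>>>> jdbc_df = (spark.read.format("jdbc")
>>>>            .option("url", "jdbc:postgresql://host:5432/database")
>>>>            .option("dbtable", "schema.table")
>>>>            .option("user", "username")
>>>>            .option("password", "password")
>>>>            .option("partitionColumn", "id")
>>>>            .option("lowerBound", "0")
>>>>            .option("upperBound", "200")
>>>>            .option("numPartitions", "10")
>>>>            .load())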
>>>>
>>>> Something like the below, given the information provided, for your
>>>> MongoDB case:
>>>>
>>>> from pyspark.sql import SparkSession
>>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>>>
>>>> def extract_data_from_mongodb(mongo_config):
>>>>     df = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
>>>>           .options(**mongo_config)
>>>>           .load())
>>>>     return df
>>>>
>>>> # MongoDB configuration
>>>> mongo_config_template = {
>>>>     "uri": "mongodb://username:password@host:port/database.collection",
>>>>     "partitionColumn": "_id",
>>>>     "lowerBound": None,
>>>>     "upperBound": None
>>>> }
>>>>
>>>> lower_bound = 0
>>>> upper_bound = 200
>>>> segment_size = 10
>>>>
>>>> # Create segments
>>>> segments = [(i, min(i + segment_size, upper_bound))
>>>>             for i in range(lower_bound, upper_bound, segment_size)]
>>>>
>>>> # Initialize Spark session
>>>> spark = SparkSession.builder \
>>>>     .appName("MongoDBDataLoad") \
>>>>     .config("spark.mongodb.input.uri",
>>>>             "mongodb://username:password@host:port/database.collection") \
>>>>     .getOrCreate()
>>>>
>>>> # Extract data in parallel using ThreadPoolExecutor
>>>> def load_segment(segment):
>>>>     segment_lower_bound, segment_upper_bound = segment
>>>>     mongo_config = mongo_config_template.copy()
>>>>     mongo_config["lowerBound"] = str(segment_lower_bound)
>>>>     mongo_config["upperBound"] = str(segment_upper_bound)
>>>>     return extract_data_from_mongodb(mongo_config)
>>>>
>>>> with ThreadPoolExecutor() as executor:
>>>>     futures = [executor.submit(load_segment, segment) for segment in segments]
>>>>     for future in as_completed(futures):
>>>>         try:
>>>>             df_segment = future.result()
>>>>             # Process df_segment as needed
>>>>         except Exception as e:
>>>>             print(f"Error: {e}")
>>>>
>>>>
>>>> ThreadPoolExecutor enables parallel execution of tasks using multiple
>>>> threads. Each thread can be responsible for loading a segment of the data.
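>>>>
>>>> If needed, the number of concurrent threads can be capped so the source
>>>> database is not overwhelmed; the worker count below is just an arbitrary
>>>> illustration, not a recommendation:
>>>>
>>>> # Limit parallelism to 4 concurrent segment loads
>>>> with ThreadPoolExecutor(max_workers=4) as executor:
>>>>     futures = [executor.submit(load_segment, segment) for segment in segments]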
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>>>> College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>>>>  (voted 2nd best university in the world after MIT
>>>> https://lnkd.in/eCPt6KTj)
>>>> London, United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>> that, as with any advice, "one test result is worth one-thousand expert
>>>> opinions" (Wernher von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>
>>>>
>>>> On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:
>>>>
>>>>> Hello experts,
>>>>>
>>>>> I was just wondering if I could leverage the approach below to expedite
>>>>> the data loading process in Spark.
>>>>>
>>>>>
>>>>> def extract_data_from_mongodb(mongo_config):
>>>>>     df = glueContext.create_dynamic_frame.from_options(
>>>>>         connection_type="mongodb", connection_options=mongo_config)
>>>>>     return df
>>>>>
>>>>> lower_bound = 0
>>>>> upper_bound = 200
>>>>> segment_size = 10
>>>>>
>>>>> mongo_config = {"connection.uri": "mongodb://url", "database": "",
>>>>>                 "collection": "", "username": "", "password": "",
>>>>>                 "partitionColumn": "_id", "lowerBound": str(lower_bound),
>>>>>                 "upperBound": str(upper_bound)}
>>>>>
>>>>> segments = [(i, min(i + segment_size, upper_bound))
>>>>>             for i in range(lower_bound, upper_bound, segment_size)]
>>>>>
>>>>> with ThreadPoolExecutor() as executor:
>>>>>     futures = [executor.submit(execution, segment) for segment in segments]
>>>>>     for future in as_completed(futures):
>>>>>         try:
>>>>>             future.result()
>>>>>         except Exception as e:
>>>>>             print(f"Error: {e}")
>>>>>
>>>>> I am trying to leverage parallel threads to pull the data in parallel.
>>>>> So, is it an effective approach?
>>>>>
>>>>
