Also, can I take my lower bound starting from 1, or should it be the index value?

On Thu, Jun 6, 2024 at 8:42 PM Perez <flinkbyhe...@gmail.com> wrote:
Thanks again, Mich. That gives a clear picture, but I again have a couple of doubts:

1) I understand that multiple threads will be executed, each covering a segment of size 10, until the upper bound is reached, but I didn't exactly follow this part of the code:
   segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

2) Also, performing a union on these small DataFrames won't impact performance, right? Since Spark has to shuffle and combine less data from these DataFrames?

On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Well, you can dynamically determine the upper bound by first querying the database for the maximum value of the partition column and using it as the upper bound for your partitioning logic.

def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# Define your MongoDB config without the bounds first
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]
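To make that comprehension concrete, here is a minimal sketch with made-up bounds (the numbers are purely illustrative): it steps through range(lower_bound, upper_bound, segment_size) and caps the last segment at upper_bound.

lower_bound = 0      # illustrative values only
upper_bound = 35
segment_size = 10

segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]
print(segments)      # [(0, 10), (10, 20), (20, 30), (30, 35)]

Starting the range at 1 instead of 0 simply shifts each window by one: range(1, 35, 10) gives 1, 11, 21, 31, so the segments become [(1, 11), (11, 21), (21, 31), (31, 35)].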
Then you need to aggregate the DataFrames coming back from the threads. When loading data in parallel, each thread loads one segment into its own DataFrame; to combine them into a single DataFrame you can use the union method in PySpark.

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# Function to get the maximum value of the partition column
def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# MongoDB configuration without bounds
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

# Function to load a segment
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_base.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

# Collect all DataFrames from the threads
all_dfs = []

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            all_dfs.append(df_segment)
        except Exception as e:
            print(f"Error: {e}")

# Union all DataFrames into a single DataFrame
if all_dfs:
    final_df = all_dfs[0]
    for df in all_dfs[1:]:
        final_df = final_df.union(df)

    # Proceed with your final DataFrame
    final_df.show()
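A more compact way to write that union step is to fold the list with functools.reduce; a minimal sketch assuming all_dfs is the non-empty list built above and every segment DataFrame has the same schema (they come from the same collection):

from functools import reduce
from pyspark.sql import DataFrame

# Fold the per-segment DataFrames into one with DataFrame.union
final_df = reduce(DataFrame.union, all_dfs)

Note that union itself is a narrow transformation, so it does not shuffle data on its own; any shuffle cost shows up only in whatever aggregation or write you run on final_df afterwards.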
HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London, United Kingdom

On Thu, 6 Jun 2024 at 10:52, Perez <flinkbyhe...@gmail.com> wrote:

Thanks, Mich, for your response. However, I have a couple of doubts:

1) I am trying to load the data for an incremental batch, so I am not sure what my upper bound would be. What can we do about that?
2) As each thread loads its segment of data into a DataFrame, what should I do if I want to aggregate the data from all the threads into a single DataFrame? Keep appending to a DataFrame as the segments arrive?

On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Yes, partitioning and parallel loading can significantly improve the performance of data extraction from JDBC sources or databases like MongoDB. This approach leverages Spark's distributed computing capabilities, allowing you to load data in parallel and thus speeding up the overall loading process.

When loading data from JDBC sources, specifying partitioning options allows Spark to parallelize the read operation. Here is how you can do it; something like the below, given the information provided (the example uses the MongoDB Spark connector, but the same partitioning idea applies to a JDBC source):

from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# MongoDB configuration
mongo_config_template = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id",
    "lowerBound": None,
    "upperBound": None
}

lower_bound = 0
upper_bound = 200
segment_size = 10

# Create segments
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Extract data in parallel using ThreadPoolExecutor
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_template.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            # Process df_segment as needed
        except Exception as e:
            print(f"Error: {e}")

ThreadPoolExecutor enables parallel execution of tasks using multiple threads. Each thread can be responsible for loading one segment of the data.
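For a plain JDBC source, the partitioning options can be handed straight to the JDBC reader and Spark will parallelize the scan itself, with no thread pool needed; a minimal sketch, where the URL, table, column and bound values are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JDBCPartitionedLoad").getOrCreate()

# Spark splits the read into numPartitions parallel queries over partitionColumn
df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/database")
      .option("dbtable", "schema.table")
      .option("user", "username")
      .option("password", "password")
      .option("partitionColumn", "id")
      .option("lowerBound", "0")
      .option("upperBound", "200")
      .option("numPartitions", "20")
      .load())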
HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London, United Kingdom

On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:

Hello experts,

I was just wondering if I could leverage the approach below to expedite the data-loading process in Spark.

from concurrent.futures import ThreadPoolExecutor, as_completed

def extract_data_from_mongodb(mongo_config):
    df = glueContext.create_dynamic_frame.from_options(
        connection_type="mongodb",
        connection_options=mongo_config
    )
    return df

lower_bound = 0
upper_bound = 200
segment_size = 10

mongo_config = {
    "connection.uri": "mongodb://url",
    "database": "",
    "collection": "",
    "username": "",
    "password": "",
    "partitionColumn": "_id",
    "lowerBound": str(lower_bound),
    "upperBound": str(upper_bound)
}

segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

# Per-segment loader: copy the base config and narrow the bounds to the segment
def execution(segment):
    segment_lower_bound, segment_upper_bound = segment
    config = mongo_config.copy()
    config["lowerBound"] = str(segment_lower_bound)
    config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(config)

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(execution, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Error: {e}")

I am trying to leverage parallel threads to pull the data in parallel. So is it effective?
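If you experiment with this pattern, one knob worth noting is max_workers on ThreadPoolExecutor; a minimal sketch, where the value 4 is an arbitrary illustration rather than a recommendation from this thread:

from concurrent.futures import ThreadPoolExecutor, as_completed

# Cap the number of simultaneous segment reads so the driver and the source
# database are not hit by too many concurrent connections at once
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(execution, segment) for segment in segments]
    for future in as_completed(futures):
        future.result()  # re-raises any exception from the worker thread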