Well, you can dynamically determine the upper bound by first querying the database for the maximum value of the partition column and then using that value as the upper bound in your partitioning logic.
For example:

def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# Define your MongoDB config without the bounds first
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

Then you need to aggregate the DataFrames from the multiple threads. When loading data in parallel, each thread loads its segment of the data into its own DataFrame. To combine all of these DataFrames into a single DataFrame, you can use the union method in PySpark. Putting it all together (with the Spark session initialised first):

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# Function to get the maximum value of the partition column
def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# MongoDB configuration without bounds
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size (assumes the partition column is numeric so ranges can be computed)
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

# Function to load a segment
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_base.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

# Collect all DataFrames from threads
all_dfs = []
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            all_dfs.append(df_segment)
        except Exception as e:
            print(f"Error: {e}")

# Union all DataFrames into a single DataFrame
if all_dfs:
    final_df = all_dfs[0]
    for df in all_dfs[1:]:
        final_df = final_df.union(df)

    # Proceed with your final DataFrame
    final_df.show()
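As an aside, instead of unioning in an explicit loop you could fold the list of segment DataFrames with functools.reduce. This is only a minimal sketch, assuming the all_dfs list built above and that every segment comes back with the same schema; unionByName matches columns by name rather than by position, which makes it a little more forgiving than union if column order ever differs between reads.

from functools import reduce

# Fold the per-segment DataFrames into one; unionByName aligns columns by name
if all_dfs:
    final_df = reduce(lambda left, right: left.unionByName(right), all_dfs)
    final_df.show()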
HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy>
Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with any
advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 6 Jun 2024 at 10:52, Perez <flinkbyhe...@gmail.com> wrote:

> Thanks, Mich for your response. However, I have multiple doubts as below:
>
> 1) I am trying to load the data for the incremental batch so I am not sure
> what would be my upper bound. So what can we do?
> 2) So as each thread loads the desired segment size's data into a
> dataframe, if I want to aggregate all the data from all the threads in a
> single dataframe, what should I do? Keep on appending in a dataframe as it
> comes?
>
>
> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Yes, partitioning and parallel loading can significantly improve the
>> performance of data extraction from JDBC sources or databases like MongoDB.
>> This approach can leverage Spark's distributed computing capabilities,
>> allowing you to load data in parallel, thus speeding up the overall data
>> loading process.
>>
>> When loading data from JDBC sources, specifying partitioning options
>> allows Spark to parallelize the data read operation. Here's how you can do
>> it for a JDBC source:
>>
>> Something like below, given the information provided:
>>
>> from pyspark.sql import SparkSession
>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>
>> def extract_data_from_mongodb(mongo_config):
>>     df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>     return df
>>
>> # MongoDB configuration
>> mongo_config_template = {
>>     "uri": "mongodb://username:password@host:port/database.collection",
>>     "partitionColumn": "_id",
>>     "lowerBound": None,
>>     "upperBound": None
>> }
>>
>> lower_bound = 0
>> upper_bound = 200
>> segment_size = 10
>>
>> # Create segments
>> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>
>> # Initialize Spark session
>> spark = SparkSession.builder \
>>     .appName("MongoDBDataLoad") \
>>     .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
>>     .getOrCreate()
>>
>> # Extract data in parallel using ThreadPoolExecutor
>> def load_segment(segment):
>>     segment_lower_bound, segment_upper_bound = segment
>>     mongo_config = mongo_config_template.copy()
>>     mongo_config["lowerBound"] = str(segment_lower_bound)
>>     mongo_config["upperBound"] = str(segment_upper_bound)
>>     return extract_data_from_mongodb(mongo_config)
>>
>> with ThreadPoolExecutor() as executor:
>>     futures = [executor.submit(load_segment, segment) for segment in segments]
>>     for future in as_completed(futures):
>>         try:
>>             df_segment = future.result()
>>             # Process df_segment as needed
>>         except Exception as e:
>>             print(f"Error: {e}")
>>
>> ThreadPoolExecutor enables parallel execution of tasks using multiple
>> threads. Each thread can be responsible for loading a segment of the data.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy>
>> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>> (voted 2nd best university in the world after MIT https://lnkd.in/eCPt6KTj)
>> London, United Kingdom
>>
>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>
>>
>> On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:
>>
>>> Hello experts,
>>>
>>> I was just wondering if I could leverage the below thing to expedite the
>>> loading of the data process in Spark.
>>>
>>> def extract_data_from_mongodb(mongo_config):
>>>     df = glueContext.create_dynamic_frame.from_options(
>>>         connection_type="mongodb",
>>>         connection_options=mongo_config
>>>     )
>>>     return df
>>>
>>> mongo_config = {
>>>     "connection.uri": "mongodb://url",
>>>     "database": "",
>>>     "collection": "",
>>>     "username": "",
>>>     "password": "",
>>>     "partitionColumn": "_id",
>>>     "lowerBound": str(lower_bound),
>>>     "upperBound": str(upper_bound)
>>> }
>>>
>>> lower_bound = 0
>>> upper_bound = 200
>>> segment_size = 10
>>> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>>
>>> with ThreadPoolExecutor() as executor:
>>>     futures = [executor.submit(execution, segment) for segment in segments]
>>>     for future in as_completed(futures):
>>>         try:
>>>             future.result()
>>>         except Exception as e:
>>>             print(f"Error: {e}")
>>>
>>> I am trying to leverage the parallel threads to pull data in parallel.
>>> So is it effective?
>>>
>>