Thanks, Mich, for your response. However, I still have a couple of doubts:

1) I am loading the data for an incremental batch, so I do not know my upper bound in advance. What can we do in that case?

2) Each thread loads its segment's slice of the data into its own dataframe. If I want to aggregate the data from all the threads into a single dataframe, what should I do? Keep appending to one dataframe as each segment arrives?
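To make (2) concrete, here is roughly the pattern I have in mind, written against the load_segment/segments names from your example below. This is only a sketch on my side; I am assuming every segment dataframe shares one schema, and that unioning the lazy dataframes is the right way to "append":

from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import reduce

from pyspark.sql import DataFrame

segment_dfs = []
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            # "Appending as it comes": stash each segment's dataframe
            segment_dfs.append(future.result())
        except Exception as e:
            print(f"Error: {e}")

# Union is a lazy transformation, so this builds a single logical plan
# over all segments rather than moving data between threads
combined_df = reduce(DataFrame.unionByName, segment_dfs)

And for (1), the only idea I have so far is to derive the bounds from the source itself before building the segments. Names like base_df and updated_at below are placeholders, and this assumes a numeric or timestamp column that only grows:

from pyspark.sql import functions as F

# base_df: a plain, unpartitioned read of just the incremental slice
row = base_df.agg(
    F.min("updated_at").alias("lo"),
    F.max("updated_at").alias("hi"),
).first()
lower_bound, upper_bound = row["lo"], row["hi"]

Does that sound reasonable, or is there a better way? (Also, since the JDBC partitioned read was mentioned but the sample is MongoDB, I have pasted a small JDBC sketch at the bottom of this message for reference.)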
On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Yes, partitioning and parallel loading can significantly improve the
> performance of data extraction from JDBC sources or databases like
> MongoDB. This approach leverages Spark's distributed computing
> capabilities, allowing you to load data in parallel and thus speeding up
> the overall data-loading process.
>
> When loading data from JDBC sources, specifying partitioning options
> allows Spark to parallelize the read operation. Given the information
> provided, you could do something like the below:
>
> from concurrent.futures import ThreadPoolExecutor, as_completed
>
> from pyspark.sql import SparkSession
>
> # Initialize the Spark session first so it is available to the workers
> spark = (
>     SparkSession.builder
>     .appName("MongoDBDataLoad")
>     .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection")
>     .getOrCreate()
> )
>
> def extract_data_from_mongodb(mongo_config):
>     df = (
>         spark.read.format("com.mongodb.spark.sql.DefaultSource")
>         .options(**mongo_config)
>         .load()
>     )
>     return df
>
> # MongoDB configuration template; bounds are filled in per segment
> mongo_config_template = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id",
>     "lowerBound": None,
>     "upperBound": None,
> }
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> # Create (lower, upper) segments covering the full range
> segments = [
>     (i, min(i + segment_size, upper_bound))
>     for i in range(lower_bound, upper_bound, segment_size)
> ]
>
> # Each thread loads one segment of the data
> def load_segment(segment):
>     segment_lower_bound, segment_upper_bound = segment
>     mongo_config = mongo_config_template.copy()
>     mongo_config["lowerBound"] = str(segment_lower_bound)
>     mongo_config["upperBound"] = str(segment_upper_bound)
>     return extract_data_from_mongodb(mongo_config)
>
> # Extract data in parallel using ThreadPoolExecutor
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(load_segment, segment) for segment in segments]
>     for future in as_completed(futures):
>         try:
>             df_segment = future.result()
>             # Process df_segment as needed
>         except Exception as e:
>             print(f"Error: {e}")
>
> ThreadPoolExecutor enables parallel execution of tasks using multiple
> threads; each thread can be responsible for loading one segment of the
> data.
>
> HTH
>
> Mich Talebzadeh
>
> On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:
>
>> Hello experts,
>>
>> I was just wondering if I could leverage the below approach to expedite
>> loading data in Spark.
>>
>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>
>> def extract_data_from_mongodb(mongo_config):
>>     df = glueContext.create_dynamic_frame.from_options(
>>         connection_type="mongodb",
>>         connection_options=mongo_config,
>>     )
>>     return df
>>
>> # Define the bounds before they are used in mongo_config
>> lower_bound = 0
>> upper_bound = 200
>> segment_size = 10
>>
>> mongo_config = {
>>     "connection.uri": "mongodb://url",
>>     "database": "",
>>     "collection": "",
>>     "username": "",
>>     "password": "",
>>     "partitionColumn": "_id",
>>     "lowerBound": str(lower_bound),
>>     "upperBound": str(upper_bound),
>> }
>>
>> segments = [
>>     (i, min(i + segment_size, upper_bound))
>>     for i in range(lower_bound, upper_bound, segment_size)
>> ]
>>
>> # execution(segment) is my per-segment worker (not shown here): it
>> # copies mongo_config, sets that segment's lowerBound/upperBound, and
>> # calls extract_data_from_mongodb
>> with ThreadPoolExecutor() as executor:
>>     futures = [executor.submit(execution, segment) for segment in segments]
>>     for future in as_completed(futures):
>>         try:
>>             future.result()
>>         except Exception as e:
>>             print(f"Error: {e}")
>>
>> I am trying to leverage parallel threads to pull the data in parallel.
>> Is this approach effective?
>
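For reference, this is what I understood by the partitioned JDBC read mentioned above. A minimal sketch only, with placeholder connection details and assuming a numeric id column:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JdbcPartitionedRead").getOrCreate()

# Spark issues numPartitions parallel queries over partitionColumn;
# lowerBound/upperBound set the partition stride, they do not filter rows
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/database")  # placeholder
    .option("dbtable", "schema.table")  # placeholder
    .option("user", "username")
    .option("password", "password")
    .option("partitionColumn", "id")
    .option("lowerBound", "0")
    .option("upperBound", "200")
    .option("numPartitions", "20")
    .load()
)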