Hi, could you please help me with the two cases below? I'm using PySpark on Databricks to process terabytes of JSON data read from an AWS S3 bucket.
Case 1:
-------
Currently I'm reading multiple tables sequentially to get the day count from each table. For example, table_list.csv has a single column with the table names:

from pyspark.sql.functions import expr, lit

year = 2019
month = 12

tablesDF = spark.read.format("csv").option("header", "false").load("s3a://bucket/source/table_list.csv")
tabList = tablesDF.toPandas().values.tolist()

# Snowflake connection settings
sfOptions = {
    "sfURL": "",
    "sfAccount": "",
    "sfUser": "",
    "sfPassword": "",
    "sfDatabase": "",
    "sfSchema": "",
    "sfWarehouse": "",
}

for table in tabList:
    tab_name = table[0]

    # Snowflake table count
    sf_query = ("select y as year, m as month, count(*) as sCount "
                "from {} where y = {} and m = {} group by y, m").format(tab_name, year, month)
    sfxdf = (spark.read.format("snowflake")
             .options(**sfOptions)
             .option("query", sf_query)
             .load())

    # Databricks Delta Lake count
    dbxDF = spark.sql(("select y as year, m as month, count(*) as dCount "
                       "from db.{} where y = {} and m = {} group by y, m").format(tab_name, year, month))

    # Join the two counts and flag whether they match
    resultDF = (dbxDF.join(sfxdf, on=["year", "month"], how="left_outer")
                .na.fill(0)
                .withColumn("flag_col", expr("dCount == sCount")))

    finalDF = (resultDF.withColumn("table_name", lit(tab_name))
               .select("table_name", "year", "month", "dCount", "sCount", "flag_col"))

    # Append so every table's result lands under the same report path
    finalDF.coalesce(1).write.format("csv").option("header", "true").mode("append").save("s3a://outputs/reportcsv")

Questions:
1) Instead of running the count query table by table in sequence, how can I read all the tables listed in the CSV file on S3 in parallel and distribute the work across the cluster?
2) How can I optimize the above PySpark code so that all the count queries run at the same time?

Case 2: Multiprocessing
-----------------------
Could you please help me run the above PySpark query in parallel in the distributed environment using multiprocessing? Is there any way to achieve parallel processing on the cluster with a snippet like the one below?

import time
import multiprocessing
import pandas as pd

# Creating a pool of 20 processes. You can set this as per your intended
# parallelism and your available resources.
start = time.time()
pool = multiprocessing.Pool(20)

# This will execute get_counts() in parallel, once per element of input_paths.
# result (a list of dictionaries) is constructed when all executions are completed.
result = pool.map(get_counts, input_paths)
pool.close()
pool.join()
end = time.time()

result_df = pd.DataFrame(result)
# You can use result_df.to_csv() to store the results in a csv.
print(result_df)
print('Time taken: {}'.format(end - start))
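For reference, instead of multiprocessing, the direction I have been considering for case 1 is a plain thread pool on the driver, so that each table's comparison becomes its own Spark job and the scheduler can run several of them concurrently. This is only a rough sketch of what I had in mind: check_table and max_workers=10 are placeholder names/values I made up, and it assumes spark, sfOptions, year, month and tabList are already defined as in case 1.

from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from pyspark.sql.functions import expr, lit

def check_table(tab_name):
    # Snowflake-side count for one table
    sf_query = ("select y as year, m as month, count(*) as sCount "
                "from {} where y = {} and m = {} group by y, m").format(tab_name, year, month)
    sfxdf = (spark.read.format("snowflake")
             .options(**sfOptions)
             .option("query", sf_query)
             .load())

    # Delta-Lake-side count for the same table
    dbxDF = spark.sql(("select y as year, m as month, count(*) as dCount "
                       "from db.{} where y = {} and m = {} group by y, m").format(tab_name, year, month))

    # Join, fill missing counts with 0, and flag whether the counts match
    return (dbxDF.join(sfxdf, on=["year", "month"], how="left_outer")
            .na.fill(0)
            .withColumn("flag_col", expr("dCount == sCount"))
            .withColumn("table_name", lit(tab_name))
            .select("table_name", "year", "month", "dCount", "sCount", "flag_col"))

# Threads share the driver's SparkSession, so each submitted comparison is a
# separate Spark job that the scheduler can run concurrently on the cluster.
with ThreadPoolExecutor(max_workers=10) as executor:
    per_table = list(executor.map(check_table, [t[0] for t in tabList]))

# Union the per-table results into one report and write it once.
report = reduce(lambda a, b: a.unionByName(b), per_table)
report.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save("s3a://outputs/reportcsv")

Does this look like the right way to get the count queries running at the same time, or is there a better pattern?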
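Related to that, I also came across Spark's FAIR scheduler. My understanding (which may be wrong) is that with the default FIFO scheduling the first count job can grab most of the cluster, so concurrently submitted jobs would still largely run one after another. Would a configuration along these lines help? The app name and pool name below are just examples; on Databricks I assume the scheduler mode would go into the cluster's Spark config instead.

from pyspark.sql import SparkSession

# Enable fair scheduling so the concurrent count jobs share the executors
spark = (SparkSession.builder
         .appName("table-count-reconciliation")
         .config("spark.scheduler.mode", "FAIR")
         .getOrCreate())

# Optionally, each thread can assign its jobs to a named scheduler pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "counts")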