Hi sir,

Could you please help me with the two cases below? I am using Databricks PySpark to process terabytes of JSON data read from an AWS S3 bucket.

Case 1:

Currently I am reading multiple tables sequentially to get the day count from each table.

For example, table_list.csv has a single column containing the table names.

year=2019
month=12

from pyspark.sql.functions import expr, lit

tablesDF = (spark.read.format("csv")
            .option("header", "false")
            .load("s3a://bucket//source/table_list.csv"))
tabList = tablesDF.toPandas().values.tolist()

# Snowflake settings
sfOptions = {
    "sfURL": "",
    "sfAccount": "",
    "sfUser": "",
    "sfPassword": "",
    "sfDatabase": "",
    "sfSchema": "",
    "sfWarehouse": "",
}

for table in tabList:
    tab_name = table[0]

    # Snowflake table count, read as a DataFrame
    sfxdf = (spark.read
             .format("snowflake")
             .options(**sfOptions)
             .option("query",
                     "select y as year, m as month, count(*) as sCount "
                     "from {} where y={} and m={} group by year, month"
                     .format(tab_name, year, month))
             .load())

    # Databricks Delta Lake table count
    dbxDF = spark.sql(
        "select y as year, m as month, count(*) as dCount "
        "from db.{} where y={} and m={} group by year, month"
        .format(tab_name, year, month))

    # Compare the two counts and flag mismatches
    resultDF = (dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer')
                .na.fill(0)
                .withColumn("flag_col", expr("dCount == sCount")))

    finalDF = (resultDF.withColumn("table_name", lit(tab_name))
               .select("table_name", "year", "month", "dCount", "sCount", "flag_col"))

    # Append each table's result to the report path on S3
    (finalDF.coalesce(1).write.format('csv')
     .option('header', 'true')
     .mode('append')
     .save("s3a://outputs/reportcsv"))
        
Questions:

1) Instead of running the count query sequentially, one table at a time, how can I read all the tables listed in the CSV file on S3 in parallel and distribute the jobs across the cluster?

2) Could you please suggest how to optimize the above PySpark code so that all the count queries run in parallel?
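For example, would wrapping the per-table logic in a function and driving it from a thread pool on the driver be a reasonable approach? Below is only a rough sketch of what I have in mind: compare_table is a name I made up, it reuses sfOptions, tabList, year and month from the code above, and the pool size of 8 is arbitrary.

from concurrent.futures import ThreadPoolExecutor
from pyspark.sql.functions import expr, lit

def compare_table(tab_name):
    # Same per-table logic as above: Snowflake count vs Delta Lake count.
    sfxdf = (spark.read.format("snowflake").options(**sfOptions)
             .option("query",
                     "select y as year, m as month, count(*) as sCount "
                     "from {} where y={} and m={} group by year, month"
                     .format(tab_name, year, month))
             .load())
    dbxDF = spark.sql(
        "select y as year, m as month, count(*) as dCount "
        "from db.{} where y={} and m={} group by year, month"
        .format(tab_name, year, month))
    finalDF = (dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer')
               .na.fill(0)
               .withColumn("flag_col", expr("dCount == sCount"))
               .withColumn("table_name", lit(tab_name))
               .select("table_name", "year", "month", "dCount", "sCount", "flag_col"))
    # Writing inside the worker triggers the Spark job for this table.
    (finalDF.coalesce(1).write.format('csv').option('header', 'true')
     .mode('append').save("s3a://outputs/reportcsv"))

# Driver-side threads submit one Spark job per table, so the cluster
# scheduler can run the count queries for different tables concurrently.
table_names = [row[0] for row in tabList]
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(compare_table, table_names))

Is something along these lines the right way to get the jobs distributed, or is there a better pattern?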
        


Case 2: Multiprocessing
------------------------

Could you please help me understand how to achieve multiprocessing for the above PySpark query, so that it runs in parallel in the distributed environment?

Is there any way to achieve parallel processing of the PySpark code on the cluster by using the snippet below?
        
import time
import multiprocessing

import pandas as pd

start = time.time()

# Creating a pool of 20 processes. You can set this as per your intended
# parallelism and your available resources.
pool = multiprocessing.Pool(20)

# This will execute get_counts() in parallel, once per element of input_paths.
# result (a list of dictionaries) is built when all executions are completed.
result = pool.map(get_counts, input_paths)

end = time.time()

result_df = pd.DataFrame(result)
# You can use result_df.to_csv() to store the results in a csv.
print(result_df)
print('Time taken: {}'.format(end - start))
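get_counts and input_paths are not defined above; the snippet below is only an illustration of what I would pass in (the function body and the S3 paths are made-up examples). I am also unsure whether multiprocessing.Pool is the right tool here, since the forked worker processes would not share the driver's SparkSession; would a multiprocessing.pool.ThreadPool (threads inside the driver) be the correct choice instead?

from multiprocessing.pool import ThreadPool  # threads share the driver's SparkSession

def get_counts(path):
    # Made-up example worker: count the matching rows for one S3 prefix.
    df = spark.read.json(path)
    cnt = df.filter("y = {} and m = {}".format(year, month)).count()
    return {"path": path, "count": cnt}

# Made-up example input; in my case these would be the S3 prefixes of the tables.
input_paths = ["s3a://bucket/source/tableA/", "s3a://bucket/source/tableB/"]

pool = ThreadPool(20)
result = pool.map(get_counts, input_paths)
pool.close()
pool.join()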


